Python Operator with Xcom
Xcom (Cross Communication)
- airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
- ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우
- 주로 작은 규모의 데이터 공유를 위해 사용
- Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨
- 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션이 필요함 (s3, HDFS 등...)
xcom 사용 방법
1. **kwargs에 존재하는 ti(task_instance) 객체 활용

- task_ids를 명시해주지 않으면 가장 최근에 push한게 불러와지기에, task_ids를 통해 id를 명시해줘야함
2. 파이썬 함수의 return 값 활용


실습을 해보자
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG, task
with DAG(
dag_id="dags_python_with_xcom_eg1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1,2,3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_2")
ti.xcom_push(key="result2", value=[1,2,3,4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
# 2025/07/06 추가 사항
# 3.0.0 버전부터 task_ids 값을 주지 않으면 Xcom 을 찾지 못합니다.
# 버그인지, 의도한 것인지는 확실치 않으나 해결될 때까지 task_ids 값을 리스트로 넣어 결과가 어떻게 나오는지 보는 것으로 대체합니다.
# value1 = ti.xcom_pull(key="result1")
value1 = ti.xcom_pull(key="result1", task_ids=['python_xcom_push_task1','python_xcom_push_task2'])
value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()


import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG, task
with DAG(
dag_id="dags_python_with_xcom_eg2",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print('함수 입력값으로 받은 값:' + status)
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()



요약
- xcom push 방법
- ti.xcom_push 명시적 사용
- 함수 return 사용
- xcom pull 방법
- ti.xcom_pull 명시적 사용
- return 값을 input으로 사용
Bash Operator에서 Xcom 사용하기
- bash 오퍼레이터는 env, bash_command 파라미터에서 template을 이용하여 push/pull을 진행함

- bash_command의 경우 출력되는 값들이 return으로 간주됨
- 위에서는 출력이 3번이지만, 마지막 출력문만 자동으로 return_value에 저장됨
- do_xcom_push = Fasle를 해야함
- bash_command에서 출력되는 값은 자동으로 return값으로 간주되기 때문
Python & Bash 오퍼레이터간 Xcom 사용 실습
import pendulum
# Airflow 3.0 부터 각각 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, task
with DAG(
dag_id="dags_bash_python_with_xcom",
schedule="30 9 * * *",
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id='bash_pull',
env={
'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
},
bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed",value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed', task_ids='bash_push')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom()




전역변수 variable 이용하기
- xcom: 특정 DAG, 특정 schedule에 수행되는 task 간에만 공유
- 모든 DAG이 공유할 수 있는 전역 변수는 없을까?
=> variable을 이용하면 됨


- 실제 variable의 key, value 값은 메타 DB에 저장됨(variable 테이블)
전역변수 사용하기
- variable 라이브러리 이용, 파이썬 문법을 이용해 미리 가져오기
- Jinja 템플릿 이용, 오퍼레이터 내부에서 가져오기


- 2안을 사용해야함
- 스케줄러의 주기적 DAG 파싱시 Variable.get 개수만큼 DB 연결을 일으켜서 불필요한 부하가 발생하기 때문
- 스케줄러 과부하 원인 중 하나임
- 전역변수는 주로 협업 환경에서 dag을 만들기 위해 주로 사용함
- 주로 상수로 지정해서 사용할 변수들을 셋팅함
- ex)
- base_sh_dir = /opt/airflow/plugins/shell
- email, Alert 메시지를 받을 담당자의 email 주소 정보 이런거..
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Variable
with DAG(
dag_id="dags_bash_with_variable",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
task_id="bash_var_1",
bash_command=f"echo variable:{var_value}"
)
bash_var_2 = BashOperator(
task_id="bash_var_2",
bash_command="echo variable:{{var.value.sample_key}}"
)

'데이터 > Airflow' 카테고리의 다른 글
| Airflow Template Variable (0) | 2025.08.27 |
|---|---|
| Python 오퍼레이터 (0) | 2025.08.15 |
| 오퍼레이터 기본/ mount 설정 (0) | 2025.08.14 |
| Airflow 소개 & 환경구축 (0) | 2025.06.28 |