데이터/Airflow

데이터 공유

mini'scloud 2025. 8. 27. 15:59

Python Operator with Xcom

Xcom (Cross Communication)

  • airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
  • ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우
  • 주로 작은 규모의 데이터 공유를 위해 사용
    • Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨
    • 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션이 필요함 (s3, HDFS 등...)

xcom 사용 방법

1. **kwargs에 존재하는 ti(task_instance) 객체 활용

  • task_ids를 명시해주지 않으면 가장 최근에 push한게 불러와지기에, task_ids를 통해 id를 명시해줘야함

 

2. 파이썬 함수의 return 값 활용

return을 하면 자동으로 xcom에 저장이됨

 

key = 'return_value'는 자동으로 설정되어서 들어가짐

 


실습을 해보자

import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG, task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']

        # 2025/07/06 추가 사항
        # 3.0.0 버전부터 task_ids 값을 주지 않으면 Xcom 을 찾지 못합니다.
        # 버그인지, 의도한 것인지는 확실치 않으나 해결될 때까지 task_ids 값을 리스트로 넣어 결과가 어떻게 나오는지 보는 것으로 대체합니다.
        # value1 = ti.xcom_pull(key="result1")
        value1 = ti.xcom_pull(key="result1", task_ids=['python_xcom_push_task1','python_xcom_push_task2'])
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)


    xcom_push1() >> xcom_push2() >> xcom_pull()

 

 

import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG, task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'


    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값:' + status)


    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

이런 형태의 그래프를 갖게됨

 

pull_2 결과

 

요약

  • xcom push 방법
    • ti.xcom_push 명시적 사용
    • 함수 return 사용
  • xcom pull 방법
    • ti.xcom_pull 명시적 사용
    • return 값을 input으로 사용

Bash Operator에서 Xcom 사용하기

  • bash 오퍼레이터는 env, bash_command 파라미터에서 template을 이용하여 push/pull을 진행함

key값을 지정했기에 first_bash_message를 가져옴 // task_ids = 'bash_push'를 통해 return 값을 가져옴

  • bash_command의 경우 출력되는 값들이 return으로 간주됨
    • 위에서는 출력이 3번이지만, 마지막 출력문만 자동으로 return_value에 저장됨
  • do_xcom_push = Fasle를 해야함
    • bash_command에서 출력되는 값은 자동으로 return값으로 간주되기 때문

Python & Bash 오퍼레이터간 Xcom 사용 실습

import pendulum
# Airflow 3.0 부터 각각 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, task

with DAG(
    dag_id="dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:

    @task(task_id='python_push')
    def python_push_xcom():
        result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
        return result_dict

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={
            'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
            'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
            'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'

        },
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )
    python_push_xcom() >> bash_pull

    bash_push = BashOperator(
    task_id='bash_push',
    bash_command='echo PUSH_START '
                 '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
                 'echo PUSH_COMPLETE'
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        ti = kwargs['ti']

        status_value = ti.xcom_pull(key='bash_pushed', task_ids='bash_push')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print('status_value:' + str(status_value))
        print('return_value:' + return_value)

    bash_push >> python_pull_xcom()

 

bash_push
python_pull

 

 

python_push
bash_pull

 


전역변수 variable 이용하기

  • xcom: 특정 DAG, 특정 schedule에 수행되는 task 간에만 공유
  • 모든 DAG이 공유할 수 있는 전역 변수는 없을까?

=> variable을 이용하면 됨

  • 실제 variable의 key, value 값은 메타 DB에 저장됨(variable 테이블)

전역변수 사용하기

  • variable 라이브러리 이용, 파이썬 문법을 이용해 미리 가져오기
  • Jinja 템플릿 이용, 오퍼레이터 내부에서 가져오기

  • 2안을 사용해야함
    • 스케줄러의 주기적 DAG 파싱시 Variable.get 개수만큼 DB 연결을 일으켜서 불필요한 부하가 발생하기 때문
    • 스케줄러 과부하 원인 중 하나임

 

  • 전역변수는 주로 협업 환경에서 dag을 만들기 위해 주로 사용함
    • 주로 상수로 지정해서 사용할 변수들을 셋팅함
    • ex)
    • base_sh_dir = /opt/airflow/plugins/shell
    • email, Alert 메시지를 받을 담당자의 email 주소 정보 이런거..
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Variable


with DAG(
    dag_id="dags_bash_with_variable",
    schedule="10 9 * * *",
    start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    var_value = Variable.get("sample_key")

    bash_var_1 = BashOperator(
    task_id="bash_var_1",
    bash_command=f"echo variable:{var_value}"
    )

    bash_var_2 = BashOperator(
    task_id="bash_var_2",
    bash_command="echo variable:{{var.value.sample_key}}"
    )

 

'데이터 > Airflow' 카테고리의 다른 글

Airflow Template Variable  (0) 2025.08.27
Python 오퍼레이터  (0) 2025.08.15
오퍼레이터 기본/ mount 설정  (0) 2025.08.14
Airflow 소개 & 환경구축  (0) 2025.06.28