데이터/Airflow

Airflow Template Variable

mini'scloud 2025. 8. 27. 00:56

Jinja Template

  • 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진
  • 여러 엔진이 존재하지만, 파이썬에서는 Jinja를 사용함

render 함수를 사용해서 치환해서 사용함

  • ex) HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력함
  • SQL 작성시에도 활용 가능
    • select * from tables where base_dt = {{ }}
    • 이 select 구문을 템플릿으로 만들고 매일 변경되는 날짜만 필터 조건으로 설정을 해주면 됨

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

 

Templates reference — Airflow 3.0.4 Documentation

 

airflow.apache.org

  • docs 보면서 제공되는 변수들을 통해서 템플릿을 작성하면됨

 

오퍼레이터에서 템플릿 사용하는거는 아래 링크를 통해서 확인 가능함

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html#jinja-templating

 

Operators — Airflow 3.0.4 Documentation

 

airflow.apache.org

 

 

Bash Operator에서 Jinja 템플릿 사용하기

  • bash_command와 env를 통해서 사용하는 것을 확인할 수 있음
import pendulum
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

with DAG(
    dag_id="dags_bash_with_template",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_t1 = BashOperator(
        task_id='bash_t1',
        bash_command='echo "data_interval_end: {{ data_interval_end }}  "'
    )

    bash_t2 = BashOperator(
        task_id='bash_t2',
        env={
            'START_DATE':'{{data_interval_start | ds }}',
            'END_DATE':'{{data_interval_end | ds }}'
        },
        bash_command='echo $START_DATE && echo $END_DATE'
    )

    bash_t1 >> bash_t2

 

  • {{ data_interval_start | ds }}: 이 템플릿은 DAG의 논리적 시작 시간을 나타내는 data_interval_start 변수를 가져와 | ds 필터를 적용함
    • ds 필터는 'day stamp'의 약자로, 날짜를 YYYY-MM-DD 형식의 문자열로 변환해줌
  • {{ data_interval_end | ds }}: 이 템플릿은 DAG의 논리적 종료 시간을 나타내는 data_interval_end 변수를 가져와 동일하게 YYYY-MM-DD 형식의 문자열로 변환함

 

 

 

Airflow의 날짜 개념

  • 쿼리를 통해 25일에 추출을 했지만 실제로 추출된 기간은 24~25사이에 다 추출이 됨

 

  •  즉, 배치일 날짜를 가져올려면 data_interval_end를 가져와야하고, 배치일 전 날짜를 가져올려면 data_interval_start를 가져와야함

 

Python 오퍼레이터에서 Template 변수 사용

  • python 오퍼레이터는 어떤 파라미터에 template을 쓸 수 있는지..
    • python_callable
    • op_kwargs
    • op_args
    • tmeplates_dict
    • ... 필요하면 문서 보자

  • **kwargs는 Airflow가 태스크 실행 시 자동으로 전달하는 추가적인 정보를 받기 위해 포함되어 있음

  • kwargs에 정의된 templates 변수들이 정해져있어서 여기에서 가져와서 하는 거임

 

 

Bash Operator with macros

  • 이런 문제를 가지고 있음

 

  • template을 통해 유연하게 날짜 연산을 할 수 있음

 

  • 실습을 해보자
    • ex1) 매월 말일 수행되는 DAG에서
      • 변수 START_DATE: 전월 말일,
      • 변수 END_DATE: 어제로 env 세팅하기
    • ex2) 매월 둘째주 토요일에 수행되는 DAG에서 
      • 변수 START_DATE: 2주 전 월요일
      • 변수 END_DATE: 2주전 토요일로 env 세팅하기
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

with DAG(
    dag_id="dags_bash_with_macro_eg1",
    schedule="10 0 L * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    # START_DATE: 전월 말일, END_DATE: 1일 전
    bash_task_1 = BashOperator(
        task_id='bash_task_1',
        env={'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
             'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

with DAG(
    dag_id="dags_bash_with_macro_eg2",
    schedule="10 0 * * 6#2",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    # START_DATE: 2주전 월요일, END_DATE: 2주전 토요일
    bash_task_2 = BashOperator(
        task_id='bash_task_2',
        env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
             'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

 

 

Python Operator macro 변수 사용하기

templates_dict를 통해 key,value 형식으로 작성해주면 됨

  • kwargs.get을 통해 templates_dict값을 가져오는거임
    • 즉, templates_dict 자체가 key로 들어감 
    • 뒤에 나머지 부분이 value로 들어감
  • 그런데 python 오퍼레이터에서 굳이 macro 연산을 사용해할지...
    • 그래서 날짜 연산을 dag안에서 직접 할 수 있는 방법도 있음

 

'데이터 > Airflow' 카테고리의 다른 글

데이터 공유  (0) 2025.08.27
Python 오퍼레이터  (0) 2025.08.15
오퍼레이터 기본/ mount 설정  (0) 2025.08.14
Airflow 소개 & 환경구축  (0) 2025.06.28