Jinja Template
- 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진
- 여러 엔진이 존재하지만, 파이썬에서는 Jinja를 사용함

- ex) HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력함
- SQL 작성시에도 활용 가능
- select * from tables where base_dt = {{ }}
- 이 select 구문을 템플릿으로 만들고 매일 변경되는 날짜만 필터 조건으로 설정을 해주면 됨
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
Templates reference — Airflow 3.0.4 Documentation
airflow.apache.org
- docs 보면서 제공되는 변수들을 통해서 템플릿을 작성하면됨
오퍼레이터에서 템플릿 사용하는거는 아래 링크를 통해서 확인 가능함
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html#jinja-templating
Operators — Airflow 3.0.4 Documentation
airflow.apache.org
Bash Operator에서 Jinja 템플릿 사용하기

- bash_command와 env를 통해서 사용하는 것을 확인할 수 있음
import pendulum
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
with DAG(
dag_id="dags_bash_with_template",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "data_interval_end: {{ data_interval_end }} "'
)
bash_t2 = BashOperator(
task_id='bash_t2',
env={
'START_DATE':'{{data_interval_start | ds }}',
'END_DATE':'{{data_interval_end | ds }}'
},
bash_command='echo $START_DATE && echo $END_DATE'
)
bash_t1 >> bash_t2
- {{ data_interval_start | ds }}: 이 템플릿은 DAG의 논리적 시작 시간을 나타내는 data_interval_start 변수를 가져와 | ds 필터를 적용함
- ds 필터는 'day stamp'의 약자로, 날짜를 YYYY-MM-DD 형식의 문자열로 변환해줌
- {{ data_interval_end | ds }}: 이 템플릿은 DAG의 논리적 종료 시간을 나타내는 data_interval_end 변수를 가져와 동일하게 YYYY-MM-DD 형식의 문자열로 변환함


Airflow의 날짜 개념

- 쿼리를 통해 25일에 추출을 했지만 실제로 추출된 기간은 24~25사이에 다 추출이 됨

- 즉, 배치일 날짜를 가져올려면 data_interval_end를 가져와야하고, 배치일 전 날짜를 가져올려면 data_interval_start를 가져와야함
Python 오퍼레이터에서 Template 변수 사용
- python 오퍼레이터는 어떤 파라미터에 template을 쓸 수 있는지..
- python_callable
- op_kwargs
- op_args
- tmeplates_dict
- ... 필요하면 문서 보자

- **kwargs는 Airflow가 태스크 실행 시 자동으로 전달하는 추가적인 정보를 받기 위해 포함되어 있음

- kwargs에 정의된 templates 변수들이 정해져있어서 여기에서 가져와서 하는 거임


Bash Operator with macros

- 이런 문제를 가지고 있음

- template을 통해 유연하게 날짜 연산을 할 수 있음

- 실습을 해보자
- ex1) 매월 말일 수행되는 DAG에서
- 변수 START_DATE: 전월 말일,
- 변수 END_DATE: 어제로 env 세팅하기
- ex2) 매월 둘째주 토요일에 수행되는 DAG에서
- 변수 START_DATE: 2주 전 월요일
- 변수 END_DATE: 2주전 토요일로 env 세팅하기
- ex1) 매월 말일 수행되는 DAG에서
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
with DAG(
dag_id="dags_bash_with_macro_eg1",
schedule="10 0 L * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 전월 말일, END_DATE: 1일 전
bash_task_1 = BashOperator(
task_id='bash_task_1',
env={'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
with DAG(
dag_id="dags_bash_with_macro_eg2",
schedule="10 0 * * 6#2",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 2주전 월요일, END_DATE: 2주전 토요일
bash_task_2 = BashOperator(
task_id='bash_task_2',
env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
Python Operator macro 변수 사용하기

- kwargs.get을 통해 templates_dict값을 가져오는거임
- 즉, templates_dict 자체가 key로 들어감
- 뒤에 나머지 부분이 value로 들어감
- 그런데 python 오퍼레이터에서 굳이 macro 연산을 사용해할지...
- 그래서 날짜 연산을 dag안에서 직접 할 수 있는 방법도 있음

'데이터 > Airflow' 카테고리의 다른 글
| 데이터 공유 (0) | 2025.08.27 |
|---|---|
| Python 오퍼레이터 (0) | 2025.08.15 |
| 오퍼레이터 기본/ mount 설정 (0) | 2025.08.14 |
| Airflow 소개 & 환경구축 (0) | 2025.06.28 |