실습자료는 아래 깃허브에서 받을 수 있음
https://github.com/hjkim-sun/airflow
GitHub - hjkim-sun/airflow
Contribute to hjkim-sun/airflow development by creating an account on GitHub.
github.com

- operator
- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스(설계도)
- Task
- operator에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트

- Task의 수행 주체
- 스케줄러
- DAG parsing 후 DB에 정보저장
- DAG 시작 시간 결정
- 워커
- 실제 작업 수행
- 스케줄러
Bash Operator
- 쉘 스크립트 명령을 수행하는 오퍼레이터
- 기본 값을 설정할때 샘플 코드를 참고하면 되는데 airflow 웹에서 example_compley 예시를 참고해서 보면 됨
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.bash import BashOperator
#from airflow import DAG
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo whoami",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME; echo good",
)
bash_t3 = BashOperator(
task_id="bash_t3",
bash_command="echo $HOSTNAME; echo good",
)
bash_t4 = BashOperator(
task_id="bash_t4",
bash_command="echo $HOSTNAME; echo good",
)
bash_t1 >> bash_t2 >> bash_t3 >> bash_t4
- with DAG에 있는 dag_id의 경우 파일명과 맞춰서 작성해주는게 좋음
- catchup
- ex) DAG가 1/1에 올라와있지만, 현재는 3/1 일때 catchup이 true이면 두 기간 사이 누락된걸 다 수행하고,
- catchup이 false면 두 기간 사이 누락된걸 무시하고, 현재(3/1)부터 수행함 (일반적으로 false가 좋긴함)
- 로컬(airflow_log 폴더)에서 작성했으니 이를 도커가 있는 공간으로 이동 시켜보자 (깃허브를 거쳐서 이동시킬거임)
git status
git add .
#dags 폴더만 할거면 폴더 자체를 push해주면 됨
git commit -m "bash operator"
git push
- 도커가 있는 airflow 폴더에서 깃허브에 올린 파일을 받아보자
git pull
- 다운 받은 dags을 airflow에 연결하기 위해서 mount를 해줘야 함
Mount
- 내가 dags을 로컬에서 작성하는 주소는 airflow_log/dags/dag작성파일
- 내가 깃허브에서 작성한 파일을 다운 받는 주소는 airflow/dags 폴더 아래에 저장됨

- 즉, 내가 깃허브에서 받아오는 dag파일 공간과 컨테이너 디렉토리를 mount하는거임
- :을 중심으로 왼쪽이 dag 파일 받는 공간, 오른쪽이 도커 컨테이너 volume을 작성하면 됨
- ${AIRFLOW_PROJ_DIR:- .} 이거는, AIRFLOW_PROJ_DIR에 아무것도 없으면 그냥 . 출력해라는 의미로
- ./dags폴더 아래 파일을 가리키게 됨
docker exec -it airflow-airflow-scheduler-1 /bin/bash
ls -la /opt/airflow/dags
이 명령어를 통해 mount가 잘되었는지 확인할 수 있음


Cron 스케줄

task 연결하기
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG
# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.empty import EmptyOperator
#from airflow import DAG
with DAG(
dag_id="dags_conn_test",
schedule=None,
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
t1 = EmptyOperator(
task_id="t1"
)
t2 = EmptyOperator(
task_id="t2"
)
t3 = EmptyOperator(
task_id="t3"
)
t4 = EmptyOperator(
task_id="t4"
)
t5 = EmptyOperator(
task_id="t5"
)
t6 = EmptyOperator(
task_id="t6"
)
t7 = EmptyOperator(
task_id="t7"
)
t8 = EmptyOperator(
task_id="t8"
)
t1 >> [t2, t3] >> t4
t5 >> t4
[t4, t7] >> t6 >> t8

Bash Operator로 컨테이너 외부의 쉡 스크립트 수행하기
- shell 스크립트란?
- unix/Linux shell 명령을 이용하여 만들어지고 인터프리터에 의해 한 줄씩 처리되는 파일
- worker 컨테이너가 쉘 스크립트를 실행할려면 plugins 폴더를 마운트 해주면됨

실습을 해보자

- select_fruit.sh 파일에 파일 실행 권한을 부여해준다
chmod +x select_fruit.sh

깃허브에 올리고 컨테이너 영역에서 실행시킬 airflow/dags에 쉡 스크립트를 pull을 해놓자
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.bash import BashOperator
#from airflow import DAG
with DAG(
dag_id="dags_bash_select_fruit",
schedule="10 0 * * 6#1",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
t1_orange = BashOperator(
task_id="t1_orange",
bash_command="/opt/airflow/plugins/select_fruit.sh ORANGE",
)
t2_avocado = BashOperator(
task_id="t2_avocado",
bash_command="/opt/airflow/plugins/select_fruit.sh AVOCADO",
)
t1_orange >> t2_avocado
- dag역시 작성하면됨
- bash_command는 도커 컨테이너 내부의 경로를 가리키고 있음


'데이터 > Airflow' 카테고리의 다른 글
| 데이터 공유 (0) | 2025.08.27 |
|---|---|
| Airflow Template Variable (0) | 2025.08.27 |
| Python 오퍼레이터 (0) | 2025.08.15 |
| Airflow 소개 & 환경구축 (0) | 2025.06.28 |