데이터/Airflow

오퍼레이터 기본/ mount 설정

mini'scloud 2025. 8. 14. 17:50

실습자료는 아래 깃허브에서 받을 수 있음

https://github.com/hjkim-sun/airflow

 

GitHub - hjkim-sun/airflow

Contribute to hjkim-sun/airflow development by creating an account on GitHub.

github.com

 

  • operator
    • 특정 행위를 할 수 있는 기능을 모아 놓은 클래스(설계도)
  • Task
    • operator에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트

스케줄러가 DAG 파일 파싱 -> 메타 DB에 주기, 시간.. 이런걸 저장하고 -> 시작 시간에 맞추어서 워커가 DAG 파일 읽고 -> 파일을 처리하기전에 메타 DB를 업데이트하고, 처리 후에도 업데이트를 한번 진행해줌

  • Task의 수행 주체
    • 스케줄러
      • DAG parsing 후 DB에 정보저장
      • DAG 시작 시간 결정
    • 워커
      • 실제 작업 수행

Bash Operator

  • 쉘 스크립트 명령을 수행하는 오퍼레이터
  • 기본 값을 설정할때 샘플 코드를 참고하면 되는데 airflow 웹에서 example_compley 예시를 참고해서 보면 됨
import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.bash import BashOperator
#from airflow import DAG

with DAG(
    dag_id="dags_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_t1 = BashOperator(
        task_id="bash_t1",
        bash_command="echo whoami",
    )

    bash_t2 = BashOperator(
        task_id="bash_t2",
        bash_command="echo $HOSTNAME; echo good",
    )

    bash_t3 = BashOperator(
        task_id="bash_t3",
        bash_command="echo $HOSTNAME; echo good",
    )

    bash_t4 = BashOperator(
        task_id="bash_t4",
        bash_command="echo $HOSTNAME; echo good",
    )

    bash_t1 >> bash_t2 >> bash_t3 >> bash_t4

 

  • with DAG에 있는 dag_id의 경우 파일명과 맞춰서 작성해주는게 좋음
  • catchup
    • ex) DAG가 1/1에 올라와있지만, 현재는 3/1 일때 catchup이 true이면 두 기간 사이 누락된걸 다 수행하고,
    • catchup이 false면 두 기간 사이 누락된걸 무시하고, 현재(3/1)부터 수행함 (일반적으로 false가 좋긴함)
  • 로컬(airflow_log 폴더)에서 작성했으니 이를 도커가 있는 공간으로 이동 시켜보자 (깃허브를 거쳐서 이동시킬거임)
git status
git add .
#dags 폴더만 할거면 폴더 자체를 push해주면 됨
git commit -m "bash operator"
git push

 

 

  • 도커가 있는 airflow 폴더에서 깃허브에 올린 파일을 받아보자
git pull
  • 다운 받은 dags을 airflow에 연결하기 위해서 mount를 해줘야 함

Mount

  • 내가 dags을 로컬에서 작성하는 주소는 airflow_log/dags/dag작성파일
  • 내가 깃허브에서 작성한 파일을 다운 받는 주소는 airflow/dags 폴더 아래에 저장됨

  • 즉, 내가 깃허브에서 받아오는 dag파일 공간과 컨테이너 디렉토리를 mount하는거임
  • :을 중심으로 왼쪽이 dag 파일 받는 공간, 오른쪽이 도커 컨테이너 volume을 작성하면 됨
  • ${AIRFLOW_PROJ_DIR:- .} 이거는, AIRFLOW_PROJ_DIR에 아무것도 없으면 그냥 . 출력해라는 의미로
  • ./dags폴더 아래 파일을 가리키게 됨
docker exec -it airflow-airflow-scheduler-1 /bin/bash
ls -la /opt/airflow/dags

이 명령어를 통해 mount가 잘되었는지 확인할 수 있음

 

 

bash_t1 >> bash_t2 ... 이렇게 순서대로 잘 표현됨

 

 

Cron 스케줄

 

task 연결하기

import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG

# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.empty import EmptyOperator
#from airflow import DAG

with DAG(
    dag_id="dags_conn_test",
    schedule=None,
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    t1 = EmptyOperator(
        task_id="t1"
    )

    t2 = EmptyOperator(
        task_id="t2"
    )

    t3 = EmptyOperator(
        task_id="t3"
    )

    t4 = EmptyOperator(
        task_id="t4"
    )

    t5 = EmptyOperator(
        task_id="t5"
    )

    t6 = EmptyOperator(
        task_id="t6"
    )

    t7 = EmptyOperator(
        task_id="t7"
    )

    t8 = EmptyOperator(
        task_id="t8"
    )

    t1 >> [t2, t3] >> t4
    t5 >> t4 
    [t4, t7] >> t6 >> t8

 

Bash Operator로 컨테이너 외부의 쉡 스크립트 수행하기

  • shell 스크립트란?
    • unix/Linux shell 명령을 이용하여 만들어지고 인터프리터에 의해 한 줄씩 처리되는 파일
  • worker 컨테이너가 쉘 스크립트를 실행할려면 plugins 폴더를 마운트 해주면됨

plugins에 쉡 스크립트를 넣어주면 됨

 

 

실습을 해보자

  • select_fruit.sh 파일에 파일 실행 권한을 부여해준다
chmod +x select_fruit.sh

잘 나오는 것을 확인했음

 

깃허브에 올리고 컨테이너 영역에서 실행시킬 airflow/dags에 쉡 스크립트를 pull을 해놓자


import pendulum
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.bash import BashOperator
#from airflow import DAG

with DAG(
    dag_id="dags_bash_select_fruit",
    schedule="10 0 * * 6#1",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    t1_orange = BashOperator(
        task_id="t1_orange",
        bash_command="/opt/airflow/plugins/select_fruit.sh ORANGE",
    )

    t2_avocado = BashOperator(
        task_id="t2_avocado",
        bash_command="/opt/airflow/plugins/select_fruit.sh AVOCADO",
    )

    t1_orange >> t2_avocado
  • dag역시 작성하면됨
  • bash_command는 도커 컨테이너 내부의 경로를 가리키고 있음

 

잘 나온것을 확인할 수 있음

 

 

'데이터 > Airflow' 카테고리의 다른 글

데이터 공유  (0) 2025.08.27
Airflow Template Variable  (0) 2025.08.27
Python 오퍼레이터  (0) 2025.08.15
Airflow 소개 & 환경구축  (0) 2025.06.28