데이터/Airflow

Python 오퍼레이터

mini'scloud 2025. 8. 15. 16:58
  • python 오퍼레이터
    • 정의된 파이썬 함수를 실행시키는 오퍼레이터
  • 라이브러리를 가져와서 사용하면 됨
from airflow.operators.python import PythonOperator

 

  • 즉, 파이썬 함수를 실행시키기 위한 오퍼레이터가 PythonOperator임
  • BranchPythonOperator도 있는데
    • 파이썬 함수 실행 결과에 따라 task를 선택적으로 실행시킬때 사용되는 오퍼레이터임

 

 

외부 파이썬 함수 수행하기

from airflow.operators.python import PythonOperator
  • airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가져오라는거
  • 파이썬은 이 경로를 어떻게 찾을까?
    • 즉, dag에서 우리가 만든 외부 함수를 import 해와야 하는데 import 경로를 어떻게 작성할지 알아보자
  • 파이썬은 sys.path 변수에서 모듈의 위치를 검색함

  • 실행하는 파이썬 파일과 동일 디렉토리에 있는 파일을 import 하는 경우
    • 그냥 import 파일 하면됨
  • pip으로 설치한 라이브러리들은 sys.path에 들어가 있음
  • 즉, 우리가 어떤 함수를 만들어서 dag에 추가할려면 sys.path에 추가해주면 됨, 추가 방법에는 두 가지가 있음
    • 명시적으로 추가 (ex: sys.append('/home/ex1') )
    • OS 환경변수 PYTHONPATH에 값을 추가
  • 불편하기에 airflow는 자동적으로 dags 폴더와 plugins 폴더를 sys.path에 추가해줌

  • python_path에 dags와 plugins가 추가 되어있는 것을 알 수 있음
  • 즉, plugins에 파이썬 파일들을 저장해두면 dags에서 import해서 쓸 수 있음

이렇게 사용하면 됨

  • 공통함수 작성을 통해 재활용성이 증가됨
  • 실습을 해보자
import pendulum
from common.common_func import get_sftp # plugins와 공통으로 연결된 path 덕분에 바로 import 가능함

# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.python import PythonOperator
#from airflow import DAG

with DAG(
    dag_id="dags_python_import_func",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:

    task_get_sftp = PythonOperator(
        task_id='task_get_sftp',
        python_callable=get_sftp
    )

 

  • 경로를 적을때 plugins.common 이렇게하면 airflow가 인식이 안됨 (plugins도 path에 들어가있기 때문에)
  • 그래서 그대로 common.common~~ 이렇게 적어주면 되지만, 오류가 발생해서 env 파일을 통해 해결을 할려함
WORKSPACE_FOLDER = /Users/minimac/Desktop/study/airflow_log
PYTHONPATH = ${WORKSPACE_FOLDER}/plugins
  • .env 파일을 작성해주고 깃허브에는 안올려도 되니깐 .gitignore에도 추가를 해놓자
  • 깃허브 올리고 다시 도커있는데 넣어주면 아래와 같이 나타나는 것을 확인할 수 있음

 


task 데코레이터 사용하기

  • 원래의 함수를 wrapping해서 추가 기능을 덧붙이는걸 말함

  • 위에처럼 쓰면 번거롭기에 데코레이터를 붙여서 쓰면 좋음

  • 데코레이터를 쓰면 아래처럼 효율적으로 사용 가능함

 

  • airflow에도 task 데코레이터를 사용할 수 있음

 


파이썬 함수 파라미터

  • 먼저, 일반적인 함수 인자 작성 방법을 보자
def regist(name, univ):
	print(name)
	print(univ)
    
    
regist('min','ku')
  • 호출하는 로직에서 몇 개의 파라미터를 넘길지 모를때는 일반적인 방법이 어려움
  • 또는, 선택적으로 변수를 받고 싶을 때도..
  • 그래서 *args를 사용함

  • 기본적인 틀 외에 추가적인 파라미터들은 *args에 추가되어서 넘어가게됨
  • args로 들어온 값은 튜플로 저장됨
    • 그렇기에 args에서 값을 꺼낼 때는 인덱스를 이용함 (args[0], args[1] ...)
    • args라는 이름 외 다른 이름으로 받아도됨 (some_func(*kk)

**kwargs

  • kwargs에서는 딕셔너리 형태로 값이 다 저장되게 됨
  • args, kwargs를 같이 써도 괜찮음

 

op_args 사용법

 

  • 즉, op_args = [~~] 이 위치를 통해 값을 args로 전달함

op_kwargs

'데이터 > Airflow' 카테고리의 다른 글

데이터 공유  (0) 2025.08.27
Airflow Template Variable  (0) 2025.08.27
오퍼레이터 기본/ mount 설정  (0) 2025.08.14
Airflow 소개 & 환경구축  (0) 2025.06.28