- python 오퍼레이터
- 라이브러리를 가져와서 사용하면 됨
from airflow.operators.python import PythonOperator
- 즉, 파이썬 함수를 실행시키기 위한 오퍼레이터가 PythonOperator임
- BranchPythonOperator도 있는데
- 파이썬 함수 실행 결과에 따라 task를 선택적으로 실행시킬때 사용되는 오퍼레이터임
외부 파이썬 함수 수행하기
from airflow.operators.python import PythonOperator
- airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가져오라는거
- 파이썬은 이 경로를 어떻게 찾을까?
- 즉, dag에서 우리가 만든 외부 함수를 import 해와야 하는데 import 경로를 어떻게 작성할지 알아보자
- 파이썬은 sys.path 변수에서 모듈의 위치를 검색함
- 실행하는 파이썬 파일과 동일 디렉토리에 있는 파일을 import 하는 경우
- pip으로 설치한 라이브러리들은 sys.path에 들어가 있음
- 즉, 우리가 어떤 함수를 만들어서 dag에 추가할려면 sys.path에 추가해주면 됨, 추가 방법에는 두 가지가 있음
- 명시적으로 추가 (ex: sys.append('/home/ex1') )
- OS 환경변수 PYTHONPATH에 값을 추가
- 불편하기에 airflow는 자동적으로 dags 폴더와 plugins 폴더를 sys.path에 추가해줌
- python_path에 dags와 plugins가 추가 되어있는 것을 알 수 있음
- 즉, plugins에 파이썬 파일들을 저장해두면 dags에서 import해서 쓸 수 있음
이렇게 사용하면 됨
- 공통함수 작성을 통해 재활용성이 증가됨
- 실습을 해보자
import pendulum
from common.common_func import get_sftp # plugins와 공통으로 연결된 path 덕분에 바로 import 가능함
# Airflow 3.0 부터 아래 경로로 import 합니다.
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
# Airflow 2.10.5 이하 버전에서 실습시 아래 경로에서 import 하세요.
#from airflow.operators.python import PythonOperator
#from airflow import DAG
with DAG(
dag_id="dags_python_import_func",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
task_get_sftp = PythonOperator(
task_id='task_get_sftp',
python_callable=get_sftp
)
- 경로를 적을때 plugins.common 이렇게하면 airflow가 인식이 안됨 (plugins도 path에 들어가있기 때문에)
- 그래서 그대로 common.common~~ 이렇게 적어주면 되지만, 오류가 발생해서 env 파일을 통해 해결을 할려함
WORKSPACE_FOLDER = /Users/minimac/Desktop/study/airflow_log
PYTHONPATH = ${WORKSPACE_FOLDER}/plugins
- .env 파일을 작성해주고 깃허브에는 안올려도 되니깐 .gitignore에도 추가를 해놓자
- 깃허브 올리고 다시 도커있는데 넣어주면 아래와 같이 나타나는 것을 확인할 수 있음
task 데코레이터 사용하기
- 원래의 함수를 wrapping해서 추가 기능을 덧붙이는걸 말함
- 위에처럼 쓰면 번거롭기에 데코레이터를 붙여서 쓰면 좋음
- 데코레이터를 쓰면 아래처럼 효율적으로 사용 가능함
- airflow에도 task 데코레이터를 사용할 수 있음
파이썬 함수 파라미터
def regist(name, univ):
print(name)
print(univ)
regist('min','ku')
- 호출하는 로직에서 몇 개의 파라미터를 넘길지 모를때는 일반적인 방법이 어려움
- 또는, 선택적으로 변수를 받고 싶을 때도..
- 그래서 *args를 사용함
- 기본적인 틀 외에 추가적인 파라미터들은 *args에 추가되어서 넘어가게됨
- args로 들어온 값은 튜플로 저장됨
- 그렇기에 args에서 값을 꺼낼 때는 인덱스를 이용함 (args[0], args[1] ...)
- args라는 이름 외 다른 이름으로 받아도됨 (some_func(*kk)
**kwargs
- kwargs에서는 딕셔너리 형태로 값이 다 저장되게 됨
- args, kwargs를 같이 써도 괜찮음
op_args 사용법
- 즉, op_args = [~~] 이 위치를 통해 값을 args로 전달함
op_kwargs