데이터 수집 및 적재, 데이터 분석 등을 파이프라인으로 구현해주는 에어플로우 사용 방법에 대해 정리해보도록 하겠습니다.
설치
우선 에어플로우 설치를 진행합니다. M1, M2를 사용하는 맥북의 경우 아래 방식으로 설치를 진행합니다.
https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
pip install apache-airflow
# MAC
# 환경에 맞춰 파라미터 변경
# AIRFLOW_VERSION은 latest 가능
pip install "apache-airflow[celery]==2.4.0" --constraint "<https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt>"
# 예시
pip install "apache-airflow[celery]==2.4.0" --constraint "<https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.8.txt>"
실행
설치 완료 후에는 실행을 위한 설정을 진행해야 합니다. airflow 관련 설정은 airflow.cfg에서 확인할 수 있습니다.
# airflow 폴더에서
vi airflow.cfg
airflow 메타데이터를 관리하는 DB를 init합니다.
airflow db init
# rm -rf ./airflow.db
관리자 계정을 생성합니다.
airflow users create \\
--username admin \\
--firstname FIRST_NAME \\
--lastname LAST_NAME \\
--role Admin \\
--email admin@example.org
webserver를 실행합니다.
airflow webserver --port 8080
# 참고(webserver kill)
lsof -i tcp:8080
kill <pid>
# 또는
pkill -f airflow
웹서버에 접속합니다. http://0.0.0.0:8080
참고로 DAG 관련 파일은 /airflow/dags 경로에 있어야 합니다. 작업할 파일을 해당 경로로 이동하거나, 또는 airflow.cfg에서 폴더 위치를 원하는 곳으로 변경합니다.
다른 커맨드 창에서는 스케줄러를 실행하여 에어플로우 스케줄러가 작동할 수 있도록 합니다.
airflow scheduler
이제 준비가 끝났으므로 파이프라인 구성을 배워보도록 하겠습니다.
튜토리얼
에어플로우에서 어떻게 파이프라인을 구성하는지 튜토리얼을 통해 알아보도록 하겠습니다.
우선 임의의 배치 코드를 생성하도록 합시다. 저는 sample_batch.py 로 생성하고, 하기 코드들을 이 파일에 작성하도록 하겠습니다.
모듈 임포트
Airflow 파이프라인에 필요한 라이브러리를 임포트합니다.
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
기본 인수
향후 필요한 작업에 사용할 파라미터 사전을 정의합니다.
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
DAG 선언
Airflow의 핵심 개념인 DAG(Directed Acyclic Graph)를 선언합니다. DAG는 각 작업을 정의하고, 실행 순서, 스케줄 등을 정의하는 역할을 합니다.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args=default_args, # 아까 위에서 선언한 defalt_args
description='A simple tutorial DAG',
schedule=timedelta(days=1), # 스케줄 간격
start_date=datetime(2021, 1, 1), # 시작 시간, catchup = True시 그 사이의 스케줄을 모두 수행
catchup=False,
tags=['example'],
) as dag:
Operators
Operator는 Airflow의 작업 단위를 정의합니다. 일부 다른 경우에는 TaskFlow API를 사용할 수 있습니다.
모든 오퍼레이터는 BaseOperator에서 상속됩니다. 자주 사용되는 오퍼레이터는 PythonOperator, BashOperator 및 KubernetesPodOperator 등이 있습니다.
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html
Tasks
DAG에서 Operator를 사용하려면 Task로 인스턴스화 해야 합니다. 작업은 DAG 컨텍스트 내에서 Operator의 작업을 실행하는 방법을 결정합니다.
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
Task에 대한 우선 순위 규칙은 다음과 같습니다.
- 명시적으로 전달된 인수
- default_args사전 에 존재하는 값
- 연산자의 기본값(있는 경우)
Task는 반드시 task_id 를 지정하거나 owner 로 상속받아야 합니다.
Jinja 템플릿
Task에 Jinja 템플릿을 사용할 수 있습니다.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
Adding Documentation
DAG, Task에 설명 문서를 추가할 수 있습니다.
t1.doc_md = dedent(
"""\\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](<http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png>)
**Image Credit:** Randall Munroe, [XKCD](<https://xkcd.com/license.html>)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
Setting up Dependencies
각 테스크별 의존성을 다음과 같이 설정할 수 있습니다.
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Timezone
표준 라이브러리의 Timezone은 제한 등의 문제가 있어 DAG에서는 사용하지 않습니다.
샘플
위의 내용들을 기반으로 테스트 샘플을 만들었습니다. 저는 sample_batch.py 코드에 아래 내용을 복사하여 dag 폴더 경로로 이동시켰습니다. 기본경로는 /airflow/dags 이며 airflow.cfg 파일에서 경로를 수정할 수 있습니다.
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description='A simple tutorial DAG',
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t1.doc_md = dedent(
"""\\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](<http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png>)
**Image Credit:** Randall Munroe, [XKCD](<https://xkcd.com/license.html>)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
DAG 실행
커맨드 라인에서도 실행할 수 있지만 웹 서버를 활용해 실습해보도록 하겠습니다. 튜토리얼 전 실행 단계에서 웹서버를 키지 않은 경우 다음과 같은 명령어를 사용할 수 있습니다.
airflow webserver --port 8080
이후 http://0.0.0.0:8080 로 들어간 후 아까 설정한 계정으로 접속합니다. 만약 계정을 만들지 않았다면 위의 실행 쪽 문서를 진행해 주시면 됩니다.
상기 이미지는 튜토리얼 파일은 아니고 제가 테스트중인 DAG의 리스트입니다.
웹서버에 로그인하면 아까 올린 DAG py들이 올라온 걸 확인할 수 있습니다. 좌측 빨간 박스의 pause/unpause 키를 눌러 DAG를 on/off할 수 있으며, 우측 Actions의 실행 키를 통해 테스트를 진행 할 수 있습니다.
DAG의 진행 상황은 Runs를 통해, 각 Task의 진행 상황은 Recent Tasks를 통해 확인할 수 있습니다.