패키지 설치
Airflow 작업 결과를 슬랙으로 받아보도록 하겠습니다. 우선 에어플로우가 설치된 가상환경에서 해당 패키지를 설치합니다.
pip install slack_sdk
슬랙에서 APP 생성
https://api.slack.com/ 에 접속하면 앱을 생성할 수 있습니다.
앱 이름은 나중에 변경할 수 있습니다. 사용하고자 하는 워크스페이스를 설정한 후 앱을 생성합니다.
앱을 생성한 후 설정 페이지에서 좌측 중단 Feature - OAuth & Permissions를 클릭합니다.
중단 Scopes에서 chat:write 옵션을 추가합니다. 앱은 해당 옵션을 통해 메세지를 보낼 권한을 얻을 수 있습니다.
이후에는 상단의 OAuth Tokens for Your Workspace에서 Install to Workspace를 클릭합니다.
인스톨 후에 해당 위치에 토큰이 생성된 것을 확인할 수 있습니다.
이제 이 토큰을 사용해 메세지를 보낼 수 있습니다. 그 전에 알림을 받고자 하는 채널에 생성한 앱을 추가해야 합니다. 슬랙의 앱에서 방금 만든 앱을 찾아 들어간 후, 채널에 이 앱 추가 버튼을 통해 원하는 채널에 앱을 추가합니다.
이제 준비가 완료되었습니다.
슬랙 알람 클래스 정의
이제 Airflow가 슬랙으로 알람을 줄 수 있게끔 클래스를 정의합니다. token에는 방금 받았던 정보를 입력합니다. 하기 코드는 입맛에 따라 변경하시면 됩니다. 결과 샘플은 사진과 같습니다.
from slack_sdk import WebClient
from airflow.hooks.base import BaseHook
from datetime import datetime
class SlackAlert:
def __init__(self, channel):
self.channel = channel
# self.token = BaseHook.get_connection('slack').password
self.token = ##################### # 아까 받은 토큰을 넣어주세요
self.client = WebClient(token=self.token)
def failure_alert(self, context):
text = (
f'\n`DAG` : {context.get("task_instance").dag_id}'
f'\n`Task` : {context.get("task_instance").task_id}'
f'\n`Run ID` : {context.get("run_id")}'
f'\n`Date` : {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
)
self.client.chat_postMessage(channel=self.channel,
text='Process Failed :cloud:',
attachments=[{"mrkdwn_in" : ["text"],
"title" : "",
"text" : text,
"actions": [{"type": "button",
"name": "view log",
"text": "View Log",
"url": context.get("task_instance").log_url,
"style": "danger"}],
"color" : "danger"
}])
def success_alert(self, context):
text = (
f'\n`DAG` : {context.get("task_instance").dag_id}'
f'\n`Task` : {context.get("task_instance").task_id}'
f'\n`Run ID` : {context.get("run_id")}'
f'\n`Date` : {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
)
self.client.chat_postMessage(channel=self.channel,
text='Process Success :sunny:',
attachments=[{"mrkdwn_in" : ["text"],
"title" : "",
"text" : text,
"actions": [{"type": "button",
"name": "view log",
"text": "View log",
"url": context.get("task_instance").log_url,
"style": "default"}],
"color" : "good"
}])
def default_alert(self, context):
text = context
self.client.chat_postMessage(channel=self.channel, text=text, attachments=[{"mrkdwn_in" : ["text"],
"title" : "",
"text" : text,
"color": "good"}])
저는 위의 클래스를 dags 폴더 하위에 하나 폴더를 새로 파서 넣고, dag 코드에서 해당 클래스를 불러오게끔 설정했습니다.
DAG 코드에는 하기 구문을 넣습니다.
from datautils.alert import SlackAlert # 본인이 SlackAlert 클래스를 둔 곳을 임포트
slackbot = SlackAlert('#채널') # 받고자 하는 채널(아까 만든 앱이 채널에 있어야 함)
DAG의 default args에서 on_failure_callback을 추가합니다. 예시로 사용한 DAG 전체 코드는 다음과 같습니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from datautils.alert import SlackAlert
slackbot = SlackAlert('#채널') # 원하는 채널을 넣어주세요
with DAG(
"test_alert",
default_args={
'depends_on_past': True,
'email': ['your@email.co.kr'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'on_failure_callback': slackbot.failure_alert,
},
start_date=datetime(2021, 1, 1),
schedule='0/5 0-16,23 * * *',
max_active_runs=3,
max_active_tasks=5,
catchup=False,
tags=['TEST'],
) as dag:
t1 = BashOperator(task_id="success", bash_command="exit 0", dag=dag)
t2 = BashOperator(task_id="task2", depends_on_past=True, bash_command=exit 1, dag=dag)
t1 >> t2
t2에서 반드시 에러가 나오게 설정되어 있으므로 해당 DAG를 돌렸을 시 정상적으로 에러 메세지가 출력되는 걸 확인하실 수 있습니다.
참고(Slack Message 실패 경험담)
추가로 저는 SlackAlert 클래스에 success_alert 등을 추가하여 DAG 성공 시에도 메세지를 보내려 했으나 이유 모를 에러로 계속 실패했습니다. 다른 방법으로 PythonOperator로 메세지를 보내는 방법을 시도했지만 이 방법도 실패했습니다.
이 문제의 경우 BashOperator에서 아예 슬랙 메세지를 보내는 새로운 파이썬 코드를 run하는 방식을 사용하니 다행히도 메세지가 보내지는 걸 확인할 수 있었습니다. 하지만 왜 failure 시에만 메세지가 보내지고, 다른 상황에서 메세지를 보내려면 굳이 파이썬 코드를 하나 따로 만들어야 되는지는 잘 모르겠습니다.