에어플로우에서는 간단하게 파이프라인을 병렬 구조로 구현할 수 있습니다. 예시로 병렬 파이프라인을 하나 만들어 보도록 하겠습니다.
t1 >> [t2, t3]
t2 >> t4
t3 >> t5
task의 간단한 연결 만으로도 파이프라인 구조를 병렬로 구현하는 것을 확인할 수 있습니다. 다만 구조 자체는 병렬이지만, 어떠한 변경 없이는 이 task들은 하기 이미지처럼 순차적으로 처리가 됩니다.
이미지를 보시면 extract\_1
, extract\_2
가 병렬로 실행되어야 할 것 같지만, 1이 끝난 후 처리가 시작되는 것을 확인할 수 있습니다. savedb\_1
은 extract\_2
가 끝나야 실행됩니다.
이 현상이 발생하는 이유는 Executor의 설정 때문입니다.
Executor는 작업 방법을 정하는 역할을 하는데, default로 설정된 Executor가 Sequential Executor이다 보니 순차적으로 처리하게끔 설정이 되어 있습니다. 이 Sequential Executor는 Sqlite에서 사용하는 Executor로써 이를 다른 Executor로 변경하기 위해서는 DB 또한 변경을 진행해야 합니다.
이 게시물에서는 DB를 PostgreSQL로, Executor를 Local Executor로 변경하도록 하겠습니다.
PostgreSQL DB 설치 및 Executor 설정
이 내용은 제 이전 게시물에 적혀 있습니다. 아래 링크는 제 이전 게시물 링크입니다.
https://dyddl1993.tistory.com/54
결과
성공적으로 수행되는 것을 확인하실 수 있습니다.
LocalExecuter 추가 설정(airflow worker -- LocalExecutor)
이렇게 LocalExecuter를 설정하고 에어플로우 웹서버와 스케줄러를 실행한 후 ps -ef
를 통해 확인해면 수십개의 airflow worker -- LocalExecutor 가 대기하고 있는 것을 확인할 수 있습니다.
병렬처리를 위해 미리 대기를 하고 있는 것인데 대기하고 있더라도 일정 자원을 소모하고 있는 것이므로 이 갯수를 조절해야 합니다. CelaryExecuter를 사용할 경우 해결이 가능하다는 문서는 봤는데, 저는 이쪽에 대해서는 잘 모르므로 병렬처리할 프로세스 갯수를 적절하게 제한해야 합니다.
너무 줄이면 병렬처리를 할 프로세스 수가 줄어드므로 적절한 값을 설정해야 합니다. 보통 CPU Core - 1개로 설정한다고 합니다.
방법
airflow 폴더의 airflow.cfg파일에서 parallelism 옵션이 디폴트로 32로 잡혀있는데, 이 갯수를 원하는 갯수만큼 줄입니다. 보통 CPU Core - 1개로 설정한다고 합니다.
저는 8개로 설정하고 airflow worker -- LocalExecutor 수를 확인하도록 하겠습니다.
정확하게 8개로 줄어드는 걸 확인하실 수 있습니다.