1. Introduction
Apache Airflow란??
⇒ Workflow를 author, monitor, schedule하는 플랫폼
- dependencies 고려
- DAG(Directed Acyclic Graphs)의 형태의 코드로 작성
- dynamic, manageable해짐
- 파이썬 기반!
- UI가 편하고 코드 치기 편
Airflow이전의 기존의 scheduling방법들??
ETL이란(data pipe line의 기본적인 구조) ⇒ Extract → Transform → Load
Cron : unix 계열 운영 체제에서 특정 시간에 명령을 실행하는 데 사용할 수 있는 유틸리티, crontab에 의해 구현
Cronjob : 특정 시간에 파일 백업, 보고서 생성 또는 데이터베이스 업데이트와 같은 작업을 실행하는 데 사용 ⇒ 정기적으로 job을 만드는 것
Crontab : 주어진 일정에 주기적으로 실행하도록 규정해놓은 crontab (cron table) 파일
- 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점
- 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점
Airflow는 이걸 어떻게 해결하는가?
Airflow는 DAG 단위로 구성되어있음 (DAG : Directed Acyclic Graph : 방향이 있는 순환되지 않는 그래프) ⇒ workflow를 작은 task들로 구분할 수 있음
<장점>
- Dynamic : Python기반이기 때문에 자유롭게 수정하고 변경하는 등 control을 가지고 있
- Extensible : Airflow는 작업 관리를 위해 필요한 연산자들을 다양하게 보유하지만, 자신의 환경에 맞는 자기만의 연산자를 정의할 수 있음
- Scalable : Module형태 & Queuting System으로 인해, 여러개의 TASK를 진행할 수 있음
- Configurable : 자유롭게 구성 가능함 DAG뿐만 아니라, 핵심 setting들도 변경할 수 있음
- Monitoring이 쉬운 UI & 중앙 관리(track쉬움) & task사이의 dependency handling이 쉬움 & 과거 데이터를 쉽게 재생산 가능 & 오픈 소스라 좋음~
의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 & 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결
기본 용어 정리
DAG
- Loop가 존재하지 않은 단방향 graph
- 각 node는 task를 의미하고, edge는 관련성을 의미함!
어떤 작업들이 병렬적으로 이루어질 수 있는지, 어떤 작업들이 순차적으로 이루어져야하는지를 파악한 뒤에 이에 따라 task를 진행
Operator (Task 정의)
- DAG의 결과로 실제 어떤 것을 해야하는지를 결정
- DAG는 어떻게 task를 수행할 것인지에 대한 것이고, 실제 연산을 하지 않기 때문에 실제 script를 수행하는 것은 operator
- python, aws, slack, bash 등에서의 동작 실행 가능
Task
- Task는 Operator가 실제로 instance화 된 결과
- 실제로 worker가 work를 하는 대상
Workflow
- 의존성(dependencies)로 연결된 task들의 집합
- 사실상 DAG
2. Architecture of Apach Airflow
Architecture of Airflow
METADATA
- 모든 DAG 실행 결과, task instance, 성공, 실패, 수행중 여부,,,,
- DAB, task 등에 대한 모든 현재, 과거 정보를 저장함
- SQL 형태
Web Server/UI
- Frontend / HTTP 유지
- 사용자에게 필요한 다양한 데이터를 예쁘게 보여주는 곳~~ (graph, number,,,)
SCHEDULER
- 핵심 부분, task를 배치하고 실행하는 부분
- DAG에 있는 configuration 분석(task간의 순서 등) ⇒ 필요한 task들을 수행
- Executor에게 task제공해주는 역할
Executor
- Scheduler의 역할은 각 task들이 올바른 시점에 시작할 수 있도록 하는 장치
- 실제로 task 를 시작하고 실행하는 곳은 Executor
- Schedulere와 밀접하게 작동해서 Message Queiuing Process, 예정된 작업을 실행할 worker를 정
- Sequential & Local & Celery & Mesos,.,,등의 executor을 사용 가능
Worker
- 실세로 task를 실행하는 주체
- Executor의 종류에 따라 다양하게 동작함
DAG
- 데이터 엔지니어가 pythoh으로 작성한 workflow
- task들 사이의 dependency를 정의해줌
어떤 Cluster형태를 사용하는지에 따라서 다른 Executor를 사용할 수 있음
Single Node Cluster
모든 components들이 Master Node라는 한개의 Node에 위치하고 있음
이 경우, 외부 resource를 사용할 수 없기 때문에 Master Node의 resource에 의존함
⇒ Local Executor 사용
Multi Node Cluster
DAG의 개수가 많아지고, big data를 다루게 되면, Multi Node Cluster를 사용
components 자체는 동일하지만, Scheduler & Webserver만 Master Node에 존재하고, 나머지는 worder들은 별개의 instance로 존재함
새로운 worker node를 추가하면 되기 때문에 scalability 측면에서 유리함
⇒ “Celery Executor” 사용
한편, 어떤 Executor를 사용하든 Scheduler로부터 받는 task에 따라서 수행하게 됨!!
Scheduler는 DAG로부터 task에 대한 모든 정보를 받고, 그에 따라서 task queue를 생성
⇒ 반드시 Queuing System을 사용!!!
- Local Executor은 내부적으로 queuing system이 존재하지만, multi node인 경우에는 외부 system이 필요함 (RabbitMQ, Redis)
Life Cycle of a Task
DAG가 airflow에 주어졌을 때, task의 lifecycle은 어떻게 이루어지는가???!?!
- Scheduler는 Metadata에서 실행되어야할 DAG들에 대해 계속 monitoring을 진행
- 실행되어야할 DAG를 찾으면, DAG Run 진행 → DAG의 상태를 “running”으로 update
- DAG의 내용에 따라, 어떤 task를 먼저 진행해야할지 선택하고 Queue에 입력 (Task에 대한 정보를 담은 Message) → Task의 상태를 “queued”으로 update
- Queue에 따라서 worker들이 task를 pull! → Task의 상태를 “running”으로 update
- Task가 완료되면 → Task의 상태를 “Success/failed”으로 update
- …..DAG의 모든 Task에 대해 반복….
- DAG의 상태를 “Success/failed”으로 update
3. Installation
<Docker를 사용한 설치 완료>
4. Understanding Directories in Airflow Projects
Compose file
딱히 어려운건 없음!~~ : 필요한 부분들에 대한 docker image,,,,
<DAG File 예시>
결국 apach airflow를 쓴다는건 이 DAG문서를 작성하는 과정임! (자세한건 뒤에서)
5. Airflow’s UI Tour
- Toggle : on/off : DAG이 활성화 되어있는지 여부
- DAG ID (DAG의 Task)
- Schedule : DAG가 실행될 interval time
- 들어가보면 이런거 있음 (running, success, failed,,,,)
- Recent Task
- 최근에 run된 DAG의 task들의 status
- Last Run
- Dag Runs
Tutorial DAG File 실행해보기
- 시작 날짜가 2015년으로 되어있어서 수정해야함
catchup=False 설정하기 : 나중에 설명해줌
- Start date 바꾸기 ⇒ Refresh
DAG 가 실행되는 시점은, 가장 최근 run date + schedule interval time이 지난 직후!
Graph View는 한번의 DAG Run에 대한 log만 볼 수 있다면,
Tree View에서는 여러개의 DAG Run들에 대한 log를 동시에 볼 수 있
Graph
Tree
Gantt
Task Duration : 각 DAG시도에 따른 Task별로 소요 시간
Task Tries : Task가 fail해서 재시도한 경우를 확인할 수 있음 (Untried인 경우에도 포함됨)
DAG File의 구성
우선, DAG definition file은 실제로 데이터가 어떻게 이동되고, 처리되는지에 대한 pipeline이 아님!!
⇒ DAG definition file은 data pipeline들에 대한 setting을 정리한 코드임 (DAG Structure)
- 언제 시작할건지, 언제 종료할 것인지, 재시도는 언제 몇번 할 것인지, task dependencies,,,
1. 필요한 Library import
2. 공통변수 & DAG 속성값 정의
- depends_on_past : 현재 실행되고 있는 DAG가 과거에 성공했던 DAG에 depend할 것인지 여
- retries : 실패했을 경우 몇번 재시도 할 것인지
- retry_delay : retry할 때 얼마나 뒤에 retry할 것인지
3. DAG 정의(속성값 전달)
- schedule_interval : DAG가 실행될 time interval (1 day)
4. Task 정의
- 어떤 작업을 할 것인지에 따라서 Operator가 달라짐 (여기서는 BashOperator)
- PythonOperator, EmailOperator 등
- task_id(UI에 보여지는 이름) - bash_command(실제 수행되는 command) - dag(Dag정의)
- jinja template???
- Jinja Template (Flask 개발에 많이 사용되는 템플릿 엔진)
- 웹 템플릿 엔진 : 웹페이지에 연산 같은 작업을 수행시켜 웹페이지를 편하게 완성시키는 기능 ⇒ html 코드 내에서 파이썬 코드를 작성하고 연산이 가능해짐
- 간단한 문법으로 html에서 {}, {{}} 등의 규칙을 이용하여 python 프로그래밍이 가능
- 데이터를 담은 변수를 return값으로 html과 함께 넘겨준 뒤, html에서 jinja 템플릿 규칙에 따라 화면에 표현할 수 있음
- Jinja 템플릿 html에서 주석은 {# 주석으로 처리할 내용 #}으로 표시
5. Dependencies 정의(순서)
- t2.set_upstream(t1) : t1이 완료된 후에 t2 실행
- downstream은 반대 의미~~
- 혹은 >> << 사용해서 표현하기도
⇒ 다시 UI로 돌아가서 Log를 살펴보면 좀 더 잘 이해할 수 있다
6. Operators
Operator는 workflow상에서 각 task를 정의/설명해주는 부분이고, 실제로 task를 수행하지는 않음
- Operator는 주로 atomic하다 (독립적으로 존재하고 다른 operator과 resource공유 안함)
- Operator는 idempotent하다 ⇒ 매번 같은 결과를 가지고 와야함
- Oprator가 instance화 되면 task로 간주된다
- 다양한 Operator는 Airflow의 BaseOperator Class에서 파생됨
- 만약 operator가 서로 정보를 공유해야하는 경우, XCom을 사용
- Sensor Operator
Ex) 특정 파일이 입력될때까지 수행해서 입력되면 반응 (HdfsSensor, FileSensor,,,)
- Transfer Operator
- Data를 한 곳에서 다른 곳으로 transfer 해줌
Ex) MySqlToHiveTransfer, S3ToReshiftTransfer,,,
- Action Operators (Operators)
- 실제로 다양한 action을 trigger하는 operator
Ex) BashOperator, PythonOperator, BigQueryOperator,,,,
- TaskFlow Operator
- Airflow 2.0.0 신규 기능
- @task 데코레이터를 사용해서 task를 패키지화 할 수 있는 기능
- 간단한 task들을 파이썬으로 직접 작성할 수 있음
8. Airflow CLI
- Airflow 실행
- docker exec -it xxxx bash
- Directory 내 DAG 확인
airflow list_dags
- Meatadata DB 초기화
airflow initdb
- UI를 구성하는 webserver instance 생성
- airflow webserver
- Scheduler Instance 실행
airflow scheduler
- Airflow의 모든 connection 종류
airflow connections
- 특정 DAG의 task를 확인
- airflow list_tasks XX
- 주어진 DAG id의 dag run 확인
- airflow list_dag_runs XXX
- 특정 DAG를 실행
airflow trigger_dag XXX
- 실행중인 DAG 중단/재생
- airflow pause/unpause XXX
- Dependency, data 저장과 관계 없이 특정 task를 실험해보고 싶을 때
airflow test
- 다음 execution 시점
airflow delete_dag XXX
9. Executor
Executor ⇒ Scalabe 함!!! (확장 가능하다~!!)
Airflow의 architecture를 다시 생각해보면, scheduler가 meta data의 DAG, Task를 보고, 이를 통해 queue를 생성
이를 바탕으로 Executor가 실제로 어떤 resource를 가지고, 어떤 worker를
Sequential Executor
- Airflow에서 사용하는 default Executor
- 여러개의 task를 병렬적으로 처리하지 못하고, 한번에 한개씩의 task만 수행
- 실시간 사용보다는 test할 때에 주로 사용됨
- Sequential Executor는 자동적으로 task를 진행해주지 않아서, CLI에서 직접 trigger 해줘야함!! ⇒ 직접하면 compose file의 영향을 받지 않게됨,,??
- 우리는 docker를 통해서 airflow를 사용하기 때문에, 원래는 sequential이 기본이지만 compose file에는 local로 되어있어서 Local Executor로 진행
Local Executor
- 동시에 여러개 task 수행 가능 (locally in parallel)
- 한개의 local worker!!
- MySQL & Postgres 사용
- 저렴하고 resource 소모량이 적음 (제한된 상황에서 사용good), 그만큼 local이라는 한계가 있음
cfg file의 설정
- parallelism : 모든 DAG들에 대해 동시에 진행할 수 있는 최대 task 개수
- dag_concurrency : 각 DAG 내에서 동시에 진행할 수 있는 최대 task 개
Celery Executor
- scale up을 위한 executor ⇒ 여러 개의 worker 사용 가능 (multi-node)
- RabbitMQ, Redis등을 사용해 여러 개의 task사이의 communication을 하는 비동기적 task queue system임
- scheduler과 worker가 별도로 작용됨
- flower : Celery Worker, Task 진행 상황 등을 monitoring하기 위
- scheduler & worker을 독립적으로 작성
추가(오늘의 집 Tech Blog)
버킷플레이스 Airflow 도입기 - 오늘의집 블로그
쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)
전사 구성원들이 사용하는 배치 데이터 플랫폼 만들기 - Airflow Advanced
쏘카에서 Airflow를 정리한건 Airflow 자체보다는 자기네들이 git하고 google cloud platform에서 어떻게 아키텍쳐를 구성하고 활용하는지에 대한 내용 (디테일한 내용들)
오늘의 집에서는 원래 젠킨슨을 썼다고 하더라,, 근데
- 모니터링이 잘 안되고
- 확장성도 별로 안좋고
- 복잡한 파이프라인을 짜기 어렵다 하드라
1. Executor 고민
오늘의 집 같은 기업에서는 당연히 Local이나 Sequential을 사용할리가 없고!!
Celery Executor하고 Kubernetes Executor 사이에서 고민을 했다고 함
1-1. Celery Executor
1-2. Kubernetes Executor
- Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow worker를 pod 형태로 실행하는 방식
- Worker에 대한 유지 보수가 필요없고, Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용할 수 있음
- 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점
Pod가 무엇인가?
Kubernetes 구조
- 클러스터 전체를 관리하는 master + 컨테이너가 배포되는 Node(Worker Node)
- Master에서 적절한 Worker Node에 컨테이너를 할당하고 운영하는 구조
Kubernetes의 가장 기본적인 배포 단위로 kubernetes는 컨테이너(docker)를 하나씩 배포하는게 아니라, Pod안에 컨테이너를 탑재한 상태로 배포한다 (1개 이상의 컨테이너 탑재 가능)
- 장점1. Pod를 사용하면 통신 용이성이 향상되고(컨테이너끼리 local host로 호출 가능해짐)
- 장점 2. Pod 내의 컨테이너끼리는 디스크 볼륨을 공유할 수 있음
KubernetesPodOperator
- 쿠버네티스 환경에서 Airflow Worker가 지정한 이미지로 새로운 pod을 생성하여 Task를 실행
- Airflow Worker ⇒ Kubernetes Pod 생성 이 순서이므로, Kubernetes API를 모두 사용하여 Pod 실행을 조절할 수 있음
- 해당 워커 POD에서 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행하므로, Airflow 환경에서도 다양한 서비스에 접근할 수 있는 자유로운 환경 생
한편, 이런식으로 필요할때만 Kubernetes를 사용하는, KubernetesExecutor & KubernetesPodOperator를 통해 사용하는 방식이 있는가 하면,
그냥 kubernets 환경 위에 airflow를 구축하는 방식도 있다.
- 구성이 간단하고 템플릿화하기 쉽다는 장점이 있지만,
- 구성이 Pod 형태로 바뀌었을 뿐, 자원을 계속 점유해야하거나, 규모가 커질 경우 유지보수가 어려워진다는 단점은 그대로 있다
그 내용은 아래 링크에 자세히 나와있는데, 모르는 내용이 많아서 제대로 정리하지 못했다..
Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 2
2. DAG 작성
2-1. 시간
start_date & schedule_interval & execution_date
- Airflow에서 Start Date는 실제로 DAG가 실행되기 시작하는 날짜가 아니라, schedule이 시작되는 날짜
- 따라서, 1/1부터 하루에 한번씩 스케줄이 등록되므로, 1/2부터 execution_date가 1/1인 DAG가 실행된다
- execution_date가 1/1인 task는 1/1의 데이터를 가지고 Task를 수행해야하기 때문에 실제 실행되는 것은 1/2부터라고 이해할 수 있
2-2. Backfill & Catchup
- Start_date를 과거로 설정해놓으면, 과거의 Task를 차례로 실행하는 Backfill이 실행된다
- 근데 이를 원치 않을 경우 Catchup=False로 하면 Backfill을 실행하지 않음!
2-4. DRT(Dont Repeat Yourself) DAG 작성
별건 아니고, 작성한 Task들이 반복적인 경우가 있을 경우, 여러번 작성하지 말고 for 문을 사용해서 Task 생성할 수 있음
Apache Airflow란??
⇒ Workflow를 author, monitor, schedule하는 플랫폼
- dependencies 고려
- DAG(Directed Acyclic Graphs)의 형태의 코드로 작성
- dynamic, manageable해짐
- 파이썬 기반!
- UI가 편하고 코드 치기 편
Airflow이전의 기존의 scheduling방법들??
ETL이란(data pipe line의 기본적인 구조) ⇒ Extract → Transform → Load
Cron : unix 계열 운영 체제에서 특정 시간에 명령을 실행하는 데 사용할 수 있는 유틸리티, crontab에 의해 구현
Cronjob : 특정 시간에 파일 백업, 보고서 생성 또는 데이터베이스 업데이트와 같은 작업을 실행하는 데 사용 ⇒ 정기적으로 job을 만드는 것
Crontab : 주어진 일정에 주기적으로 실행하도록 규정해놓은 crontab (cron table) 파일
- 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점
- 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점
Airflow는 이걸 어떻게 해결하는가?
Airflow는 DAG 단위로 구성되어있음 (DAG : Directed Acyclic Graph : 방향이 있는 순환되지 않는 그래프) ⇒ workflow를 작은 task들로 구분할 수 있음
<장점>
- Dynamic : Python기반이기 때문에 자유롭게 수정하고 변경하는 등 control을 가지고 있
- Extensible : Airflow는 작업 관리를 위해 필요한 연산자들을 다양하게 보유하지만, 자신의 환경에 맞는 자기만의 연산자를 정의할 수 있음
- Scalable : Module형태 & Queuting System으로 인해, 여러개의 TASK를 진행할 수 있음
- Configurable : 자유롭게 구성 가능함 DAG뿐만 아니라, 핵심 setting들도 변경할 수 있음
- Monitoring이 쉬운 UI & 중앙 관리(track쉬움) & task사이의 dependency handling이 쉬움 & 과거 데이터를 쉽게 재생산 가능 & 오픈 소스라 좋음~
의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 & 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결
기본 용어 정리
DAG
- Loop가 존재하지 않은 단방향 graph
- 각 node는 task를 의미하고, edge는 관련성을 의미함!
어떤 작업들이 병렬적으로 이루어질 수 있는지, 어떤 작업들이 순차적으로 이루어져야하는지를 파악한 뒤에 이에 따라 task를 진행
Operator (Task 정의)
- DAG의 결과로 실제 어떤 것을 해야하는지를 결정
- DAG는 어떻게 task를 수행할 것인지에 대한 것이고, 실제 연산을 하지 않기 때문에 실제 script를 수행하는 것은 operator
- python, aws, slack, bash 등에서의 동작 실행 가능
Task
- Task는 Operator가 실제로 instance화 된 결과
- 실제로 worker가 work를 하는 대상
Workflow
- 의존성(dependencies)로 연결된 task들의 집합
- 사실상 DAG
2. Architecture of Apach Airflow
Architecture of Airflow
METADATA
- 모든 DAG 실행 결과, task instance, 성공, 실패, 수행중 여부,,,,
- DAB, task 등에 대한 모든 현재, 과거 정보를 저장함
- SQL 형태
Web Server/UI
- Frontend / HTTP 유지
- 사용자에게 필요한 다양한 데이터를 예쁘게 보여주는 곳~~ (graph, number,,,)
SCHEDULER
- 핵심 부분, task를 배치하고 실행하는 부분
- DAG에 있는 configuration 분석(task간의 순서 등) ⇒ 필요한 task들을 수행
- Executor에게 task제공해주는 역할
Executor
- Scheduler의 역할은 각 task들이 올바른 시점에 시작할 수 있도록 하는 장치
- 실제로 task 를 시작하고 실행하는 곳은 Executor
- Schedulere와 밀접하게 작동해서 Message Queiuing Process, 예정된 작업을 실행할 worker를 정
- Sequential & Local & Celery & Mesos,.,,등의 executor을 사용 가능
Worker
- 실세로 task를 실행하는 주체
- Executor의 종류에 따라 다양하게 동작함
DAG
- 데이터 엔지니어가 pythoh으로 작성한 workflow
- task들 사이의 dependency를 정의해줌
어떤 Cluster형태를 사용하는지에 따라서 다른 Executor를 사용할 수 있음
Single Node Cluster
모든 components들이 Master Node라는 한개의 Node에 위치하고 있음
이 경우, 외부 resource를 사용할 수 없기 때문에 Master Node의 resource에 의존함
⇒ Local Executor 사용
Multi Node Cluster
DAG의 개수가 많아지고, big data를 다루게 되면, Multi Node Cluster를 사용
components 자체는 동일하지만, Scheduler & Webserver만 Master Node에 존재하고, 나머지는 worder들은 별개의 instance로 존재함
새로운 worker node를 추가하면 되기 때문에 scalability 측면에서 유리함
⇒ “Celery Executor” 사용
한편, 어떤 Executor를 사용하든 Scheduler로부터 받는 task에 따라서 수행하게 됨!!
Scheduler는 DAG로부터 task에 대한 모든 정보를 받고, 그에 따라서 task queue를 생성
⇒ 반드시 Queuing System을 사용!!!
- Local Executor은 내부적으로 queuing system이 존재하지만, multi node인 경우에는 외부 system이 필요함 (RabbitMQ, Redis)
Life Cycle of a Task
DAG가 airflow에 주어졌을 때, task의 lifecycle은 어떻게 이루어지는가???!?!
- Scheduler는 Metadata에서 실행되어야할 DAG들에 대해 계속 monitoring을 진행
- 실행되어야할 DAG를 찾으면, DAG Run 진행 → DAG의 상태를 “running”으로 update
- DAG의 내용에 따라, 어떤 task를 먼저 진행해야할지 선택하고 Queue에 입력 (Task에 대한 정보를 담은 Message) → Task의 상태를 “queued”으로 update
- Queue에 따라서 worker들이 task를 pull! → Task의 상태를 “running”으로 update
- Task가 완료되면 → Task의 상태를 “Success/failed”으로 update
- …..DAG의 모든 Task에 대해 반복….
- DAG의 상태를 “Success/failed”으로 update
3. Installation
<Docker를 사용한 설치 완료>
4. Understanding Directories in Airflow Projects
Compose file
딱히 어려운건 없음!~~ : 필요한 부분들에 대한 docker image,,,,
<DAG File 예시>
결국 apach airflow를 쓴다는건 이 DAG문서를 작성하는 과정임! (자세한건 뒤에서)
5. Airflow’s UI Tour
- Toggle : on/off : DAG이 활성화 되어있는지 여부
- DAG ID (DAG의 Task)
- Schedule : DAG가 실행될 interval time
- 들어가보면 이런거 있음 (running, success, failed,,,,)
- Recent Task
- 최근에 run된 DAG의 task들의 status
- Last Run
- Dag Runs
Tutorial DAG File 실행해보기
- 시작 날짜가 2015년으로 되어있어서 수정해야함
catchup=False 설정하기 : 나중에 설명해줌
- Start date 바꾸기 ⇒ Refresh
DAG 가 실행되는 시점은, 가장 최근 run date + schedule interval time이 지난 직후!
Graph View는 한번의 DAG Run에 대한 log만 볼 수 있다면,
Tree View에서는 여러개의 DAG Run들에 대한 log를 동시에 볼 수 있
Graph
Tree
Gantt
Task Duration : 각 DAG시도에 따른 Task별로 소요 시간
Task Tries : Task가 fail해서 재시도한 경우를 확인할 수 있음 (Untried인 경우에도 포함됨)
DAG File의 구성
우선, DAG definition file은 실제로 데이터가 어떻게 이동되고, 처리되는지에 대한 pipeline이 아님!!
⇒ DAG definition file은 data pipeline들에 대한 setting을 정리한 코드임 (DAG Structure)
- 언제 시작할건지, 언제 종료할 것인지, 재시도는 언제 몇번 할 것인지, task dependencies,,,
1. 필요한 Library import
2. 공통변수 & DAG 속성값 정의
- depends_on_past : 현재 실행되고 있는 DAG가 과거에 성공했던 DAG에 depend할 것인지 여
- retries : 실패했을 경우 몇번 재시도 할 것인지
- retry_delay : retry할 때 얼마나 뒤에 retry할 것인지
3. DAG 정의(속성값 전달)
- schedule_interval : DAG가 실행될 time interval (1 day)
4. Task 정의
- 어떤 작업을 할 것인지에 따라서 Operator가 달라짐 (여기서는 BashOperator)
- PythonOperator, EmailOperator 등
- task_id(UI에 보여지는 이름) - bash_command(실제 수행되는 command) - dag(Dag정의)
- jinja template???
- Jinja Template (Flask 개발에 많이 사용되는 템플릿 엔진)
- 웹 템플릿 엔진 : 웹페이지에 연산 같은 작업을 수행시켜 웹페이지를 편하게 완성시키는 기능 ⇒ html 코드 내에서 파이썬 코드를 작성하고 연산이 가능해짐
- 간단한 문법으로 html에서 {}, {{}} 등의 규칙을 이용하여 python 프로그래밍이 가능
- 데이터를 담은 변수를 return값으로 html과 함께 넘겨준 뒤, html에서 jinja 템플릿 규칙에 따라 화면에 표현할 수 있음
- Jinja 템플릿 html에서 주석은 {# 주석으로 처리할 내용 #}으로 표시
5. Dependencies 정의(순서)
- t2.set_upstream(t1) : t1이 완료된 후에 t2 실행
- downstream은 반대 의미~~
- 혹은 >> << 사용해서 표현하기도
⇒ 다시 UI로 돌아가서 Log를 살펴보면 좀 더 잘 이해할 수 있다
6. Operators
Operator는 workflow상에서 각 task를 정의/설명해주는 부분이고, 실제로 task를 수행하지는 않음
- Operator는 주로 atomic하다 (독립적으로 존재하고 다른 operator과 resource공유 안함)
- Operator는 idempotent하다 ⇒ 매번 같은 결과를 가지고 와야함
- Oprator가 instance화 되면 task로 간주된다
- 다양한 Operator는 Airflow의 BaseOperator Class에서 파생됨
- 만약 operator가 서로 정보를 공유해야하는 경우, XCom을 사용
- Sensor Operator
Ex) 특정 파일이 입력될때까지 수행해서 입력되면 반응 (HdfsSensor, FileSensor,,,)
- Transfer Operator
- Data를 한 곳에서 다른 곳으로 transfer 해줌
Ex) MySqlToHiveTransfer, S3ToReshiftTransfer,,,
- Action Operators (Operators)
- 실제로 다양한 action을 trigger하는 operator
Ex) BashOperator, PythonOperator, BigQueryOperator,,,,
- TaskFlow Operator
- Airflow 2.0.0 신규 기능
- @task 데코레이터를 사용해서 task를 패키지화 할 수 있는 기능
- 간단한 task들을 파이썬으로 직접 작성할 수 있음
8. Airflow CLI
- Airflow 실행
- docker exec -it xxxx bash
- Directory 내 DAG 확인
airflow list_dags
- Meatadata DB 초기화
airflow initdb
- UI를 구성하는 webserver instance 생성
- airflow webserver
- Scheduler Instance 실행
airflow scheduler
- Airflow의 모든 connection 종류
airflow connections
- 특정 DAG의 task를 확인
- airflow list_tasks XX
- 주어진 DAG id의 dag run 확인
- airflow list_dag_runs XXX
- 특정 DAG를 실행
airflow trigger_dag XXX
- 실행중인 DAG 중단/재생
- airflow pause/unpause XXX
- Dependency, data 저장과 관계 없이 특정 task를 실험해보고 싶을 때
airflow test
- 다음 execution 시점
airflow delete_dag XXX
9. Executor
Executor ⇒ Scalabe 함!!! (확장 가능하다~!!)
Airflow의 architecture를 다시 생각해보면, scheduler가 meta data의 DAG, Task를 보고, 이를 통해 queue를 생성
이를 바탕으로 Executor가 실제로 어떤 resource를 가지고, 어떤 worker를
Sequential Executor
- Airflow에서 사용하는 default Executor
- 여러개의 task를 병렬적으로 처리하지 못하고, 한번에 한개씩의 task만 수행
- 실시간 사용보다는 test할 때에 주로 사용됨
- Sequential Executor는 자동적으로 task를 진행해주지 않아서, CLI에서 직접 trigger 해줘야함!! ⇒ 직접하면 compose file의 영향을 받지 않게됨,,??
- 우리는 docker를 통해서 airflow를 사용하기 때문에, 원래는 sequential이 기본이지만 compose file에는 local로 되어있어서 Local Executor로 진행
Local Executor
- 동시에 여러개 task 수행 가능 (locally in parallel)
- 한개의 local worker!!
- MySQL & Postgres 사용
- 저렴하고 resource 소모량이 적음 (제한된 상황에서 사용good), 그만큼 local이라는 한계가 있음
cfg file의 설정
- parallelism : 모든 DAG들에 대해 동시에 진행할 수 있는 최대 task 개수
- dag_concurrency : 각 DAG 내에서 동시에 진행할 수 있는 최대 task 개
Celery Executor
- scale up을 위한 executor ⇒ 여러 개의 worker 사용 가능 (multi-node)
- RabbitMQ, Redis등을 사용해 여러 개의 task사이의 communication을 하는 비동기적 task queue system임
- scheduler과 worker가 별도로 작용됨
- flower : Celery Worker, Task 진행 상황 등을 monitoring하기 위
- scheduler & worker을 독립적으로 작성
추가(오늘의 집 Tech Blog)
버킷플레이스 Airflow 도입기 - 오늘의집 블로그
쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)
전사 구성원들이 사용하는 배치 데이터 플랫폼 만들기 - Airflow Advanced
쏘카에서 Airflow를 정리한건 Airflow 자체보다는 자기네들이 git하고 google cloud platform에서 어떻게 아키텍쳐를 구성하고 활용하는지에 대한 내용 (디테일한 내용들)
오늘의 집에서는 원래 젠킨슨을 썼다고 하더라,, 근데
- 모니터링이 잘 안되고
- 확장성도 별로 안좋고
- 복잡한 파이프라인을 짜기 어렵다 하드라
1. Executor 고민
오늘의 집 같은 기업에서는 당연히 Local이나 Sequential을 사용할리가 없고!!
Celery Executor하고 Kubernetes Executor 사이에서 고민을 했다고 함
1-1. Celery Executor
1-2. Kubernetes Executor
- Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow worker를 pod 형태로 실행하는 방식
- Worker에 대한 유지 보수가 필요없고, Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용할 수 있음
- 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점
Pod가 무엇인가?
Kubernetes 구조
- 클러스터 전체를 관리하는 master + 컨테이너가 배포되는 Node(Worker Node)
- Master에서 적절
댓글 영역