상세 컨텐츠

본문 제목

[15차시 / 15기 최경석] Apache Airflow-1

본문

1. Introduction

Apache Airflow란??

⇒ Workflow를 author, monitor, schedule하는 플랫폼

  • dependencies 고려
  • DAG(Directed Acyclic Graphs)의 형태의 코드로 작성
  • dynamic, manageable해짐
  • 파이썬 기반!
  • UI가 편하고 코드 치기 편

Airflow이전의 기존의 scheduling방법들??

ETL이란(data pipe line의 기본적인 구조) ⇒ Extract → Transform → Load

Cron : unix 계열 운영 체제에서 특정 시간에 명령을 실행하는 데 사용할 수 있는 유틸리티, crontab에 의해 구현

Cronjob : 특정 시간에 파일 백업, 보고서 생성 또는 데이터베이스 업데이트와 같은 작업을 실행하는 데 사용 ⇒ 정기적으로 job을 만드는 것

Crontab : 주어진 일정에 주기적으로 실행하도록 규정해놓은 crontab (cron table) 파일

  • 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점
  • 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점

Airflow는 이걸 어떻게 해결하는가?

Airflow는 DAG 단위로 구성되어있음 (DAG : Directed Acyclic Graph : 방향이 있는 순환되지 않는 그래프) ⇒ workflow를 작은 task들로 구분할 수 있음

<장점>

  • Dynamic : Python기반이기 때문에 자유롭게 수정하고 변경하는 등 control을 가지고 있
  • Extensible : Airflow는 작업 관리를 위해 필요한 연산자들을 다양하게 보유하지만, 자신의 환경에 맞는 자기만의 연산자를 정의할 수 있음
  • Scalable : Module형태 & Queuting System으로 인해, 여러개의 TASK를 진행할 수 있음
  • Configurable : 자유롭게 구성 가능함 DAG뿐만 아니라, 핵심 setting들도 변경할 수 있음
  • Monitoring이 쉬운 UI & 중앙 관리(track쉬움) & task사이의 dependency handling이 쉬움 & 과거 데이터를 쉽게 재생산 가능 & 오픈 소스라 좋음~

의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 & 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결

기본 용어 정리

DAG

  • Loop가 존재하지 않은 단방향 graph
  • nodetask를 의미하고, edge관련성을 의미함!

어떤 작업들이 병렬적으로 이루어질 수 있는지, 어떤 작업들이 순차적으로 이루어져야하는지를 파악한 뒤에 이에 따라 task를 진행

Operator (Task 정의)

  • DAG의 결과로 실제 어떤 것을 해야하는지를 결정
  • DAG는 어떻게 task를 수행할 것인지에 대한 것이고, 실제 연산을 하지 않기 때문에 실제 script를 수행하는 것은 operator
  • python, aws, slack, bash 등에서의 동작 실행 가능

Task

  • Task는 Operator가 실제로 instance화 된 결과
  • 실제로 worker가 work를 하는 대상

Workflow

  • 의존성(dependencies)로 연결된 task들의 집합
  • 사실상 DAG

2. Architecture of Apach Airflow

Architecture of Airflow

METADATA

  • 모든 DAG 실행 결과, task instance, 성공, 실패, 수행중 여부,,,,
  • DAB, task 등에 대한 모든 현재, 과거 정보를 저장함
  • SQL 형태

Web Server/UI

  • Frontend / HTTP 유지
  • 사용자에게 필요한 다양한 데이터를 예쁘게 보여주는 곳~~ (graph, number,,,)

SCHEDULER

  • 핵심 부분, task를 배치하고 실행하는 부분
  • DAG에 있는 configuration 분석(task간의 순서 등) ⇒ 필요한 task들을 수행
  • Executor에게 task제공해주는 역할

Executor

  • Scheduler의 역할은 각 task들이 올바른 시점에 시작할 수 있도록 하는 장치
  • 실제로 task 를 시작하고 실행하는 곳은 Executor
  • Schedulere와 밀접하게 작동해서 Message Queiuing Process, 예정된 작업을 실행할 worker를 정
  • Sequential & Local & Celery & Mesos,.,,등의 executor을 사용 가능

Worker

  • 실세로 task를 실행하는 주체
  • Executor의 종류에 따라 다양하게 동작함

DAG

  • 데이터 엔지니어가 pythoh으로 작성한 workflow
  • task들 사이의 dependency를 정의해줌

어떤 Cluster형태를 사용하는지에 따라서 다른 Executor를 사용할 수 있음

Single Node Cluster

모든 components들이 Master Node라는 한개의 Node에 위치하고 있음

이 경우, 외부 resource를 사용할 수 없기 때문에 Master Node의 resource에 의존함

⇒ Local Executor 사용

Multi Node Cluster

DAG의 개수가 많아지고, big data를 다루게 되면, Multi Node Cluster를 사용

components 자체는 동일하지만, Scheduler & Webserver만 Master Node에 존재하고, 나머지는 worder들은 별개의 instance로 존재함

새로운 worker node를 추가하면 되기 때문에 scalability 측면에서 유리함

⇒ “Celery Executor” 사용

한편, 어떤 Executor를 사용하든 Scheduler로부터 받는 task에 따라서 수행하게 됨!!

Scheduler는 DAG로부터 task에 대한 모든 정보를 받고, 그에 따라서 task queue를 생성

⇒ 반드시 Queuing System을 사용!!!

  • Local Executor은 내부적으로 queuing system이 존재하지만, multi node인 경우에는 외부 system이 필요함 (RabbitMQ, Redis)

Life Cycle of a Task

DAG가 airflow에 주어졌을 때, task의 lifecycle은 어떻게 이루어지는가???!?!

  • Scheduler는 Metadata에서 실행되어야할 DAG들에 대해 계속 monitoring을 진행
  • 실행되어야할 DAG를 찾으면, DAG Run 진행 → DAG의 상태를 “running”으로 update
  • DAG의 내용에 따라, 어떤 task를 먼저 진행해야할지 선택하고 Queue에 입력 (Task에 대한 정보를 담은 Message) → Task의 상태를 “queued”으로 update
  • Queue에 따라서 worker들이 task를 pull! → Task의 상태를 “running”으로 update
  • Task가 완료되면 → Task의 상태를 “Success/failed”으로 update
  • …..DAG의 모든 Task에 대해 반복….
  • DAG의 상태를 “Success/failed”으로 update

3. Installation

<Docker를 사용한 설치 완료>

4. Understanding Directories in Airflow Projects

Compose file

딱히 어려운건 없음!~~ : 필요한 부분들에 대한 docker image,,,,

<DAG File 예시>

결국 apach airflow를 쓴다는건 이 DAG문서를 작성하는 과정임! (자세한건 뒤에서)

5. Airflow’s UI Tour

  • Toggle : on/off : DAG이 활성화 되어있는지 여부
  • DAG ID (DAG의 Task)
  • Schedule : DAG가 실행될 interval time
    • 들어가보면 이런거 있음 (running, success, failed,,,,)
  • Recent Task
    • 최근에 run된 DAG의 task들의 status
  • Last Run
  • Dag Runs
    • 최근에 run된 DAG 자체의 status

Tutorial DAG File 실행해보기

  1. 시작 날짜가 2015년으로 되어있어서 수정해야함
    • catchup=False 설정하기 : 나중에 설명해줌
    • Start date 바꾸기 ⇒ Refresh

DAG 가 실행되는 시점은, 가장 최근 run date + schedule interval time이 지난 직후!

Graph View는 한번의 DAG Run에 대한 log만 볼 수 있다면,

Tree View에서는 여러개의 DAG Run들에 대한 log를 동시에 볼 수 있

Graph

Tree

Gantt

Task Duration : 각 DAG시도에 따른 Task별로 소요 시간

Task Tries : Task가 fail해서 재시도한 경우를 확인할 수 있음 (Untried인 경우에도 포함됨)

DAG File의 구성

우선, DAG definition file은 실제로 데이터가 어떻게 이동되고, 처리되는지에 대한 pipeline이 아님!!

⇒ DAG definition file은 data pipeline들에 대한 setting을 정리한 코드임 (DAG Structure)

  • 언제 시작할건지, 언제 종료할 것인지, 재시도는 언제 몇번 할 것인지, task dependencies,,,

1. 필요한 Library import

2. 공통변수 & DAG 속성값 정의

  • depends_on_past : 현재 실행되고 있는 DAG가 과거에 성공했던 DAG에 depend할 것인지 여
  • retries : 실패했을 경우 몇번 재시도 할 것인지
  • retry_delay : retry할 때 얼마나 뒤에 retry할 것인지

3. DAG 정의(속성값 전달)

  • schedule_interval : DAG가 실행될 time interval (1 day)

4. Task 정의

  • 어떤 작업을 할 것인지에 따라서 Operator가 달라짐 (여기서는 BashOperator)
    • PythonOperator, EmailOperator 등
  • task_id(UI에 보여지는 이름) - bash_command(실제 수행되는 command) - dag(Dag정의)
    • 다른 변수들을 override 가
  • jinja template???
  • Jinja Template (Flask 개발에 많이 사용되는 템플릿 엔진)
    • 웹 템플릿 엔진 : 웹페이지에 연산 같은 작업을 수행시켜 웹페이지를 편하게 완성시키는 기능 ⇒ html 코드 내에서 파이썬 코드를 작성하고 연산이 가능해짐
      • 간단한 문법으로 html에서 {}, {{}} 등의 규칙을 이용하여 python 프로그래밍이 가능
      • 데이터를 담은 변수를 return값으로 html과 함께 넘겨준 뒤, html에서 jinja 템플릿 규칙에 따라 화면에 표현할 수 있음
      • Jinja 템플릿 html에서 주석은 {# 주석으로 처리할 내용 #}으로 표시

5. Dependencies 정의(순서)

  • Upstream & Downstream있음
  • t2.set_upstream(t1) : t1이 완료된 후에 t2 실행
  • downstream은 반대 의미~~
  • 혹은 >> << 사용해서 표현하기도

⇒ 다시 UI로 돌아가서 Log를 살펴보면 좀 더 잘 이해할 수 있다

6. Operators

Operator는 workflow상에서 각 task를 정의/설명해주는 부분이고, 실제로 task를 수행하지는 않음

  • Operator는 주로 atomic하다 (독립적으로 존재하고 다른 operator과 resource공유 안함)
  • Operator는 idempotent하다 ⇒ 매번 같은 결과를 가지고 와야함
  • Oprator가 instance화 되면 task로 간주된다
  • 다양한 Operator는 Airflow의 BaseOperator Class에서 파생됨
  • 만약 operator가 서로 정보를 공유해야하는 경우, XCom을 사용
  1. Sensor Operator
  • 특정 조건이 만족될 때까지 반복적으로 수행

Ex) 특정 파일이 입력될때까지 수행해서 입력되면 반응 (HdfsSensor, FileSensor,,,)

  1. Transfer Operator
  • Data를 한 곳에서 다른 곳으로 transfer 해줌

Ex) MySqlToHiveTransfer, S3ToReshiftTransfer,,,

  1. Action Operators (Operators)
  • 실제로 다양한 action을 trigger하는 operator

Ex) BashOperator, PythonOperator, BigQueryOperator,,,,

  1. TaskFlow Operator
  • Airflow 2.0.0 신규 기능
  • @task 데코레이터를 사용해서 task를 패키지화 할 수 있는 기능
  • 간단한 task들을 파이썬으로 직접 작성할 수 있음

8. Airflow CLI

  1. Airflow 실행
  2. docker exec -it xxxx bash
  3. Directory 내 DAG 확인
airflow list_dags
  1. Meatadata DB 초기화
airflow initdb
  1. UI를 구성하는 webserver instance 생성
  2. airflow webserver
  3. Scheduler Instance 실행
airflow scheduler
  1. Airflow의 모든 connection 종류
airflow connections
  1. 특정 DAG의 task를 확인
  2. airflow list_tasks XX
  3. 주어진 DAG id의 dag run 확인
  4. airflow list_dag_runs XXX
  5. 특정 DAG를 실행
airflow trigger_dag XXX
  1. 실행중인 DAG 중단/재생
  2. airflow pause/unpause XXX
  3. Dependency, data 저장과 관계 없이 특정 task를 실험해보고 싶을 때
airflow test
  1. 다음 execution 시점
airflow delete_dag XXX

9. Executor

Executor ⇒ Scalabe 함!!! (확장 가능하다~!!)

Airflow의 architecture를 다시 생각해보면, scheduler가 meta data의 DAG, Task를 보고, 이를 통해 queue를 생성

이를 바탕으로 Executor가 실제로 어떤 resource를 가지고, 어떤 worker를

Sequential Executor

  • Airflow에서 사용하는 default Executor
  • 여러개의 task를 병렬적으로 처리하지 못하고, 한번에 한개씩의 task만 수행
  • 실시간 사용보다는 test할 때에 주로 사용됨
  • Sequential Executor는 자동적으로 task를 진행해주지 않아서, CLI에서 직접 trigger 해줘야함!! ⇒ 직접하면 compose file의 영향을 받지 않게됨,,??
  • 우리는 docker를 통해서 airflow를 사용하기 때문에, 원래는 sequential이 기본이지만 compose file에는 local로 되어있어서 Local Executor로 진행

Local Executor

  • 동시에 여러개 task 수행 가능 (locally in parallel)
  • 한개의 local worker!!
  • MySQL & Postgres 사용
  • 저렴하고 resource 소모량이 적음 (제한된 상황에서 사용good), 그만큼 local이라는 한계가 있음

cfg file의 설정

  • parallelism : 모든 DAG들에 대해 동시에 진행할 수 있는 최대 task 개수
  • dag_concurrency : 각 DAG 내에서 동시에 진행할 수 있는 최대 task 개

Celery Executor

  • scale up을 위한 executor ⇒ 여러 개의 worker 사용 가능 (multi-node)
  • RabbitMQ, Redis등을 사용해 여러 개의 task사이의 communication을 하는 비동기적 task queue system임
  • scheduler과 worker가 별도로 작용됨
  • flower : Celery Worker, Task 진행 상황 등을 monitoring하기 위
  • scheduler & worker을 독립적으로 작성

추가(오늘의 집 Tech Blog)

버킷플레이스 Airflow 도입기 - 오늘의집 블로그

쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)

전사 구성원들이 사용하는 배치 데이터 플랫폼 만들기 - Airflow Advanced

쏘카에서 Airflow를 정리한건 Airflow 자체보다는 자기네들이 git하고 google cloud platform에서 어떻게 아키텍쳐를 구성하고 활용하는지에 대한 내용 (디테일한 내용들)

오늘의 집에서는 원래 젠킨슨을 썼다고 하더라,, 근데

  • 모니터링이 잘 안되고
  • 확장성도 별로 안좋고
  • 복잡한 파이프라인을 짜기 어렵다 하드라

1. Executor 고민

오늘의 집 같은 기업에서는 당연히 Local이나 Sequential을 사용할리가 없고!!

Celery Executor하고 Kubernetes Executor 사이에서 고민을 했다고 함

1-1. Celery Executor

  • Celery Executor는 Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식
  • Worker 수를 스케일아웃 할 수 있다는 장점이 있지만, 메시지 브로커를 따로 관리해야하고 워커 프로세스에 대한 모니터링도 필요하다는 단점
  • 메시지 브로커는 task 가 적재되는 task queue를 의미한다 (Redis,,,)
  • Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 1
  • Local Executor와 Celery Executor의 자세한 설명과 코드를 보고 싶다면 아래 링크를 참고하면 된다

1-2. Kubernetes Executor

  • Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow worker를 pod 형태로 실행하는 방식
  • Worker에 대한 유지 보수가 필요없고, Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용할 수 있음
  • 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점

Pod가 무엇인가?

Kubernetes 구조

  • 클러스터 전체를 관리하는 master + 컨테이너가 배포되는 Node(Worker Node)
  • Master에서 적절한 Worker Node에 컨테이너를 할당하고 운영하는 구조

Kubernetes의 가장 기본적인 배포 단위로 kubernetes는 컨테이너(docker)를 하나씩 배포하는게 아니라, Pod안에 컨테이너를 탑재한 상태로 배포한다 (1개 이상의 컨테이너 탑재 가능)

  • 장점1. Pod를 사용하면 통신 용이성이 향상되고(컨테이너끼리 local host로 호출 가능해짐)
  • 장점 2. Pod 내의 컨테이너끼리는 디스크 볼륨을 공유할 수 있음

KubernetesPodOperator

  • 쿠버네티스 환경에서 Airflow Worker가 지정한 이미지로 새로운 pod을 생성하여 Task를 실행
  • Airflow Worker ⇒ Kubernetes Pod 생성 이 순서이므로, Kubernetes API를 모두 사용하여 Pod 실행을 조절할 수 있음
  • 해당 워커 POD에서 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행하므로, Airflow 환경에서도 다양한 서비스에 접근할 수 있는 자유로운 환경 생

한편, 이런식으로 필요할때만 Kubernetes를 사용하는, KubernetesExecutor & KubernetesPodOperator를 통해 사용하는 방식이 있는가 하면,

그냥 kubernets 환경 위에 airflow를 구축하는 방식도 있다.

  • 구성이 간단하고 템플릿화하기 쉽다는 장점이 있지만,
  • 구성이 Pod 형태로 바뀌었을 뿐, 자원을 계속 점유해야하거나, 규모가 커질 경우 유지보수가 어려워진다는 단점은 그대로 있다

그 내용은 아래 링크에 자세히 나와있는데, 모르는 내용이 많아서 제대로 정리하지 못했다..

Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 2

2. DAG 작성

2-1. 시간

start_date & schedule_interval & execution_date

  • Airflow에서 Start Date는 실제로 DAG가 실행되기 시작하는 날짜가 아니라, schedule이 시작되는 날짜
  • 따라서, 1/1부터 하루에 한번씩 스케줄이 등록되므로, 1/2부터 execution_date가 1/1인 DAG가 실행된다
  • execution_date가 1/1인 task는 1/1의 데이터를 가지고 Task를 수행해야하기 때문에 실제 실행되는 것은 1/2부터라고 이해할 수 있

2-2. Backfill & Catchup

  • Start_date를 과거로 설정해놓으면, 과거의 Task를 차례로 실행하는 Backfill이 실행된다
  • 근데 이를 원치 않을 경우 Catchup=False로 하면 Backfill을 실행하지 않음!

2-4. DRT(Dont Repeat Yourself) DAG 작성

별건 아니고, 작성한 Task들이 반복적인 경우가 있을 경우, 여러번 작성하지 말고 for 문을 사용해서 Task 생성할 수 있음

Apache Airflow란??

⇒ Workflow를 author, monitor, schedule하는 플랫폼

  • dependencies 고려
  • DAG(Directed Acyclic Graphs)의 형태의 코드로 작성
  • dynamic, manageable해짐
  • 파이썬 기반!
  • UI가 편하고 코드 치기 편

Airflow이전의 기존의 scheduling방법들??

ETL이란(data pipe line의 기본적인 구조) ⇒ Extract → Transform → Load

Cron : unix 계열 운영 체제에서 특정 시간에 명령을 실행하는 데 사용할 수 있는 유틸리티, crontab에 의해 구현

Cronjob : 특정 시간에 파일 백업, 보고서 생성 또는 데이터베이스 업데이트와 같은 작업을 실행하는 데 사용 ⇒ 정기적으로 job을 만드는 것

Crontab : 주어진 일정에 주기적으로 실행하도록 규정해놓은 crontab (cron table) 파일

  • 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점
  • 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점

Airflow는 이걸 어떻게 해결하는가?

Airflow는 DAG 단위로 구성되어있음 (DAG : Directed Acyclic Graph : 방향이 있는 순환되지 않는 그래프) ⇒ workflow를 작은 task들로 구분할 수 있음

<장점>

  • Dynamic : Python기반이기 때문에 자유롭게 수정하고 변경하는 등 control을 가지고 있
  • Extensible : Airflow는 작업 관리를 위해 필요한 연산자들을 다양하게 보유하지만, 자신의 환경에 맞는 자기만의 연산자를 정의할 수 있음
  • Scalable : Module형태 & Queuting System으로 인해, 여러개의 TASK를 진행할 수 있음
  • Configurable : 자유롭게 구성 가능함 DAG뿐만 아니라, 핵심 setting들도 변경할 수 있음
  • Monitoring이 쉬운 UI & 중앙 관리(track쉬움) & task사이의 dependency handling이 쉬움 & 과거 데이터를 쉽게 재생산 가능 & 오픈 소스라 좋음~

의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 & 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결

기본 용어 정리

DAG

  • Loop가 존재하지 않은 단방향 graph
  • nodetask를 의미하고, edge관련성을 의미함!

어떤 작업들이 병렬적으로 이루어질 수 있는지, 어떤 작업들이 순차적으로 이루어져야하는지를 파악한 뒤에 이에 따라 task를 진행

Operator (Task 정의)

  • DAG의 결과로 실제 어떤 것을 해야하는지를 결정
  • DAG는 어떻게 task를 수행할 것인지에 대한 것이고, 실제 연산을 하지 않기 때문에 실제 script를 수행하는 것은 operator
  • python, aws, slack, bash 등에서의 동작 실행 가능

Task

  • Task는 Operator가 실제로 instance화 된 결과
  • 실제로 worker가 work를 하는 대상

Workflow

  • 의존성(dependencies)로 연결된 task들의 집합
  • 사실상 DAG

2. Architecture of Apach Airflow

Architecture of Airflow

METADATA

  • 모든 DAG 실행 결과, task instance, 성공, 실패, 수행중 여부,,,,
  • DAB, task 등에 대한 모든 현재, 과거 정보를 저장함
  • SQL 형태

Web Server/UI

  • Frontend / HTTP 유지
  • 사용자에게 필요한 다양한 데이터를 예쁘게 보여주는 곳~~ (graph, number,,,)

SCHEDULER

  • 핵심 부분, task를 배치하고 실행하는 부분
  • DAG에 있는 configuration 분석(task간의 순서 등) ⇒ 필요한 task들을 수행
  • Executor에게 task제공해주는 역할

Executor

  • Scheduler의 역할은 각 task들이 올바른 시점에 시작할 수 있도록 하는 장치
  • 실제로 task 를 시작하고 실행하는 곳은 Executor
  • Schedulere와 밀접하게 작동해서 Message Queiuing Process, 예정된 작업을 실행할 worker를 정
  • Sequential & Local & Celery & Mesos,.,,등의 executor을 사용 가능

Worker

  • 실세로 task를 실행하는 주체
  • Executor의 종류에 따라 다양하게 동작함

DAG

  • 데이터 엔지니어가 pythoh으로 작성한 workflow
  • task들 사이의 dependency를 정의해줌

어떤 Cluster형태를 사용하는지에 따라서 다른 Executor를 사용할 수 있음

Single Node Cluster

모든 components들이 Master Node라는 한개의 Node에 위치하고 있음

이 경우, 외부 resource를 사용할 수 없기 때문에 Master Node의 resource에 의존함

⇒ Local Executor 사용

Multi Node Cluster

DAG의 개수가 많아지고, big data를 다루게 되면, Multi Node Cluster를 사용

components 자체는 동일하지만, Scheduler & Webserver만 Master Node에 존재하고, 나머지는 worder들은 별개의 instance로 존재함

새로운 worker node를 추가하면 되기 때문에 scalability 측면에서 유리함

⇒ “Celery Executor” 사용

한편, 어떤 Executor를 사용하든 Scheduler로부터 받는 task에 따라서 수행하게 됨!!

Scheduler는 DAG로부터 task에 대한 모든 정보를 받고, 그에 따라서 task queue를 생성

⇒ 반드시 Queuing System을 사용!!!

  • Local Executor은 내부적으로 queuing system이 존재하지만, multi node인 경우에는 외부 system이 필요함 (RabbitMQ, Redis)

Life Cycle of a Task

DAG가 airflow에 주어졌을 때, task의 lifecycle은 어떻게 이루어지는가???!?!

  • Scheduler는 Metadata에서 실행되어야할 DAG들에 대해 계속 monitoring을 진행
  • 실행되어야할 DAG를 찾으면, DAG Run 진행 → DAG의 상태를 “running”으로 update
  • DAG의 내용에 따라, 어떤 task를 먼저 진행해야할지 선택하고 Queue에 입력 (Task에 대한 정보를 담은 Message) → Task의 상태를 “queued”으로 update
  • Queue에 따라서 worker들이 task를 pull! → Task의 상태를 “running”으로 update
  • Task가 완료되면 → Task의 상태를 “Success/failed”으로 update
  • …..DAG의 모든 Task에 대해 반복….
  • DAG의 상태를 “Success/failed”으로 update

3. Installation

<Docker를 사용한 설치 완료>

4. Understanding Directories in Airflow Projects

Compose file

딱히 어려운건 없음!~~ : 필요한 부분들에 대한 docker image,,,,

<DAG File 예시>

결국 apach airflow를 쓴다는건 이 DAG문서를 작성하는 과정임! (자세한건 뒤에서)

5. Airflow’s UI Tour

  • Toggle : on/off : DAG이 활성화 되어있는지 여부
  • DAG ID (DAG의 Task)
  • Schedule : DAG가 실행될 interval time
    • 들어가보면 이런거 있음 (running, success, failed,,,,)
  • Recent Task
    • 최근에 run된 DAG의 task들의 status
  • Last Run
  • Dag Runs
    • 최근에 run된 DAG 자체의 status

Tutorial DAG File 실행해보기

  1. 시작 날짜가 2015년으로 되어있어서 수정해야함
    • catchup=False 설정하기 : 나중에 설명해줌
    • Start date 바꾸기 ⇒ Refresh

DAG 가 실행되는 시점은, 가장 최근 run date + schedule interval time이 지난 직후!

Graph View는 한번의 DAG Run에 대한 log만 볼 수 있다면,

Tree View에서는 여러개의 DAG Run들에 대한 log를 동시에 볼 수 있

Graph

Tree

Gantt

Task Duration : 각 DAG시도에 따른 Task별로 소요 시간

Task Tries : Task가 fail해서 재시도한 경우를 확인할 수 있음 (Untried인 경우에도 포함됨)

DAG File의 구성

우선, DAG definition file은 실제로 데이터가 어떻게 이동되고, 처리되는지에 대한 pipeline이 아님!!

⇒ DAG definition file은 data pipeline들에 대한 setting을 정리한 코드임 (DAG Structure)

  • 언제 시작할건지, 언제 종료할 것인지, 재시도는 언제 몇번 할 것인지, task dependencies,,,

1. 필요한 Library import

2. 공통변수 & DAG 속성값 정의

  • depends_on_past : 현재 실행되고 있는 DAG가 과거에 성공했던 DAG에 depend할 것인지 여
  • retries : 실패했을 경우 몇번 재시도 할 것인지
  • retry_delay : retry할 때 얼마나 뒤에 retry할 것인지

3. DAG 정의(속성값 전달)

  • schedule_interval : DAG가 실행될 time interval (1 day)

4. Task 정의

  • 어떤 작업을 할 것인지에 따라서 Operator가 달라짐 (여기서는 BashOperator)
    • PythonOperator, EmailOperator 등
  • task_id(UI에 보여지는 이름) - bash_command(실제 수행되는 command) - dag(Dag정의)
    • 다른 변수들을 override 가
  • jinja template???
  • Jinja Template (Flask 개발에 많이 사용되는 템플릿 엔진)
    • 웹 템플릿 엔진 : 웹페이지에 연산 같은 작업을 수행시켜 웹페이지를 편하게 완성시키는 기능 ⇒ html 코드 내에서 파이썬 코드를 작성하고 연산이 가능해짐
      • 간단한 문법으로 html에서 {}, {{}} 등의 규칙을 이용하여 python 프로그래밍이 가능
      • 데이터를 담은 변수를 return값으로 html과 함께 넘겨준 뒤, html에서 jinja 템플릿 규칙에 따라 화면에 표현할 수 있음
      • Jinja 템플릿 html에서 주석은 {# 주석으로 처리할 내용 #}으로 표시

5. Dependencies 정의(순서)

  • Upstream & Downstream있음
  • t2.set_upstream(t1) : t1이 완료된 후에 t2 실행
  • downstream은 반대 의미~~
  • 혹은 >> << 사용해서 표현하기도

⇒ 다시 UI로 돌아가서 Log를 살펴보면 좀 더 잘 이해할 수 있다

6. Operators

Operator는 workflow상에서 각 task를 정의/설명해주는 부분이고, 실제로 task를 수행하지는 않음

  • Operator는 주로 atomic하다 (독립적으로 존재하고 다른 operator과 resource공유 안함)
  • Operator는 idempotent하다 ⇒ 매번 같은 결과를 가지고 와야함
  • Oprator가 instance화 되면 task로 간주된다
  • 다양한 Operator는 Airflow의 BaseOperator Class에서 파생됨
  • 만약 operator가 서로 정보를 공유해야하는 경우, XCom을 사용
  1. Sensor Operator
  • 특정 조건이 만족될 때까지 반복적으로 수행

Ex) 특정 파일이 입력될때까지 수행해서 입력되면 반응 (HdfsSensor, FileSensor,,,)

  1. Transfer Operator
  • Data를 한 곳에서 다른 곳으로 transfer 해줌

Ex) MySqlToHiveTransfer, S3ToReshiftTransfer,,,

  1. Action Operators (Operators)
  • 실제로 다양한 action을 trigger하는 operator

Ex) BashOperator, PythonOperator, BigQueryOperator,,,,

  1. TaskFlow Operator
  • Airflow 2.0.0 신규 기능
  • @task 데코레이터를 사용해서 task를 패키지화 할 수 있는 기능
  • 간단한 task들을 파이썬으로 직접 작성할 수 있음

8. Airflow CLI

  1. Airflow 실행
  2. docker exec -it xxxx bash
  3. Directory 내 DAG 확인
airflow list_dags
  1. Meatadata DB 초기화
airflow initdb
  1. UI를 구성하는 webserver instance 생성
  2. airflow webserver
  3. Scheduler Instance 실행
airflow scheduler
  1. Airflow의 모든 connection 종류
airflow connections
  1. 특정 DAG의 task를 확인
  2. airflow list_tasks XX
  3. 주어진 DAG id의 dag run 확인
  4. airflow list_dag_runs XXX
  5. 특정 DAG를 실행
airflow trigger_dag XXX
  1. 실행중인 DAG 중단/재생
  2. airflow pause/unpause XXX
  3. Dependency, data 저장과 관계 없이 특정 task를 실험해보고 싶을 때
airflow test
  1. 다음 execution 시점
airflow delete_dag XXX

9. Executor

Executor ⇒ Scalabe 함!!! (확장 가능하다~!!)

Airflow의 architecture를 다시 생각해보면, scheduler가 meta data의 DAG, Task를 보고, 이를 통해 queue를 생성

이를 바탕으로 Executor가 실제로 어떤 resource를 가지고, 어떤 worker를

Sequential Executor

  • Airflow에서 사용하는 default Executor
  • 여러개의 task를 병렬적으로 처리하지 못하고, 한번에 한개씩의 task만 수행
  • 실시간 사용보다는 test할 때에 주로 사용됨
  • Sequential Executor는 자동적으로 task를 진행해주지 않아서, CLI에서 직접 trigger 해줘야함!! ⇒ 직접하면 compose file의 영향을 받지 않게됨,,??
  • 우리는 docker를 통해서 airflow를 사용하기 때문에, 원래는 sequential이 기본이지만 compose file에는 local로 되어있어서 Local Executor로 진행

Local Executor

  • 동시에 여러개 task 수행 가능 (locally in parallel)
  • 한개의 local worker!!
  • MySQL & Postgres 사용
  • 저렴하고 resource 소모량이 적음 (제한된 상황에서 사용good), 그만큼 local이라는 한계가 있음

cfg file의 설정

  • parallelism : 모든 DAG들에 대해 동시에 진행할 수 있는 최대 task 개수
  • dag_concurrency : 각 DAG 내에서 동시에 진행할 수 있는 최대 task 개

Celery Executor

  • scale up을 위한 executor ⇒ 여러 개의 worker 사용 가능 (multi-node)
  • RabbitMQ, Redis등을 사용해 여러 개의 task사이의 communication을 하는 비동기적 task queue system임
  • scheduler과 worker가 별도로 작용됨
  • flower : Celery Worker, Task 진행 상황 등을 monitoring하기 위
  • scheduler & worker을 독립적으로 작성

추가(오늘의 집 Tech Blog)

버킷플레이스 Airflow 도입기 - 오늘의집 블로그

쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)

전사 구성원들이 사용하는 배치 데이터 플랫폼 만들기 - Airflow Advanced

쏘카에서 Airflow를 정리한건 Airflow 자체보다는 자기네들이 git하고 google cloud platform에서 어떻게 아키텍쳐를 구성하고 활용하는지에 대한 내용 (디테일한 내용들)

오늘의 집에서는 원래 젠킨슨을 썼다고 하더라,, 근데

  • 모니터링이 잘 안되고
  • 확장성도 별로 안좋고
  • 복잡한 파이프라인을 짜기 어렵다 하드라

1. Executor 고민

오늘의 집 같은 기업에서는 당연히 Local이나 Sequential을 사용할리가 없고!!

Celery Executor하고 Kubernetes Executor 사이에서 고민을 했다고 함

1-1. Celery Executor

  • Celery Executor는 Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식
  • Worker 수를 스케일아웃 할 수 있다는 장점이 있지만, 메시지 브로커를 따로 관리해야하고 워커 프로세스에 대한 모니터링도 필요하다는 단점
  • 메시지 브로커는 task 가 적재되는 task queue를 의미한다 (Redis,,,)
  • Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 1
  • Local Executor와 Celery Executor의 자세한 설명과 코드를 보고 싶다면 아래 링크를 참고하면 된다

1-2. Kubernetes Executor

  • Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow worker를 pod 형태로 실행하는 방식
  • Worker에 대한 유지 보수가 필요없고, Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용할 수 있음
  • 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점

Pod가 무엇인가?

Kubernetes 구조

  • 클러스터 전체를 관리하는 master + 컨테이너가 배포되는 Node(Worker Node)
  • Master에서 적절

관련글 더보기

댓글 영역