상세 컨텐츠

본문 제목

[16차시 / 15기 김제성] Apache Airflow-2

본문

https://www.udemy.com/course/apache-airflow/

 

Executors

scalable 특성 - 무한대로 확장가능

 

✅Sequential

디폴트 executor

병렬적인 task는 처리 불가능 → test 용도로만 사용

SQLite를 사용하기 때문에 별도의 DB 설치 필요 없음 (airflow 설치할때 자동으로 )

( 이전 예제들에선 병렬 task가 가능했는데 Localexecutor.yml에서 executor 정의를 local로 해주었기 때문)

 

✅Local

병렬적으로 task 실행 가능

multiprocessing 가능한 python library 사용, MySQL Postgres DB로 사용

task들에 dependent하다

✅Celery

multi node cluster

multiple worker node를 활용해 task load를 분배 가능

asynchronous task queue system ~ 여러 task 들간의 communication 진행

 

 


✔️XComs (Cross Communication)

exchange messages/data between tasks

key / value / timestamp 로 구성

xcom_push() (전송) xcoms_pull() (송신)

ex)

t1(파일 읽기)에서 몇개의 ID를 t2(table에서 값을 select)에게 전달

t1: 메시지 생성 - xcom으로 push

t2: 송신

t3: 새로운메시지 생성

def push_function(**kwargs):
    message='This is the pushed message.'
    ti = kwargs['ti']
    ti.xcom_push(key="message", value=message)

def pull_function(**kwargs):
    ti = kwargs['ti']
    pulled_message = ti.xcom_pull(key='message', task_ids='new_push_task')
    print("Pulled Message: '%s'" % pulled_message)

def new_push_function(**kwargs):
    message='This is the NEW pushed message.'
    ti = kwargs['ti']
    ti.xcom_push(key="message", value=message)

t1 = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

t2 = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

t3 = PythonOperator(
    task_id='new_push_task',
    python_callable=new_push_function,
    provide_context=True,
    dag=DAG)

t1 >> t3 >> t2

✔️Variables

파이썬 변수 → 하나의 dag에서만 적용

airflow 변수 → 여러 dag에서 공통으로 적용 (file path, storage path..)

admin > variable에 들어가서 key-vaule pair 형태의 source path 를 가진 변수를 만들어주고

dag 파이썬 파일에서 별도의 파이썬 변수가 아닌 airflow 변수를 활용할 수 있도록 설정해준다

 

✔️Dummy Operator

→ doing nothing

첫번째 리스트의 task들을 두번째 리스트의 task로 옮겨가게 control 해줌

upstream, downstream dependency 확인해주는 역할도 함

 

✔️Hooks

외부 데이터베이스나 플랫폼과 연결할 수 있게 해줌

hooks를 활용해 postgres table을 또다른 postgres table로 복사하는 예제

airflow=# create table source_city_table(city_name varchar (50), city_code varchar (20));
CREATE TABLE
airflow=# insert into source_city_table (city_name, city_code) values('New York', 'ny'), ('Los Angeles', 'la'), ('Chicago', 'cg'), ('Houston', 'ht');
INSERT 0 4
airflow=# select * from source_city_table;
  city_name  | city_code
-------------+-----------
 New York    | ny
 Los Angeles | la
 Chicago     | cg
 Houston     | ht
(4 rows)

source table을 만들어주고

def transfer_function(ds, **kwargs):

    query = "SELECT * FROM source_city_table"

    #source hook
    source_hook = PostgresHook(postgres_conn_id='postgres_conn', schema='airflow')
    source_conn = source_hook.get_conn()

    #destination hook
    destination_hook = PostgresHook(postgres_conn_id='postgres_conn', schema='airflow')
    destination_conn = destination_hook.get_conn()

    source_cursor = source_conn.cursor()
    destination_cursor = destination_conn.cursor()

    source_cursor.execute(query)
    records = source_cursor.fetchall()

    if records:
        execute_values(destination_cursor, "INSERT INTO target_city_table VALUES %s", records)
        destination_conn.commit()

    source_cursor.close()
    destination_cursor.close()
    source_conn.close()
    destination_conn.close()
    print("Data transferred successfully!")

t1 = PythonOperator(task_id='transfer', python_callable=transfer_function, provide_context=True, dag=dag)

PostgresHook 객체를 이용해 source table의 값을 target table로 복사하는 query를 실행한다

airflow=# select * from target_city_table;
  city_name  | city_code
-------------+-----------
 New York    | ny
 Los Angeles | la
 Chicago     | cg
 Houston     | ht
(4 rows)

✔️Pools

worker의 개수를 임의로 분배하여 여러가지 Dag를 더욱 병렬적으로 처리할 수 있게

ex. 10개의 task 지닌 dag가 5개 있고 worker가 10개 있다고 하면 worker 10개가 하나의 dag에 다 붙는게 아니라 분배되어서 작동한다

task가 할당된 worker 개수에 의해서만 수행될 수 있도록 함

✔️Backfilling & Catchup

이전 시간대로 DAG를 돌려보아서 놓친 dag가 있나 확인하기 위해서

⇒ scheduler는 DAG를 (start data + schedule interval) 지난 직후에 실행시키도록 설정됨

 

start date  2019-12-01
schedule interval @daily
DAG 시작 시점 2019-12-01 23:59:59

start date가 과거이고 catchup = true로 해놓으면(default) 그 시점부터 schedule interval 고려한뒤에 DAG 시작됨

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 5, 16),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

✔️Branching

case에 따라 task를 여러가지 상황의 수로 분류하는 것

def branch_function(**kwargs):
    ti = kwargs['ti']
    pulled_value = ti.xcom_pull(key='pushed_value', task_ids='push_task')
    if pulled_value %2 == 0:
        return 'even_task'
    else:
        return 'odd_task'


with DAG(dag_id='branching', default_args=args, schedule_interval="@daily") as dag:

    push_task = PythonOperator(task_id='push_task', python_callable=push_function, provide_context=True)

    branch_task = BranchPythonOperator(task_id='branch_task', python_callable=branch_function, provide_context=True)

    even_task = BashOperator(task_id='even_task', bash_command='echo "Got an even value."')

    odd_task = BashOperator(task_id='odd_task', bash_command='echo "Got an odd value."')

    push_task >> branch_task >>[even_task, odd_task]

✔️LatestOnlyOperator

시간을 고려하지 않으면서 스케줄에 의해 실행되는 워크플로우 → backfill 유발, cpu 낭비

가장 최근에 스케줄된 dag만 실행시키고, 현재 시간이 execution time과 다음 스케줄 execution time 사이가 아닌 경우 모든 하위 태스크는 스킵함

https://getchan.github.io/data/airflow_2/

✔️subDAGs

반복되는 패턴을 간결하게 표현하기 위해서 하위 DAG 묶음을 이용

 

관련글 더보기

댓글 영역