https://www.udemy.com/course/apache-airflow/
scalable 특성 - 무한대로 확장가능
디폴트 executor
병렬적인 task는 처리 불가능 → test 용도로만 사용
SQLite를 사용하기 때문에 별도의 DB 설치 필요 없음 (airflow 설치할때 자동으로 )
( 이전 예제들에선 병렬 task가 가능했는데 Localexecutor.yml에서 executor 정의를 local로 해주었기 때문)
병렬적으로 task 실행 가능
multiprocessing 가능한 python library 사용, MySQL Postgres DB로 사용
task들에 dependent하다
multi node cluster
multiple worker node를 활용해 task load를 분배 가능
asynchronous task queue system ~ 여러 task 들간의 communication 진행
exchange messages/data between tasks
key / value / timestamp 로 구성
xcom_push() (전송) xcoms_pull() (송신)
ex)
t1(파일 읽기)에서 몇개의 ID를 t2(table에서 값을 select)에게 전달
t1: 메시지 생성 - xcom으로 push
t2: 송신
t3: 새로운메시지 생성
def push_function(**kwargs):
message='This is the pushed message.'
ti = kwargs['ti']
ti.xcom_push(key="message", value=message)
def pull_function(**kwargs):
ti = kwargs['ti']
pulled_message = ti.xcom_pull(key='message', task_ids='new_push_task')
print("Pulled Message: '%s'" % pulled_message)
def new_push_function(**kwargs):
message='This is the NEW pushed message.'
ti = kwargs['ti']
ti.xcom_push(key="message", value=message)
t1 = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
t2 = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
t3 = PythonOperator(
task_id='new_push_task',
python_callable=new_push_function,
provide_context=True,
dag=DAG)
t1 >> t3 >> t2
파이썬 변수 → 하나의 dag에서만 적용
airflow 변수 → 여러 dag에서 공통으로 적용 (file path, storage path..)
admin > variable에 들어가서 key-vaule pair 형태의 source path 를 가진 변수를 만들어주고
dag 파이썬 파일에서 별도의 파이썬 변수가 아닌 airflow 변수를 활용할 수 있도록 설정해준다
→ doing nothing
첫번째 리스트의 task들을 두번째 리스트의 task로 옮겨가게 control 해줌
upstream, downstream dependency 확인해주는 역할도 함
외부 데이터베이스나 플랫폼과 연결할 수 있게 해줌
hooks를 활용해 postgres table을 또다른 postgres table로 복사하는 예제
airflow=# create table source_city_table(city_name varchar (50), city_code varchar (20));
CREATE TABLE
airflow=# insert into source_city_table (city_name, city_code) values('New York', 'ny'), ('Los Angeles', 'la'), ('Chicago', 'cg'), ('Houston', 'ht');
INSERT 0 4
airflow=# select * from source_city_table;
city_name | city_code
-------------+-----------
New York | ny
Los Angeles | la
Chicago | cg
Houston | ht
(4 rows)
source table을 만들어주고
def transfer_function(ds, **kwargs):
query = "SELECT * FROM source_city_table"
#source hook
source_hook = PostgresHook(postgres_conn_id='postgres_conn', schema='airflow')
source_conn = source_hook.get_conn()
#destination hook
destination_hook = PostgresHook(postgres_conn_id='postgres_conn', schema='airflow')
destination_conn = destination_hook.get_conn()
source_cursor = source_conn.cursor()
destination_cursor = destination_conn.cursor()
source_cursor.execute(query)
records = source_cursor.fetchall()
if records:
execute_values(destination_cursor, "INSERT INTO target_city_table VALUES %s", records)
destination_conn.commit()
source_cursor.close()
destination_cursor.close()
source_conn.close()
destination_conn.close()
print("Data transferred successfully!")
t1 = PythonOperator(task_id='transfer', python_callable=transfer_function, provide_context=True, dag=dag)
PostgresHook 객체를 이용해 source table의 값을 target table로 복사하는 query를 실행한다
airflow=# select * from target_city_table;
city_name | city_code
-------------+-----------
New York | ny
Los Angeles | la
Chicago | cg
Houston | ht
(4 rows)
worker의 개수를 임의로 분배하여 여러가지 Dag를 더욱 병렬적으로 처리할 수 있게
ex. 10개의 task 지닌 dag가 5개 있고 worker가 10개 있다고 하면 worker 10개가 하나의 dag에 다 붙는게 아니라 분배되어서 작동한다
task가 할당된 worker 개수에 의해서만 수행될 수 있도록 함
이전 시간대로 DAG를 돌려보아서 놓친 dag가 있나 확인하기 위해서
⇒ scheduler는 DAG를 (start data + schedule interval) 지난 직후에 실행시키도록 설정됨
start date | 2019-12-01 |
schedule interval | @daily |
DAG 시작 시점 | 2019-12-01 23:59:59 |
start date가 과거이고 catchup = true로 해놓으면(default) 그 시점부터 schedule interval 고려한뒤에 DAG 시작됨
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 5, 16),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
case에 따라 task를 여러가지 상황의 수로 분류하는 것
def branch_function(**kwargs):
ti = kwargs['ti']
pulled_value = ti.xcom_pull(key='pushed_value', task_ids='push_task')
if pulled_value %2 == 0:
return 'even_task'
else:
return 'odd_task'
with DAG(dag_id='branching', default_args=args, schedule_interval="@daily") as dag:
push_task = PythonOperator(task_id='push_task', python_callable=push_function, provide_context=True)
branch_task = BranchPythonOperator(task_id='branch_task', python_callable=branch_function, provide_context=True)
even_task = BashOperator(task_id='even_task', bash_command='echo "Got an even value."')
odd_task = BashOperator(task_id='odd_task', bash_command='echo "Got an odd value."')
push_task >> branch_task >>[even_task, odd_task]
시간을 고려하지 않으면서 스케줄에 의해 실행되는 워크플로우 → backfill 유발, cpu 낭비
가장 최근에 스케줄된 dag만 실행시키고, 현재 시간이 execution time과 다음 스케줄 execution time 사이가 아닌 경우 모든 하위 태스크는 스킵함
반복되는 패턴을 간결하게 표현하기 위해서 하위 DAG 묶음을 이용
[15차시 / 15기 최경석] Apache Airflow-1 (0) | 2023.05.28 |
---|---|
[12차시 / 15기 최경석] AWS 1 (0) | 2023.05.28 |
[12차시 / 15기 김제성] AWS 1 (0) | 2023.05.21 |
[10차시/16기 박민규] Git&Github (0) | 2023.05.21 |
[13차시/15기 이병주] AWS 2부 (0) | 2023.05.17 |
댓글 영역