Airflow 기본 구조
Airflow 기본 구조
AirFlow는 Workflow를 정의하고 실행 가능한 플랫폼으로 반복 된 작업을 자동화 하기위해 사용한다.
주로 주기적으로 실행하는 ETL 작업에 주로 사용한다.
AirFlow 기본 구조
- Scheduler : AirFlow의 DAG, Task 들 모니터링 및 실행 순서 관리
- Worker : 스케쥴러의 의해 전달 받은 DAG를 실행, 실제 코드가 실행되는 컨테이너 역활
- database : AirFlow에서 실행할 작업들에 관한 정보들 저장
- WebServer : AirFlow UI
DAG : AirFlow 에서 실행할 작업들 파이프라인 형태로 저장
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from airflow import DAG default_args = { 'owner' : hj, 'start_date' : datetime(2023,04,29) 'end_date' : datetime(2023,05,29) # error 낫을 때 메일 'email' : [email] # 재시도 횟수 및 딜레이 'retries' : 3 'retry_delay' : timedelta(minutes=3) } test_dag = DAG( id = "test_dag_v1", schedule_interval = "0 * * * *", defailt_args = default_args t1 = PythonPerator(dag=test_dag, task_id="test_task_1", params={})
Scheduler가 Dag directory에서 작업을 가져와서 Worker에서 실행하는 형태
AirFlow 활용 예시
- 데이터가 별로 크지 않다면 Redshift에 바로 저장
- 데이터가 많을 경우 효율적인 방식
- Dag가 Production DB에서 원하는 데이터 읽어와서 csv, json 등과 같은 파일로 만들어서 S3 업로드
- S3에서 저장된 데이터를 Redshift로 한번에 Bulk INSERT
데이터 파이프라인 생성시 고려할점
- Python Code로 작성하기 때문에 항상 버그 주의
- 작은 양의 데이터일 경우 매번 전체 복사해서 테이블 만드는게 편함 → Full Refresh
- BackFill을 위해
catchup=True
설정 잊지말기 - 데이터 파이프라인 입출력 문서화
- Airflow는 dag_dir_list_interval 옵션에 따라서 dags 폴더를 주기적으로 스캔하는데, Dag 폴더안에 있는
.py
파일 중from airflow import DAG
와 같이 임포트 되어있으면 DAG로 인식한다.- 그래서 test 할 때 주의!
Airflow WEB UI에서 디버깅 하는법
- 에러가 있는 task는 빨간색으로 표시되고, 해당 task 클릭하여
Log
확인
This post is licensed under CC BY 4.0 by the author.