Data Engineering/Airflow

[Airflow] DAG 작성하기

sangwonYoon 2023. 7. 25. 14:44

DAG

Airflow에서는 DAG이라는 단위로 스케줄링을 관리한다.

DAG는 Directed Acyclic Graph의 약자로, 순환하지 않고 방향이 존재하는 그래프를 의미한다.

위와 그림과 같은 DAG는 a, b, c, d, e라고 하는 task의 조합으로 이루어져 있다. task는 파이프라인에서 실행되는 단위 작업이다.

DAG 내에서 task는 이전에 수행되어야 하는 task가 모두 완료되면 실행되고, 여러 task를 동시에 실행시킬 수 있다.

 

예를 들어 DAG 구조가 위 그림과 같다면, a task가 완료된 후 b, c task가 동시에 실행되고, b와 c task가 모두 완료되어야 d task가 실행된다.

 

DAG 작성하기

먼저 DAG를 담을 디렉토리를 Airflow 기본 디렉토리 경로에서 생성한다($AIRFLOW_HOME/dags).

mkdir dags

 

위에서 생성한 디렉토리 내에 DAG를 정의하는 파일인 hello_world.py를 생성한다.

# hello_world.py

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def print_world() -> None:
    print("world")


# with 구문으로 DAG 정의를 시작한다.
with DAG(
    dag_id="hello_world", #DAG의 식별자용 아이디
    description="My First DAG", # DAG에 대해 설명
    start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작
    schedule_interval="0 6 * * *", # 매일 06:00에 실행
    tags=["my_dags"],
) as dag:

    # task 정의
    # bash 커맨드로 echo hello를 실행
    t1 = BashOperator(
        task_id="print_hello",
        bash_command="echo Hello",
        owner="sangwonYoon", # 이 작업의 owner
        retries=3, # 이 task가 실패한 경우, 3번 재시도
        retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분
    )

    # task를 정의
    # python 함수인 print_world를 실행
    t2 = PythonOperator(
        task_id="print_world",
        python_callable=print_world,
        depends_on_past=True, # 이전 날짜의 동일 task가 실패한 경우 실행되지 않고 대기
        owner="sangwonYoon",
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

    # task 순서 지정
    # t1 실행 후 t2 실행
    t1 >> t2

코드를 조금 더 자세히 살펴보자.

 

# with 구문으로 DAG 정의를 시작한다.
with DAG(
    dag_id="hello_world", #DAG의 식별자용 아이디
    description="My First DAG", # DAG에 대해 설명
    start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작
    schedule_interval="0 6 * * *", # 매일 06:00에 실행
    tags=["my_dags"],
) as dag:

DAG를 정의하는 부분이다. 언제부터 스케줄링을 시작하고, DAG를 주기적으로 실행할 시간을 지정할 수 있다.

 

    # task 정의
    # bash 커맨드로 echo hello를 실행
    t1 = BashOperator(
        task_id="print_hello",
        bash_command="echo Hello",
        owner="sangwonYoon", # 이 작업의 owner
        retries=3, # 이 task가 실패한 경우, 3번 재시도
        retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분
    )

    # task를 정의
    # python 함수인 print_world를 실행
    t2 = PythonOperator(
        task_id="print_world",
        python_callable=print_world,
        depends_on_past=True, # 이전 날짜의 동일 task가 실패한 경우 실행되지 않고 대기
        owner="sangwonYoon",
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

DAG 내의 task를 정의하는 부분이다. task를 정의할 때는 Airflow의 Operator 클래스를 사용한다.

 

    # task 정의
    # bash 커맨드로 echo hello를 실행
    t1 = BashOperator(
        task_id="print_hello",
        bash_command="echo Hello",
        owner="sangwonYoon", # 이 작업의 owner
        retries=3, # 이 task가 실패한 경우, 3번 재시도
        retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분
    )

첫번째 task는 bash 커맨드를 실행하는 task이다.
BashOperator 클래스 bash_command 파라미터에 bash로 실행할 커맨드를 전달하여 실행할 수 있다.

 

    # task를 정의
    # python 함수인 print_world를 실행
    t2 = PythonOperator(
        task_id="print_world",
        python_callable=print_world,
        depends_on_past=True, # 이전 날짜의 동일 task가 실패한 경우 실행되지 않고 대기
        owner="sangwonYoon",
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

두번째 task는 python 함수를 실행하는 task이다.
PythonOperator 클래스 python_callable 파라미터에 실행할 python 함수를 전달하여 실행할 수 있다. (python 함수를 괄호 없이 전달해야 하는 것에 주의)

 

    # task 순서 지정
    # t1 실행 후 t2 실행
    t1 >> t2

DAG 내의 task간 순서를 지정하는 부분이다. 순서는 >>와 같은 형태로 표현한다.

t1을 실행한 뒤, t2와 t3를 실행하고 싶다면 다음과 같이 작성할 수 있다.

t1 >> [t2, t3]

 

airflow 스케줄러 실행

airflow scheduler

 

결과 확인

웹 UI를 확인해보면 새로 생성한 DAG를 확인할 수 있다.

 

DAG를 눌러 상세 페이지에 들어가면 DAG를 끄고 켤 수 있는 토글 버튼이 존재한다.

 

DAG를 켜고 잠시 기다리면 아래와 같이 초록색 도형으로 실행된 결과를 확인할 수 있다.

세로 한 줄이 하나의 실행을 의미한다.

맨 위의 원이 하나의 DAG 실행을 의미하며, 하나의 실행을 DAG Run이라고 부른다.

DAG의 스케줄링 시작 날짜를 2일 전으로 설정해서 두 개의 DAG Run이 생성되었다.

 

원 아래에 위치한 사각형은 하나의 task를 의미한다.

t1과 t2 task가 정상적으로 실행된 것을 확인할 수 있다.

 

특정 DAG Run 재실행

만약 특정 DAG Run의 기록을 지우고, 다시 실행시키고 싶으면 초록색 원을 누른 뒤 Clear를 실행한다.

 

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow] Airflow 아키텍처  (0) 2023.07.25
[Airflow] Airflow 설치 및 실행하기  (0) 2023.07.25