데이터 노트

[Airflow] 메타데이터베이스를 통한 dag 실행 이력 조회 본문

Data Engineering/Airflow

[Airflow] 메타데이터베이스를 통한 dag 실행 이력 조회

돌돌찐 2024. 8. 15. 16:46

작업 배경

작업 보고를 위해 얼마나 dag의 수행이 성공적이었고, 속도 면에서 개선이 있었는지 등 판단하기 위해 이력 조회가 필요한 상황이 발생했고, 그래서 작업을 하기 위해 알아보게 되었다.

 

작업

Dag별 수행 시간

import psycopg2
from psycopg2.extras import RealDictCursor
import csv

# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
    dbname="airflow",
    user="{계정명}",
    password="{계정 비밀번호}",
    host="{서비스명}",  # Docker Compose에서 정의된 서비스 이름
    port="5432"
)

# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)

# 성공적인 DAG 실행의 평균 duration을 계산하는 SQL 쿼리
query = """
    SELECT 
        dag_id,
        COUNT(id) AS total_runs,
        AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_seconds
    FROM dag_run
    WHERE state = 'success'
    GROUP BY dag_id
    ORDER BY dag_id;
"""

# 쿼리 실행
cur.execute(query)

results = cur.fetchall()

# 결과를 CSV 파일로 저장.
csv_file_path = '{경로}/dag_stats_avg_duration.csv'
with open(csv_file_path, 'w', newline='') as f:
    writer = csv.writer(f)
    # 헤더
    writer.writerow(['DAG', 'Total Runs', 'Average Duration (Seconds)', 'Average Duration (Minutes)'])
    
    # 데이터
    for result in results:
        avg_duration_seconds = result['avg_duration_seconds']
        avg_duration_minutes = avg_duration_seconds / 60 if avg_duration_seconds is not None else 0
        
        writer.writerow([
            result['dag_id'],
            result['total_runs'],
            f"{avg_duration_seconds:.2f}",
            f"{avg_duration_minutes:.2f}"
        ])

# 종료
cur.close()
conn.close()
  • postgresql을 연결하기 위한 커넥션 연결 설정 부분(conn)에서, host는 호스트 정보를 적어주면 되는데 나는 Docker Compose를 통해 빌드했으므로, 서비스명으로 적어주었다.
  • cursor 설정에서의 RealDictCursor란?
    기본적으로 psycopg2를 통해 결과를 받아오게 되면 tuple 형태로 값을 반환하게 되는데 이를 키:값으로 매핑해서 딕셔너리 형태로 받아오게 하기 위해서는 RealdictCursor를 적용해야 한다.
    -> RealDictRow로 반환되고 이를 python에서는 딕셔너리로 활용 가능하다.

Dag 성공 실패 횟수

import psycopg2
from psycopg2.extras import RealDictCursor
import csv

# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
    dbname="airflow",
    user="{계정명}",
    password="{계정 비밀번호}",
    host="{서비스명}",  # Docker Compose에서 정의된 서비스 이름
    port="5432"
)

# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)

# 월별 DAG 실행 횟수를 그룹화하여 조회하는 SQL 쿼리
query = """
    SELECT 
        dag_id,
        EXTRACT(YEAR FROM execution_date) AS year,
        EXTRACT(MONTH FROM execution_date) AS month,
        COUNT(id) AS total_runs,
        SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS success_runs,
        SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed_runs
    FROM dag_run
    GROUP BY dag_id, year, month
    ORDER BY dag_id, year, month;
"""

cur.execute(query)

results = cur.fetchall()

# 결과를 CSV 파일로 저장.
csv_file_path = '/opt/airflow/dag_stats_monthly.csv'
with open(csv_file_path, 'w', newline='') as f:
    writer = csv.writer(f)
    # 헤더
    writer.writerow(['DAG', 'Year', 'Month', 'Total Runs', 'Success Runs', 'Failed Runs'])
    
    # 데이터
    for result in results:
        writer.writerow([
            result['dag_id'],
            result['year'],
            result['month'],
            result['total_runs'],
            result['success_runs'],
            result['failed_runs']
        ])


# 종료
cur.close()
conn.close()

 

메타데이터베이스를 잘 활용하면 이런 내용을 문서화하기 위해 추출이 필요할 때 훨씬 편하게 정리할 수 있을 것 같다.

그리고 duration은 코드 수정 등 발생했을 경우 얼마나 속도 개선이 됐는지도 기간별 히스토리를 볼 수 있어서 좋을 것 같다.

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow/설치] Docker로 Airflow 설치  (0) 2024.05.29
[Airflow] Dag 인식이 안 될 때  (0) 2023.08.08