Data Engineering/Airflow
[Airflow] 메타데이터베이스를 통한 dag 실행 이력 조회
돌돌찐
2024. 8. 15. 16:46
작업 배경
작업 보고를 위해 얼마나 dag의 수행이 성공적이었고, 속도 면에서 개선이 있었는지 등 판단하기 위해 이력 조회가 필요한 상황이 발생했고, 그래서 작업을 하기 위해 알아보게 되었다.
작업
Dag별 수행 시간
import psycopg2
from psycopg2.extras import RealDictCursor
import csv
# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
dbname="airflow",
user="{계정명}",
password="{계정 비밀번호}",
host="{서비스명}", # Docker Compose에서 정의된 서비스 이름
port="5432"
)
# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)
# 성공적인 DAG 실행의 평균 duration을 계산하는 SQL 쿼리
query = """
SELECT
dag_id,
COUNT(id) AS total_runs,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_seconds
FROM dag_run
WHERE state = 'success'
GROUP BY dag_id
ORDER BY dag_id;
"""
# 쿼리 실행
cur.execute(query)
results = cur.fetchall()
# 결과를 CSV 파일로 저장.
csv_file_path = '{경로}/dag_stats_avg_duration.csv'
with open(csv_file_path, 'w', newline='') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['DAG', 'Total Runs', 'Average Duration (Seconds)', 'Average Duration (Minutes)'])
# 데이터
for result in results:
avg_duration_seconds = result['avg_duration_seconds']
avg_duration_minutes = avg_duration_seconds / 60 if avg_duration_seconds is not None else 0
writer.writerow([
result['dag_id'],
result['total_runs'],
f"{avg_duration_seconds:.2f}",
f"{avg_duration_minutes:.2f}"
])
# 종료
cur.close()
conn.close()
- postgresql을 연결하기 위한 커넥션 연결 설정 부분(conn)에서, host는 호스트 정보를 적어주면 되는데 나는 Docker Compose를 통해 빌드했으므로, 서비스명으로 적어주었다.
- cursor 설정에서의 RealDictCursor란?
기본적으로 psycopg2를 통해 결과를 받아오게 되면 tuple 형태로 값을 반환하게 되는데 이를 키:값으로 매핑해서 딕셔너리 형태로 받아오게 하기 위해서는 RealdictCursor를 적용해야 한다.
-> RealDictRow로 반환되고 이를 python에서는 딕셔너리로 활용 가능하다.
Dag 성공 실패 횟수
import psycopg2
from psycopg2.extras import RealDictCursor
import csv
# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
dbname="airflow",
user="{계정명}",
password="{계정 비밀번호}",
host="{서비스명}", # Docker Compose에서 정의된 서비스 이름
port="5432"
)
# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)
# 월별 DAG 실행 횟수를 그룹화하여 조회하는 SQL 쿼리
query = """
SELECT
dag_id,
EXTRACT(YEAR FROM execution_date) AS year,
EXTRACT(MONTH FROM execution_date) AS month,
COUNT(id) AS total_runs,
SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS success_runs,
SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed_runs
FROM dag_run
GROUP BY dag_id, year, month
ORDER BY dag_id, year, month;
"""
cur.execute(query)
results = cur.fetchall()
# 결과를 CSV 파일로 저장.
csv_file_path = '/opt/airflow/dag_stats_monthly.csv'
with open(csv_file_path, 'w', newline='') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['DAG', 'Year', 'Month', 'Total Runs', 'Success Runs', 'Failed Runs'])
# 데이터
for result in results:
writer.writerow([
result['dag_id'],
result['year'],
result['month'],
result['total_runs'],
result['success_runs'],
result['failed_runs']
])
# 종료
cur.close()
conn.close()
메타데이터베이스를 잘 활용하면 이런 내용을 문서화하기 위해 추출이 필요할 때 훨씬 편하게 정리할 수 있을 것 같다.
그리고 duration은 코드 수정 등 발생했을 경우 얼마나 속도 개선이 됐는지도 기간별 히스토리를 볼 수 있어서 좋을 것 같다.