Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Python
- SQL
- pySpark
- docker
- Streamlit
- Redshift
- RAG
- metadabatase
- 설치
- jmx-exporter
- vectorDB
- 루프백주소
- dockercompose
- sparkstreaming
- kafka
- javascript
- Dag
- BigQuery
- airflow
- amazonlinux
- spark
- prometheus
- ubuntu
- 오블완
- aiagent
- grafana
- hadoop
- airflow설치
- MSA
- milvus
Archives
- Today
- Total
데이터 노트
[Airflow] 메타데이터베이스를 통한 dag 실행 이력 조회 본문
작업 배경
작업 보고를 위해 얼마나 dag의 수행이 성공적이었고, 속도 면에서 개선이 있었는지 등 판단하기 위해 이력 조회가 필요한 상황이 발생했고, 그래서 작업을 하기 위해 알아보게 되었다.
작업
Dag별 수행 시간
import psycopg2
from psycopg2.extras import RealDictCursor
import csv
# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
dbname="airflow",
user="{계정명}",
password="{계정 비밀번호}",
host="{서비스명}", # Docker Compose에서 정의된 서비스 이름
port="5432"
)
# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)
# 성공적인 DAG 실행의 평균 duration을 계산하는 SQL 쿼리
query = """
SELECT
dag_id,
COUNT(id) AS total_runs,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_seconds
FROM dag_run
WHERE state = 'success'
GROUP BY dag_id
ORDER BY dag_id;
"""
# 쿼리 실행
cur.execute(query)
results = cur.fetchall()
# 결과를 CSV 파일로 저장.
csv_file_path = '{경로}/dag_stats_avg_duration.csv'
with open(csv_file_path, 'w', newline='') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['DAG', 'Total Runs', 'Average Duration (Seconds)', 'Average Duration (Minutes)'])
# 데이터
for result in results:
avg_duration_seconds = result['avg_duration_seconds']
avg_duration_minutes = avg_duration_seconds / 60 if avg_duration_seconds is not None else 0
writer.writerow([
result['dag_id'],
result['total_runs'],
f"{avg_duration_seconds:.2f}",
f"{avg_duration_minutes:.2f}"
])
# 종료
cur.close()
conn.close()
- postgresql을 연결하기 위한 커넥션 연결 설정 부분(conn)에서, host는 호스트 정보를 적어주면 되는데 나는 Docker Compose를 통해 빌드했으므로, 서비스명으로 적어주었다.
- cursor 설정에서의 RealDictCursor란?
기본적으로 psycopg2를 통해 결과를 받아오게 되면 tuple 형태로 값을 반환하게 되는데 이를 키:값으로 매핑해서 딕셔너리 형태로 받아오게 하기 위해서는 RealdictCursor를 적용해야 한다.
-> RealDictRow로 반환되고 이를 python에서는 딕셔너리로 활용 가능하다.
Dag 성공 실패 횟수
import psycopg2
from psycopg2.extras import RealDictCursor
import csv
# PostgreSQL(메타데이터베이스)에 연결
conn = psycopg2.connect(
dbname="airflow",
user="{계정명}",
password="{계정 비밀번호}",
host="{서비스명}", # Docker Compose에서 정의된 서비스 이름
port="5432"
)
# 커서를 생성
cur = conn.cursor(cursor_factory=RealDictCursor)
# 월별 DAG 실행 횟수를 그룹화하여 조회하는 SQL 쿼리
query = """
SELECT
dag_id,
EXTRACT(YEAR FROM execution_date) AS year,
EXTRACT(MONTH FROM execution_date) AS month,
COUNT(id) AS total_runs,
SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS success_runs,
SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed_runs
FROM dag_run
GROUP BY dag_id, year, month
ORDER BY dag_id, year, month;
"""
cur.execute(query)
results = cur.fetchall()
# 결과를 CSV 파일로 저장.
csv_file_path = '/opt/airflow/dag_stats_monthly.csv'
with open(csv_file_path, 'w', newline='') as f:
writer = csv.writer(f)
# 헤더
writer.writerow(['DAG', 'Year', 'Month', 'Total Runs', 'Success Runs', 'Failed Runs'])
# 데이터
for result in results:
writer.writerow([
result['dag_id'],
result['year'],
result['month'],
result['total_runs'],
result['success_runs'],
result['failed_runs']
])
# 종료
cur.close()
conn.close()
메타데이터베이스를 잘 활용하면 이런 내용을 문서화하기 위해 추출이 필요할 때 훨씬 편하게 정리할 수 있을 것 같다.
그리고 duration은 코드 수정 등 발생했을 경우 얼마나 속도 개선이 됐는지도 기간별 히스토리를 볼 수 있어서 좋을 것 같다.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow/설치] Docker로 Airflow 설치 (0) | 2024.05.29 |
---|---|
[Airflow] Dag 인식이 안 될 때 (0) | 2023.08.08 |