1.개요
1)개요
=>데이터 파이프라인은 원하는 결과를 얻기 위해 실행되는 여러 태스크 또는 동작으로 구성
=>실시간 날씨 대시보드 구현
-작업 과정
다른 시스템의 날씨 API를 통해서 일기예보를 데이터 가져오기
서비스 목적에 맞도록 데이터 정제 및 변환
변환된 데이터를 날씨 대시보드로 전송
-파이프라인은 간단한 3개의 다른 태스크로 구성
이 태스크들은 정해진 순서대로 진행되어야 합니다
데이터 변환 전에 새로운 데이터를 대시보드로 전송하거나 하면 안됩니다
=>대다수의 파이프라인 구성은 이렇게 암묵적인 태스크 순서가 있습니다.
2)데이터 파이프라인 그래프
=>태스크 사이의 의존성을 명확하게 확인하는 방법 중 하나가 데이터 파이프라인을 그래프로 그리는 것
=>그래프에서 태스크는 노드로 표시되고 태스크 간의 의존성은 노드간의 방향으로 표시가 되는데 화살표를 이용
화살표의 끝점은 앞의 태스크가 종료되어야만 시작 또는 실행 될 수 있는 태스크라는 의미
이런 그래프를 방향성 그래프로라고 함. (directed graph)
=>실시간 날씨 대시보드 구현 애플리케이션을 그래프로 표현
날씨 예보 가져오기 -> 예보 데이터 정제하기 -> 대시보드에 데이터 전송하기
=>방향성 순환 그래프
다음 태스크에서 이전 태스크로 역의 관계가 존재하는 경우
방향성 순환 그래프가 잘못 만들어지면 절대로 수행되지 않는 형태가 만들어 질 수 있습니다.
태스크1 -> 태스크 <-> 태스크3
3)DAG(Directed Acyclic Graph - 비 순환 그래프)
=>순환이 없는 그래프
=>첫번째 태스크는 의존성이 없는 실행 가능한 태스크로 생성하고 다음 태스크는 이전 태스크가 종료되야만 실행되는 태스크여야 하고 역방향은 존재하지 않는 형태
4)그래프 파이프라인과 절차적 스크립트 파이프라인(모놀리식 스크립트)의 비교
=>모놀리식 방법이 그래프 파이프라인에 비해서 좋지 않은 점은 자원의 낭비를 가져오거나 태스크 중간에 실패했을 때 처음부터 다시 수행해야 하는 비효율 문제가 발생
절차적 스크립트 같은 경우 중간에 오류가 나면 다시 처음부터 돌려야 하는 오류가 생기지만 그래프 파이프라인을 그리면 어떤 부분에서 뻑나더라도 쉽게 고칠 수 있음
5)airflow
=>Python으로 작업의 흐름을 제어하기 위한 소프트웨어
=>Python으로 DAG를 작성해서 수행
2.DAG 작성 및 실행
1)DAG(비순환 그래프) 작성
=>작업 내용
-JSON 파일 다운로드: 리눅스의 curl 명령 이용
curl -o /tmp/launches.json -L https://ll.thespacedevs.com/2.0.0/launch/upcoming
-Python 코드를 이용해서 다운로드 받은 내용을 파싱한 후 이미지 경로를 추출해서 이미지를 다운로드
-작업이 종료되면 echo 명령(Linux)으로 알림을 전송
=>작성 방법
-dag 옵션 (이름,시작 시점, 주기 등) 을 설정
-Operator을 이용해서 작업을 정의
-정의된 작업들의 의존성 설정
=>실행 방법
-외부에서 작업을 확인할 수 있는 웹 서버 실행
-스케쥴러를 실행
=>python 파일을 만들어서 실행
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
#인스턴스 생성 - 모든 워크플로의 시작점
#dag 이름 과 설명 그리고 시작 날짜 및 실행 간격 설정
dag = DAG(
dag_id="download_rocket_launches",
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval=None
)
#bash 스크립트를 이용해서 curl로 URL 결과값 다운로드
download_launches = BashOperator(
task_id="download_launches",
bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", # noqa: E501
dag=dag,
)
#가져온 결과를 파싱하고 모든 로켓 사진을 다운로드
def _get_pictures():
# 디렉토리 존재 여부를 확인해서 디렉토리를 생성
pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
# 데이터를 파싱하고 이미지 경로를 추출
with open("/tmp/launches.json") as f:
launches = json.load(f)
image_urls = [launch["image"] for launch in launches["results"]]
for image_url in image_urls:
try:
response = requests.get(image_url)
image_filename = image_url.split("/")[-1]
target_file = f"/tmp/images/{image_filename}"
with open(target_file, "wb") as f:
f.write(response.content)
print(f"Downloaded {image_url} to {target_file}")
except requests_exceptions.MissingSchema:
print(f"{image_url} appears to be an invalid URL.")
except requests_exceptions.ConnectionError:
print(f"Could not connect to {image_url}.")
#DAG에서 Python 함수를 호출
get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)
#알림 설정
notify = BashOperator(
task_id="notify",
bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
dag=dag,
)
#태스크 실행 순서 설정
download_launches >> get_pictures >> notify
2)도커에서 DAG실행(웹 서버는 기본적으로 8080을 사용)
docker run -it -p 8080:8080 -v ./download_rocket_launches.py:/opt/airflow/dags/download_rocket_launches.py --entrypoint=/bin/bash --name airflow apache/airflow -c '(airflow db init && airflow users create --username admin --password admin --firstname FIRST_NAME --lastname LAST_NAME --role Admin --email ggangpae1@gmail.com); airflow webserver & airflow scheduler'
=>웹서버 접속할때 8080 port 로 접속하게 되고 아이디는 admin이고 비밀번호도 admin (수정 필요)
3)linux 에서 실행
=>가상환경을 만들어서 airflow 패키지 설치 후 dag 파일은 home 디렉토리의 airflow/dags 디렉토리에 저장되어야 합니다.
=>가상환경 생성
sudo apt-get update
sudo apt-get install python3-venv python3-pip
python3 -m venv myvenv
source myvenv/bin/activate
=>airflow 패키지 설치
pip install apache-airflow
=>airflow 데이터베이스 초기화
airflow db init
=>airflow 디렉토리 확인 : ls
=>웹 서버 관리자 생성
airflow users create --username 관리자이름 --password 비밀번호 --firstname 이름 --lastname 이름 --role Admin --email 이메일
=>DAG 파일을 생성:airflow/dags 디렉토리에 위치해야 합니다.
cd ~/airflow
mkdir dags (dags 폴더 생성)
nano download_rocket_launches.py
파일에 위에 있는 python 파일 작성 => ctrl + s , ctrl +x
=>웹서버 실행 : airflow webserver &
=>웹 서버 제대로 실행됬는지 확인: IP주소:8080 접속
=>스케쥴러 실행: 다른 cmd창 키고 airflow scheduler &
잘성공했으면 아까 만든 python파일 이름 눌러서 graph 확인
성공적으로 업로드!
3.사용자 이벤트 처리
1)웹 API 서버 구현
=>flask를 이용해서 웹 서버 구현
=>app.py 파일 생성
from datetime import date, datetime, timedelta
import time
from numpy import random
import pandas as pd
from faker import Faker
from flask import Flask, jsonify, request
def _generate_events(end_date):
events = pd.concat(
[
_generate_events_for_day(date=end_date - timedelta(days=(30 - i)))
for i in range(30)
],
axis=0,
)
return events
def _generate_events_for_day(date):
seed = int(time.mktime(date.timetuple()))
Faker.seed(seed)
random_state = random.RandomState(seed)
n_users = random_state.randint(low=50, high=100)
n_events = random_state.randint(low=200, high=2000)
fake = Faker()
users = [fake.ipv4() for _ in range(n_users)]
return pd.DataFrame(
{
"user": random_state.choice(users, size=n_events, replace=True),
"date": pd.to_datetime(date),
}
)
def _str_to_datetime(value):
if value is None:
return None
return datetime.strptime(value, "%Y-%m-%d")
app = Flask(__name__)
app.config["events"] = _generate_events(end_date=date(year=2023, month=1, day=31))
@app.route("/events")
def events():
start_date = _str_to_datetime(request.args.get("start_date", None))
end_date = _str_to_datetime(request.args.get("end_date", None))
events = app.config.get("events")
if start_date is not None:
events = events.loc[events["date"] >= start_date]
if end_date is not None:
events = events.loc[events["date"] < end_date]
return jsonify(events.to_dict(orient="records"))
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
2)사용자가 직접 실행하는 DAG 를 생성
=>작업은 2가지
-웹 서버에서 데이터를 가져오는 작업
-가져온 데이터를 파싱해서 pandas를 이용해서 간단한 통계작업을 수행한 후 결과를 파일로 저장
from datetime import datetime
from pathlib import Path
import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="unscheduled", start_date=datetime(2023, 1, 1), schedule_interval=None
)
fetch_events = BashOperator(
task_id="fetch_events",
bash_command=(
"curl -o /tmp/events.json http://위주소/events"
),
dag=dag,
)
def _calculate_stats(input_path, output_path):
Path(output_path).parent.mkdir(exist_ok=True)
events = pd.read_json(input_path)
stats = events.groupby(["date", "user"]).size().reset_index()
stats.to_csv(output_path, index=False)
calculate_stats = PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs={"input_path": "/tmp/events.json", "output_path": "/tmp/stats.csv"},
dag=dag,
)
fetch_events >> calculate_stats
4.스케쥴링
1)개요
=>DAG를 초기화할 때 schedule_interval 인수를 이용해서 스케쥴 간격을 정의할 수 있음
기본값은 None이며 이 경우 DAG가 예약 실행되지 않고 UI 또는 API를 통해서 수동으로 트리거를 해야함
2)스케쥴 간격에 관련된 매크로
@once
@hourly
@daily
@weekly
@monthly
@yearly
dag = DAG(dag_id= '아이디', start_date = datetime(년,월,일) , end_date = datetime(년,월,일), schedule_interval="@daily") =>매일 자정에 start_date 부터 end_date 까지 실행함
start_date 에 과거로 설정해놔도 실행함 (신기 :D)
=>시작 날짜 및 시간은 매크로 대신에 함수 사용이 가능한데 airflow.utils.dates.days_age(이전 날짜 수)를 이용할 수 있고 days 대신에 시분초 설정도 가능
3)크론 기반의 스케쥴링 가능
* * * * * 형식으로 작성 가능
4)timedelta 도 사용 가능