본문 바로가기
Study/Data Engineering

Airflow

by 왕방개 2024. 5. 28.

1.개요

1)개요

=>데이터 파이프라인은 원하는 결과를 얻기 위해 실행되는 여러 태스크 또는 동작으로 구성

=>실시간 날씨 대시보드 구현

-작업 과정

  다른 시스템의 날씨 API를 통해서 일기예보를 데이터 가져오기

  서비스 목적에 맞도록 데이터 정제 및 변환

   변환된 데이터를 날씨 대시보드로 전송

-파이프라인은 간단한 3개의 다른 태스크로 구성

이 태스크들은 정해진 순서대로 진행되어야 합니다

데이터 변환 전에 새로운 데이터를 대시보드로 전송하거나 하면 안됩니다

=>대다수의 파이프라인 구성은 이렇게 암묵적인 태스크 순서가 있습니다.

2)데이터 파이프라인 그래프

=>태스크 사이의 의존성을 명확하게 확인하는 방법 중 하나가 데이터 파이프라인을 그래프로 그리는 것

=>그래프에서 태스크는 노드로 표시되고 태스크 간의 의존성은 노드간의 방향으로 표시가 되는데 화살표를 이용

화살표의 끝점은 앞의 태스크가 종료되어야만 시작 또는 실행 될 수 있는 태스크라는 의미

이런 그래프를 방향성 그래프로라고 함. (directed graph)



=>실시간 날씨 대시보드 구현 애플리케이션을 그래프로 표현



날씨 예보 가져오기 -> 예보 데이터 정제하기 -> 대시보드에 데이터 전송하기



=>방향성 순환 그래프

다음 태스크에서 이전 태스크로 역의 관계가 존재하는 경우

방향성 순환 그래프가 잘못 만들어지면 절대로 수행되지 않는 형태가 만들어 질 수 있습니다.

태스크1 -> 태스크 <-> 태스크3

 

3)DAG(Directed Acyclic Graph - 비 순환 그래프)

=>순환이 없는 그래프

=>첫번째 태스크는 의존성이 없는 실행 가능한 태스크로 생성하고 다음 태스크는 이전 태스크가 종료되야만 실행되는 태스크여야 하고 역방향은 존재하지 않는 형태



4)그래프 파이프라인과 절차적 스크립트 파이프라인(모놀리식 스크립트)의 비교

=>모놀리식 방법이 그래프 파이프라인에 비해서 좋지 않은 점은 자원의 낭비를 가져오거나 태스크 중간에 실패했을 때 처음부터 다시 수행해야 하는 비효율 문제가 발생

절차적 스크립트 같은 경우 중간에 오류가 나면 다시 처음부터 돌려야 하는 오류가 생기지만 그래프 파이프라인을 그리면 어떤 부분에서 뻑나더라도 쉽게 고칠 수 있음

 

5)airflow

=>Python으로 작업의 흐름을 제어하기 위한 소프트웨어

=>Python으로 DAG를 작성해서 수행

 

 

2.DAG 작성 및 실행

1)DAG(비순환 그래프) 작성

=>작업 내용

-JSON 파일 다운로드: 리눅스의 curl 명령 이용

 curl -o /tmp/launches.json -L https://ll.thespacedevs.com/2.0.0/launch/upcoming

 

-Python 코드를 이용해서 다운로드 받은 내용을 파싱한 후 이미지 경로를 추출해서 이미지를 다운로드

 

-작업이 종료되면 echo 명령(Linux)으로 알림을 전송

 

=>작성 방법

-dag 옵션 (이름,시작 시점, 주기 등) 을 설정

 

-Operator을 이용해서 작업을 정의

 

-정의된 작업들의 의존성 설정

 

=>실행 방법

-외부에서 작업을 확인할 수 있는 웹 서버 실행

 

-스케쥴러를 실행

 

=>python 파일을 만들어서 실행

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

#인스턴스 생성 - 모든 워크플로의 시작점
#dag 이름 과 설명 그리고 시작 날짜 및 실행 간격 설정
dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None
)

#bash 스크립트를 이용해서 curl로 URL 결과값 다운로드
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

#가져온 결과를 파싱하고 모든 로켓 사진을 다운로드
def _get_pictures():
    # 디렉토리 존재 여부를 확인해서 디렉토리를 생성
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # 데이터를 파싱하고 이미지 경로를 추출
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")
                
#DAG에서 Python 함수를 호출
get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

#알림 설정
notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

#태스크 실행 순서 설정
download_launches >> get_pictures >> notify

 

2)도커에서 DAG실행(웹 서버는 기본적으로 8080을 사용)

docker run -it -p 8080:8080 -v ./download_rocket_launches.py:/opt/airflow/dags/download_rocket_launches.py --entrypoint=/bin/bash --name airflow apache/airflow -c '(airflow db init && airflow users create --username admin --password admin --firstname FIRST_NAME --lastname LAST_NAME --role Admin --email ggangpae1@gmail.com); airflow webserver & airflow scheduler'

 

=>웹서버 접속할때 8080 port 로 접속하게 되고 아이디는 admin이고 비밀번호도 admin (수정 필요)

 

 

3)linux 에서 실행

=>가상환경을 만들어서 airflow 패키지 설치 후 dag 파일은 home 디렉토리의 airflow/dags 디렉토리에 저장되어야 합니다.

 

=>가상환경 생성

sudo apt-get update

sudo apt-get install python3-venv python3-pip

python3 -m venv myvenv

source myvenv/bin/activate

 

=>airflow 패키지 설치

pip install apache-airflow

 

=>airflow 데이터베이스 초기화

airflow db init

 

=>airflow 디렉토리 확인 : ls

=>웹 서버 관리자 생성

airflow users create --username 관리자이름 --password 비밀번호 --firstname 이름 --lastname 이름 --role Admin --email 이메일

 

=>DAG 파일을 생성:airflow/dags 디렉토리에 위치해야 합니다.

cd ~/airflow

mkdir dags (dags 폴더 생성)

nano download_rocket_launches.py 

파일에 위에 있는 python 파일 작성 => ctrl + s , ctrl +x

 

=>웹서버 실행 : airflow webserver & 

=>웹 서버 제대로 실행됬는지 확인: IP주소:8080 접속

스케쥴러가 안켜짐

=>스케쥴러 실행: 다른 cmd창 키고 airflow scheduler &

사라진것을 확인할 수 있음

 

잘성공했으면 아까 만든 python파일 이름 눌러서 graph 확인

 

성공적으로 업로드!

 

3.사용자 이벤트 처리

1)웹 API 서버 구현

=>flask를 이용해서 웹 서버 구현

AMI 22.04에서 잘 진행됩니다 :D

 

=>app.py 파일 생성

from datetime import date, datetime, timedelta
import time

from numpy import random
import pandas as pd
from faker import Faker

from flask import Flask, jsonify, request

def _generate_events(end_date):

    events = pd.concat(
        [
            _generate_events_for_day(date=end_date - timedelta(days=(30 - i)))
            for i in range(30)
        ],
        axis=0,
    )
    return events
    
 def _generate_events_for_day(date):
    seed = int(time.mktime(date.timetuple()))
    Faker.seed(seed)
    random_state = random.RandomState(seed)
    n_users = random_state.randint(low=50, high=100)
    n_events = random_state.randint(low=200, high=2000)
    fake = Faker()
    users = [fake.ipv4() for _ in range(n_users)]

    return pd.DataFrame(
        {
            "user": random_state.choice(users, size=n_events, replace=True),
            "date": pd.to_datetime(date),
        }
    )
    
  def _str_to_datetime(value):
    if value is None:
        return None
    return datetime.strptime(value, "%Y-%m-%d")

app = Flask(__name__)
app.config["events"] = _generate_events(end_date=date(year=2023, month=1, day=31))

@app.route("/events")
def events():
    start_date = _str_to_datetime(request.args.get("start_date", None))
    end_date = _str_to_datetime(request.args.get("end_date", None))
    events = app.config.get("events")
    if start_date is not None:
        events = events.loc[events["date"] >= start_date]
    if end_date is not None:
        events = events.loc[events["date"] < end_date]
    return jsonify(events.to_dict(orient="records"))

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

 

2)사용자가 직접 실행하는 DAG 를 생성

=>작업은 2가지

-웹 서버에서 데이터를 가져오는 작업

-가져온 데이터를 파싱해서 pandas를 이용해서 간단한 통계작업을 수행한 후 결과를 파일로 저장

from datetime import datetime
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="unscheduled", start_date=datetime(2023, 1, 1), schedule_interval=None
)


fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "curl -o /tmp/events.json http://위주소/events"
    ),
    dag=dag,
)

def _calculate_stats(input_path, output_path):

    Path(output_path).parent.mkdir(exist_ok=True)

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    stats.to_csv(output_path, index=False)

calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={"input_path": "/tmp/events.json", "output_path": "/tmp/stats.csv"},
    dag=dag,
)

fetch_events >> calculate_stats

 

4.스케쥴링

1)개요

=>DAG를 초기화할 때 schedule_interval 인수를 이용해서 스케쥴 간격을 정의할 수 있음

기본값은 None이며 이 경우 DAG가 예약 실행되지 않고 UI 또는 API를 통해서 수동으로 트리거를 해야함

 

2)스케쥴 간격에 관련된 매크로

@once

@hourly

@daily

@weekly

@monthly

@yearly

 

dag = DAG(dag_id= '아이디', start_date = datetime(년,월,일) , end_date = datetime(년,월,일), schedule_interval="@daily") =>매일 자정에 start_date 부터 end_date 까지 실행함

start_date 에 과거로 설정해놔도 실행함 (신기 :D)

 

=>시작 날짜 및 시간은 매크로 대신에 함수 사용이 가능한데 airflow.utils.dates.days_age(이전 날짜 수)를 이용할 수 있고 days 대신에 시분초 설정도 가능

 

3)크론 기반의 스케쥴링 가능

* * * * * 형식으로 작성 가능

 

4)timedelta 도 사용 가능

 

'Study > Data Engineering' 카테고리의 다른 글

데이터 변환  (0) 2024.05.22
데이터 수집  (0) 2024.05.21
Kafka(2)  (0) 2024.05.12
Kafka(1)  (0) 2024.05.09