본문 바로가기
Study/Data Engineering

데이터 수집

by 왕방개 2024. 5. 21.

1.데이터 수집

1)수집 방법
=>데이터베이스에서 수집
=>플랫 파일(텍스트 파일) 등에서 수집
=>센서 등의 데이터를 Kafka 와 같은 실시간 스트림을 이용한 수집

2)데이터 수집
=>읽어온 데이터를 데이터웨어하우스에 저장하는 작업
=>예전에는 데이터 센터의 데이터베이스에 저장하는 것이 일반적이었지만 최근에는 클라우드 환경에 많이 저장

2.MySQL 의 데이터를 S3 버킷에 csv 파일로 저장

1)추출 방법

=>전체 추출: 스냅샷을 이용한 추출

마지막 데이터의 복사본 저장

 

=>증분 추출: 트랜잭션을 이용한 추출

첫번째 데이터와 변경 내용 저장

 

2)전체 추출 과 증분 추출의 트레이드 오프

=>마지막 데이터의 결과를 복원하고자 하는 경우는 전체 추출이 빠름

=>전체 추출은 중간 과정을 저장 할 수 없음

 

3)추출 준비

=>사용할 MySQL과 S3 버킷 1개 준비

 

4)샘플 데이터 생성

CREATE TABLE ORDERS(

         OrderId int,

         OrderStatus varchar(30),

         LastUpdated timestamp

);

INSERT INTO ORDERS

VALUES (1,'BackOrdered','2020-06-01 12:00:00');

 

5)전체 추출하는 python 코드 작성

=>패키지 설치
pymysql
configparser(설정 정보를 사용하기 위한 라이브러리)

=>pipeline.conf 파일을 생성하고 MySQL 접속 정보를 저장

[mysql_config]

import pymysql
import csv
import configparser

parser = configparser.ConfigParser()
parser.read('pipeline.conf')

hostname = parser.get('mysql_config','hostname')
port = parser.get('mysql_config','port')
username = parser.get('mysql_config','username')
password= parser.get('mysql_config','password')

conn = pymysql.connect(host=hostname, user=username, password = password, port=int(port))

if conn is None:
  print("mysql 데이터베이스 접속 에러")

else:
  print("데이터베이스 연결 성공")

  m_query = ' SELECT * FROM VOD;'
  m_cursor = conn.cursor()
  m_cursor.execute(m_query)
  results = m_cursor.fetchall()
  
  local_filename = 'order_extract.csv'
  with open(local_filename,'w') as fp:
    csv_w = csv.writer(fp,delimiter="|")
    csv_w.wrierows(results)
    m_cursor.close()
    
  conn.close()

 

 

=>s3 사용을 위한 패키지 설치:boto3

 

=>pipeline.conf 파일을 생성하고 S3 버킷 접속 정보를 저장

 

=>파이썬 코드 수정

import pymysql
import csv
import configparser

import boto3
from botocore.exceptions import ClientError

#환경 설정 파일의 내용 읽기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")

hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

conn = pymysql.connect(host=hostname, 
user=username, 
password=password, 
db=dbname, 
port=int(port))

if conn is None:
    print("데이터베이스 접속 에러")
else:
    print("데이터베이스 연결 성공")

    m_query = "SELECT * FROM ORDERS;"
    m_cursor = conn.cursor()
    m_cursor.execute(m_query)
    results = m_cursor.fetchall()

    local_filename = "order_extract.csv"
    with open(local_filename, 'w') as fp:
        csv_w = csv.writer(fp, delimiter="|")
        csv_w.writerows(results)#파일 버퍼에 기록
        #파일 버퍼에 기록한 내용은 파일이 닫힐 때 실제 파일로 옮겨집니다.
        m_cursor.close()

        #파일의 내용을 기록하기 위해서 close
        fp.close()

        #버킷 설정
        access_key = parser.get("aws_boto_credentials", "access_key")
        secret_key = parser.get("aws_boto_credentials", "secret_key")
        bucket_name = parser.get("aws_boto_credentials", "bucket_name")

        s3 = boto3.client('s3', aws_access_key_id = access_key,
        aws_secret_access_key=secret_key)

        #업로드
        s3_file = local_filename
        try:
            response = s3.upload_file(local_filename, bucket_name, s3_file)
        except:
            print("업로드 실패")

    conn.close()

 

=>데이터베이스 수정

update ORDERS set OrderStatus='Shipped', LastUpdated='2020-07-11 3:05:00' where OrderId=1;

INSERT INTO ORDERS
VALUES(2, 'Shipped', '2020-07-01 12:00:00');

 

=>추출 프로그램을 다시 수행
전체 추출 또는 스냅샷을 백업을 하게 되면 마지막 결과 만을 보존합니다.

 

6)증분 추출
=>알고리즘
데이터를 추출할 때 가장 마지막 업데이트 된 데이터의 타임 스탬프를 찾아서 기록

데이터를 추출할 때 가장 마지막 업데이트 된 데이터의 타임 스탬프를 읽어서 이 데이터 이후에 추가된 데이터만 읽어서 기존의 파일에 추가

=>파이썬 코드 수정

import pymysql
import csv
import configparser

import boto3
from botocore.exceptions import ClientError

#환경 설정 파일의 내용 읽기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")

hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

#버킷 설정
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")

s3 = boto3.client('s3', aws_access_key_id = access_key,
aws_secret_access_key=secret_key)


#증분 추출을 위한 쿼리 생성
m_query = None

#마지막 타임스탬프를 읽기(파일 다운로드 받기)
with open('lastread.csv', 'wb') as f:
    try:
        s3.download_fileobj(bucket_name, 'lastread.csv', f)
    except:
        pass

with open('order_extract.csv', 'wb') as f:
    try:
        s3.download_fileobj(bucket_name, 'order_extract.csv', f)
    except:
        pass

with open('lastread.csv', 'rb') as f:
    x = f.readline()
    if len(x) > 0:
        m_query = "SELECT * FROM ORDERS WHERE LastUpdated > '" + x.decode('utf-8') + "' ORDER BY LastUpdated;"
    else:
        m_query = "SELECT * FROM ORDERS ORDER BY LastUpdated;"


    conn = pymysql.connect(host=hostname, user=username, password=password, db=dbname, port=int(port))

    if conn is None:
        print("데이터베이스 접속 에러")
    else:
        print("데이터베이스 연결 성공")
        print(m_query)
        m_cursor = conn.cursor()
        m_cursor.execute(m_query)
        results = m_cursor.fetchall()

        local_filename = "order_extract.csv"
        #파일에 추가 모드로 기록
        with open(local_filename, 'a') as fp:
            csv_w = csv.writer(fp, delimiter="|")
            csv_w.writerows(results)#파일 버퍼에 기록
        
        with open('lastread.csv', 'w') as fp1:
            fp1.write(str(results[-1][-1]))
            fp1.close()

        m_cursor.close()
        fp.close()

        s3_file = local_filename
        try:
            s3.upload_file(local_filename, bucket_name, s3_file)
            s3.upload_file('lastread.csv', bucket_name, 'lastread.csv')
        except:
            print("업로드 에러")
        conn.close()

 

=>업데이트 와 삽입 작업을 수행한 후 프로그램을 다시 실행시키고 파일을 확인

update ORDERS set OrderStatus='Backordered', LastUpdated='2020-07-21 3:05:00' where OrderId=2;

INSERT INTO ORDERS
VALUES(3, 'Shipped', '2020-07-27 12:00:00');

'Study > Data Engineering' 카테고리의 다른 글

Airflow  (0) 2024.05.28
데이터 변환  (0) 2024.05.22
Kafka(2)  (0) 2024.05.12
Kafka(1)  (0) 2024.05.09