1.데이터 수집
1)수집 방법
=>데이터베이스에서 수집
=>플랫 파일(텍스트 파일) 등에서 수집
=>센서 등의 데이터를 Kafka 와 같은 실시간 스트림을 이용한 수집
2)데이터 수집
=>읽어온 데이터를 데이터웨어하우스에 저장하는 작업
=>예전에는 데이터 센터의 데이터베이스에 저장하는 것이 일반적이었지만 최근에는 클라우드 환경에 많이 저장
2.MySQL 의 데이터를 S3 버킷에 csv 파일로 저장
1)추출 방법
=>전체 추출: 스냅샷을 이용한 추출
마지막 데이터의 복사본 저장
=>증분 추출: 트랜잭션을 이용한 추출
첫번째 데이터와 변경 내용 저장
2)전체 추출 과 증분 추출의 트레이드 오프
=>마지막 데이터의 결과를 복원하고자 하는 경우는 전체 추출이 빠름
=>전체 추출은 중간 과정을 저장 할 수 없음
3)추출 준비
=>사용할 MySQL과 S3 버킷 1개 준비
4)샘플 데이터 생성
CREATE TABLE ORDERS(
OrderId int,
OrderStatus varchar(30),
LastUpdated timestamp
);
INSERT INTO ORDERS
VALUES (1,'BackOrdered','2020-06-01 12:00:00');
5)전체 추출하는 python 코드 작성
=>패키지 설치
pymysql
configparser(설정 정보를 사용하기 위한 라이브러리)
=>pipeline.conf 파일을 생성하고 MySQL 접속 정보를 저장
[mysql_config]
import pymysql
import csv
import configparser
parser = configparser.ConfigParser()
parser.read('pipeline.conf')
hostname = parser.get('mysql_config','hostname')
port = parser.get('mysql_config','port')
username = parser.get('mysql_config','username')
password= parser.get('mysql_config','password')
conn = pymysql.connect(host=hostname, user=username, password = password, port=int(port))
if conn is None:
print("mysql 데이터베이스 접속 에러")
else:
print("데이터베이스 연결 성공")
m_query = ' SELECT * FROM VOD;'
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()
local_filename = 'order_extract.csv'
with open(local_filename,'w') as fp:
csv_w = csv.writer(fp,delimiter="|")
csv_w.wrierows(results)
m_cursor.close()
conn.close()
=>s3 사용을 위한 패키지 설치:boto3
=>pipeline.conf 파일을 생성하고 S3 버킷 접속 정보를 저장
=>파이썬 코드 수정
import pymysql
import csv
import configparser
import boto3
from botocore.exceptions import ClientError
#환경 설정 파일의 내용 읽기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")
conn = pymysql.connect(host=hostname,
user=username,
password=password,
db=dbname,
port=int(port))
if conn is None:
print("데이터베이스 접속 에러")
else:
print("데이터베이스 연결 성공")
m_query = "SELECT * FROM ORDERS;"
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()
local_filename = "order_extract.csv"
with open(local_filename, 'w') as fp:
csv_w = csv.writer(fp, delimiter="|")
csv_w.writerows(results)#파일 버퍼에 기록
#파일 버퍼에 기록한 내용은 파일이 닫힐 때 실제 파일로 옮겨집니다.
m_cursor.close()
#파일의 내용을 기록하기 위해서 close
fp.close()
#버킷 설정
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3', aws_access_key_id = access_key,
aws_secret_access_key=secret_key)
#업로드
s3_file = local_filename
try:
response = s3.upload_file(local_filename, bucket_name, s3_file)
except:
print("업로드 실패")
conn.close()
=>데이터베이스 수정
update ORDERS set OrderStatus='Shipped', LastUpdated='2020-07-11 3:05:00' where OrderId=1;
INSERT INTO ORDERS
VALUES(2, 'Shipped', '2020-07-01 12:00:00');
=>추출 프로그램을 다시 수행
전체 추출 또는 스냅샷을 백업을 하게 되면 마지막 결과 만을 보존합니다.
6)증분 추출
=>알고리즘
데이터를 추출할 때 가장 마지막 업데이트 된 데이터의 타임 스탬프를 찾아서 기록
데이터를 추출할 때 가장 마지막 업데이트 된 데이터의 타임 스탬프를 읽어서 이 데이터 이후에 추가된 데이터만 읽어서 기존의 파일에 추가
=>파이썬 코드 수정
import pymysql
import csv
import configparser
import boto3
from botocore.exceptions import ClientError
#환경 설정 파일의 내용 읽기
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")
#버킷 설정
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")
s3 = boto3.client('s3', aws_access_key_id = access_key,
aws_secret_access_key=secret_key)
#증분 추출을 위한 쿼리 생성
m_query = None
#마지막 타임스탬프를 읽기(파일 다운로드 받기)
with open('lastread.csv', 'wb') as f:
try:
s3.download_fileobj(bucket_name, 'lastread.csv', f)
except:
pass
with open('order_extract.csv', 'wb') as f:
try:
s3.download_fileobj(bucket_name, 'order_extract.csv', f)
except:
pass
with open('lastread.csv', 'rb') as f:
x = f.readline()
if len(x) > 0:
m_query = "SELECT * FROM ORDERS WHERE LastUpdated > '" + x.decode('utf-8') + "' ORDER BY LastUpdated;"
else:
m_query = "SELECT * FROM ORDERS ORDER BY LastUpdated;"
conn = pymysql.connect(host=hostname, user=username, password=password, db=dbname, port=int(port))
if conn is None:
print("데이터베이스 접속 에러")
else:
print("데이터베이스 연결 성공")
print(m_query)
m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()
local_filename = "order_extract.csv"
#파일에 추가 모드로 기록
with open(local_filename, 'a') as fp:
csv_w = csv.writer(fp, delimiter="|")
csv_w.writerows(results)#파일 버퍼에 기록
with open('lastread.csv', 'w') as fp1:
fp1.write(str(results[-1][-1]))
fp1.close()
m_cursor.close()
fp.close()
s3_file = local_filename
try:
s3.upload_file(local_filename, bucket_name, s3_file)
s3.upload_file('lastread.csv', bucket_name, 'lastread.csv')
except:
print("업로드 에러")
conn.close()
=>업데이트 와 삽입 작업을 수행한 후 프로그램을 다시 실행시키고 파일을 확인
update ORDERS set OrderStatus='Backordered', LastUpdated='2020-07-21 3:05:00' where OrderId=2;
INSERT INTO ORDERS
VALUES(3, 'Shipped', '2020-07-27 12:00:00');