본문 바로가기
Study/Data Engineering

데이터 변환

by 왕방개 2024. 5. 22.

1.개요

=>ELT(Extract Load Transfom) 패턴에서 데이터가 데이터 레이크 (필터링 되지 않은 데이터) 또는 데이터 웨어하우스( 나에게 필요한 - 필터링이 된 데이터, 데이터 웨어하우스가 규모가 작으면 데이터 마트)로 수집되면 파이프라인의 다음 단계는 데이터 변환

 

=>데이터 변환에는 비문맥적 데이터 조작과 비지니스 컨텍스트 및 논리를 염두에 둔 데이터 모델이 모두 포함됨

 

=>파이프라인의 목적이 비지니스 통찰력 또는 분석을 하는 것이라면 비문맥적 변환 외에 데이터 모델링 추가 되어야 합니다

 

=>데이터 모델은 데이터 분석을 위해 이해되고 최적화된 방식으로 데이터를 정형화하고 정의하는 것

=>데이터 모델은 데이터 웨어하우스에서 하나 이상의 테이블로 표시

DataFrame 을 공부하는 이유
- DataFrame이나 Spark는 분석은 할 수 있으나 분석이 목적이 아님
- 다른데 있는 데이터를 메모리로 가져옴 => 빠르니까

 

=>데이터 엔지니어가 파이프라인에서 비문맥적 변환을 구축하기도 하지만 데이터 분석가나 분석 엔지니어가 데이터 변환을 처리하는 것이 일반적

ELT 패턴과 SQL을 기본 언어로 해서 설계된 지원도구나 프레임워크 덕분에 비문맥적 변환이 쉬어짐

 

=>변환 코드는 SQL로 작성하고 파이썬 라이브러리(Pandas)를 사용하여 데이터 수집에 비문맥적 변환을 결합하는 것이 적절

 

2.비문맥적 변환

1)SQL을 이용해서 중복 데이터 제거

=>샘플 데이터 생성

CREATE TABLE Orders(
OrderId int,
OrderStatus varchar(20),
LastUpdated timestamp
);

INSERT INTO Orders VALUES(1, 'Backordered', '2020-06-01');
INSERT INTO Orders VALUES(1, 'Shipped', '2020-06-09');
INSERT INTO Orders VALUES(2, 'Shipped', '2020-07-01');
INSERT INTO Orders VALUES(1, 'Shipped', '2020-06-09');
INSERT INTO Orders VALUES(3, 'Backordered', '2020-07-11');

 

=>Query Sequence를 이용해서 모든 컬럼의 값이 동일한 데이터 찾아서 제거

- 모든 컬럼의 값이 동일한 데이터 찾기

SELECT OrderId, OrderStatus, LastUpdated, COUNT(*) AS dup_count
FROM Orders
GROUP BY OrderId, OrderStatus, LastUpdated
HAVING COUNT(*) > 1;

 

- 중복 데이터를 제거할 때 원본 데이터에 직접 작업하는 것은 위험합니다.
원본 데이터에서 필요한 데이터만 추출한 후 다시 원본 데이터에 삽입하는 것이 좋습니다.

 

-- 데이터의 중복을 제거하고 새로운 테이블을 생성

CREATE TABLE distinct_orders AS 
SELECT DISTINCT OrderId, OrderStatus, LastUpdated
FROM Orders;

 

-- 원본 테이블의 데이터 삭제

TRUNCATE TABLE Orders;

 

-- 중복을 제거한 데이터를 Orders 에 삽입

INSERT INTO Orders
SELECT * FROM distinct_orders;

 

-- 복사한 테이블 삭제

DROP TABLE distinct_orders;

 

-- 확인

SELECT * FROM Orders;

=>Window Function을 이용해서 중복을 제거
- 중복 행을 그룹화하고 행 번호를 할당하여 삭제할 행 과 유지할 행을 식별하는 방식
- ROW_NUMBER 함수를 이용해서 레코드의 순위를 지정하고 PARTITION BY 문을 사용해서 열 별로 레코드를 그룹화하면 둘 이상의 일치 항목이 있는 모든 레코드 그룹에 1보다 큰 ROW_NUMBER 가 할당

 

- 중복 데이터 생성

TRUNCATE TABLE Orders;

INSERT INTO Orders VALUES(1, 'Backordered', '2020-06-01');
INSERT INTO Orders VALUES(1, 'Shipped', '2020-06-09');
INSERT INTO Orders VALUES(2, 'Shipped', '2020-07-01');
INSERT INTO Orders VALUES(1, 'Shipped', '2020-06-09');
INSERT INTO Orders VALUES(3, 'Backordered', '2020-07-11');

 

-- 중복된 데이터에 dup_count를 일련번호 형태로 할당

CREATE TABLE all_orders AS
SELECT
	OrderId,
	OrderStatus,
	LastUpdated,
	ROW_NUMBER() OVER(PARTITION BY OrderId, OrderStatus, LastUpdated)
	AS dup_count
FROM Orders;

 

-- Orders 데이터 삭제

TRUNCATE TABLE Orders;

 

-- 첫번째 데이터만 Orders 에 복사

INSERT INTO Orders(OrderId, OrderStatus, LastUpdated)
SELECT
	OrderId, OrderStatus, LastUpdated
FROM all_orders
WHERE dup_count = 1;

 

-- 복사본 테이블 삭제

DROP TABLE all_orders;

 

-- 확인

SELECT *
FROM Orders;

 

 

2)URL Parsing

=>개요

- URL의 Segment를 분석
- 클라이언트가 서버에 접근했을 때 접근한 URL을 분석하기 위해서 분해하는 작업
- 도메인 과 URL 경로 그리고 파라미터를 추출
파라미터 중에 UTM(Urchin Tracking Module)은 마케팅 및 광고 캠페인을 추적하는데 사용되는 매개변수로 대부분의 플랫폼 과 조직에서 공통적으로 사용
- python으로 수행

 

=>URL 파싱: URL을 분해해서 csv 파일로 저장

- 분석할 URL: https://www.mydomain.com/page-name?utm_content=textlink&utm_medium=social

 

- 패키지 설치: urllib3

- python 파일을 생성하고 작성

from urllib.parse import urlsplit, parse_qs
import csv

#분석할 URL
url = "https://www.mydomain.com/page-name?utm_content=textlink&utm_medium=social"

#URL 분리
split_url = urlsplit(url)
#print(split_url)

#파라미터 분리
params = parse_qs(split_url.query)
#print(params)

#url parsing 결과를 저장할 list 생성
parsed_url = []
all_urls = []

parsed_url.append(split_url.netloc)
parsed_url.append(split_url.path)
parsed_url.append(params['utm_content'][0])
parsed_url.append(params['utm_medium'][0])

all_urls.append(parsed_url)
export_file = "urlparsing.csv"

with open(export_file, 'w') as f:
    csvw = csv.writer(f, delimiter='|')
    csvw.writerows(all_urls)
f.close()

 

=>SQL로 이 작업을 수행하는 것이 가능한데 어려운 작업
데이터 웨어하우스 중에서 Snowflake 같은 경우 URL Parsing을 위한 함수를 제공

 

3.변환 시점

=>비지니스 컨텍스트가 없는 데이터 변환은 기술적인 관점에서 데이터 수집 중 또는 후에 실행할 수 있음
=>EtLT 패턴에서는 수집 프로세스의 일부로 실행하는 것을 고려
 - URL Parsing의 경우는 데이터 웨어하우스에서 작업하는 것이 어렵기 때문에 데이터를 수집하는 파이썬 코드에서 수행하는 것이 유리
 - ELT 패턴에서 수집 후 수행하는 변환은 일반적으로 SQL에 능숙한 데이터 분석가가 수행하고 수집 단계에서 수행하는 변환은 데이터 엔지니어가 수행
 - 가능한 파이프라인 초기에 데이터 품질을 해결하는 것이 가장 좋음
 - 데이터를 수집하는 시점에서 수정할 수 있다면 분석가는 전처리에 시간 낭비를 할 필요가 없음
=>비지니스 로직 과 관련된 변환의 경우는 수집과 별도로 수행하는 것을 권장 - 데이터 모델링

 

4.데이터 모델링

=>파이프라인 작업 중 ELT 패턴의 변환 단계에서 비지니스 컨텍스트가 고려되는 곳

=>데이터 모델
 - 데이터 웨어하우스의 하나의 테이블
 - 데이터 모델의 속성
   측정: 측정하고 싶은 것으로 고객의 수 나 수익의 합
   속성: 보고서 또는 대시보드에서 필터링 하거나 그룹화하려는 항목
 - 세분화

=>완전히 새로 고침된 데이터 모델
 - 개요
 소스 데이터 저장소의 최신 상태가 포함된 테이블을 다루는 모델
 전체 이력이 아닌 최신의 레코드만 사용
 이전에 사용한 데이터가 없음
 분석가의 니즈가 있는 경우 니즈에 맞는 테이블을 생성

 

 

 - 실습을 위한 샘플 데이터

DROP TABLE Orders;

 CREATE TABLE Orders(
	OrderId int,
	OrderStatus varchar(30),
	OrderDate timestamp,
	CustomerId int,
	OrderTotal numeric
);

INSERT INTO Orders VALUES(1, 'Shipped', '2020-06-09', 100, 50.05);
INSERT INTO Orders VALUES(2, 'Shipped', '2020-07-11', 101, 57.45);
INSERT INTO Orders VALUES(3, 'Shipped', '2020-07-12', 102, 135.99);
INSERT INTO Orders VALUES(4, 'Shipped', '2020-07-12', 100, 43.00);

CREATE TABLE Customers(
	CustomerId int,
	CustomerName varchar(20),
	CustomerCountry varchar(10)
);

INSERT INTO Customers VALUES(100, '아담', 'USA');
INSERT INTO Customers VALUES(101, '제시카', 'UK');
INSERT INTO Customers VALUES(102, '헌트', 'UK');

 

- 데이터의 개수가 적은 경우에는 니즈 별로 테이블을 만들 필요가 없지만 데이터 볼륨이 큰 경우에는 니즈 별로 별도의 테이블을 만들어 두는 것이 좋습니다.

 - 문제
   요구 사항
      특정 월에 특정 국가에서 발생한 주문으로 발생한 수익은 얼마인가?
      특정 날짜에 주문이 몇 개가 들어오는가?

   필요한 측정 값
      총 수익
      주문 개수
 
   데이터 필터링 기준 또는 그룹화 할 항목
      주문 국가
      주문 날짜
  
   시간 단위
      일

 

- 테이블 생성

CREATE TABLE order_summary_daily(
	order_date date,
	order_country varchar(10),
	total_revenue numeric,
	order_count int
  );

 

- 데이터 모델 생성

INSERT INTO order_summary_daily(
	order_date, order_country, total_revenue, order_count)
   SELECT 
	o.OrderDate as order_date,
	c.CustomerCountry as order_country,
	SUM(o.OrderTotal) as total_revenue,
	COUNT(o.OrderId) as order_count
   FROM Orders o, Customers c
   WHERE c.CustomerId = o.CustomerId 
   GROUP BY o.OrderDate, c.CustomerCountry;

 

 - 확인

  SELECT *
  FROM order_summary_daily;

 

- 특정 월에 특정 국가에서 발생한 주문으로 발생한 수익은 얼마인가?

SELECT 
	MONTH(order_date) as order_month,
	order_country,
	SUM(total_revenue) as order_revenue
FROM
	order_summary_daily
GROUP BY
	MONTH(order_date),
	order_country
ORDER BY
	MONTH(order_date),
	order_country;

 

- 특정 날짜에 주문이 몇 개가 들어오는가?

SELECT
	order_date,
	SUM(order_count) as total_orders
 FROM
	order_summary_daily
 GROUP BY
	order_date
 ORDER BY
	order_date;

 

- 완전히 새로 고침 된 데이터의 차원을 천천히 변경
  데이터 전체를 새로 고침해서 가지고 있으면 기존 데이터에 대한 변경 사항을 덮어쓰기 때문에 종종 과거 변경 사항을 추적하기 위헤서 고급 데이터 모델링 개념이 사용됨
   
  CustomerId 가 100인 데이터에 대해서 국가가 USA 인데 UK 값으로 변경된 경우 OrderId를 4로 해서 데이터를 추가하면 이 데이터의 국가는 USA 가 UK 가 됩니다.

  주문 내역을 분석할 때 잘못하면 최신의 데이터를 적용해서 USA에서 주문한 것이 UK에서 주문한 것으로 될 수 있음
  이 문제를 해결할 때 증분된 데이터를 이용하면 쉽게 해결

  완전히 새로 고침 된 데이터를 사용하면 각 수집 사이에 Customers 테이블의 전체 기록을 유지하고 이러한 변경 사항을 스스로 추적해야 하는데 이를 수행하는 방법은 Kimball 모델링에 정의되어 있고 천천히 변화하는 차원 또는 SCD(Slowly Changing Dimension) 라고 합니다.

 - 변경 가능성이 있는 데이터는 증분 백업을 이용해서 작업을 하는 것을 고려

 

- 회원 정보의 변경 내역을 가지고 있는 테이블을 생성해서 데이터를 수정

CREATE TABLE Customers_scd(
	CustomerId int,
	CustomerName varchar(20),
	CustomerCountry varchar(10),
	ValidFrom datetime,
	Expired datetime
);

 

 

 -- 회원 정보를 수정

INSERT INTO Customers_scd
  VALUES(100, '아담', 'USA', '2019-05-01', '2020-06-20 8:15:34');
  
  INSERT INTO Customers_scd
  VALUES(100, '아담', 'UK', '2020-06-20', '2199-06-20 8:15:34');

 

-- 주문을 했을 때의 회원 정보를 가져오기

SELECT 
	o.OrderId,
	o.OrderDate,
	c.CustomerName,
	c.CustomerCountry
	
 FROM 
	Orders o,
	Customers_scd c
 WHERE
	o.CustomerId = c.CustomerId
	AND o.OrderDate BETWEEN c.ValidFrom AND c.Expired
 ORDER BY
	o.OrderDate;

 

- SCD 구현에 관심이 있으면 Kimball의 모델링을 학습

=>변경이 가능한 데이터의 경우 전체 데이터를 복제하는 것 보다는 증분 수집된 데이터 모델링을 사용하는 것이 효율적


=>추가 전용 데이터 모델링
 - 추가 전용 데이터는 데이터 웨어하우스로 수집되는 변경할 수 없는 데이터
 - 대표적인 데이터는 페이지 조회 기록 같은 경우
 - 수행하는 방법은 새로 고침된 데이터 모델링 과 동일한 방식으로 수행을 하지만 수정할 수 없다는 점이 다름
이런 경우 관계형 데이터베이스의 경우 뷰를 이용하는 것도 하나의 방법

'Study > Data Engineering' 카테고리의 다른 글

Airflow  (0) 2024.05.28
데이터 수집  (0) 2024.05.21
Kafka(2)  (0) 2024.05.12
Kafka(1)  (0) 2024.05.09