1.Data Pipeline
1)개요
=>다양한 소스에서 새로운 가치를 얻을 수 있는 대상으로 데이터를 옮기고 변환하는 일련의 과정 - 분석이나 리포팅 그리고 머신러닝 능력의 기초가 됨
=>원본 데이터의 크기와 상태 그리고 구조 및 분석 프로젝트의 요구사항에 따라 파이프라인은 달라짐
=>가장 단순한 형태의 파이프라인은 REST API처럼 단일소스에서 데이터를 추출하거나 데이터 웨어하우스 SQL테이블와 같은 대상으로 데이터를 로드하는 것이지만 실제 파이프라인은 데이터 추출 과 데이터 가공 그리고 데이터 유효성 검사를 포함한 여러 단계로 구성이 되고 때로는 데이터를 최종 목적지로 전달하기 전에 머신러닝 모델을 학습하거나 실행하는 단계를 수행하기도 합니다.(example :KNN으로 결측치 제거)
=>파이프라인에는 여러 시스템과 프로그래밍 언어의 작업이 포함되는 경우가 많고 데이터 팀은 일반적으로 종속성을 공유하고 조정해야 하는 수많은 데이터 파이프라인을 소유하고 유지
2)구축하는 이유
=>분석 외부에 있는 사람들은 자신에게 보이는 것을 생성하기 위해서 보이지 않은 복잡한 기계가 있다는 것을 인식하지 못하는데 데이터 분석가가 생성하는 대시보드와 통찰력 그리고 데이터 과학자가 개발한 각 예측 모델에는 뒤에서 작동하는 데이터 파이프라인이 있음
=>원본 데이터는 정리,정형화,정규화,결합,집계 그리고 떄로는 마스킹 또는 보안을 위해서 정제가 되어야 하므로 미리 파이프라인을 구축해서 이러한 작업을 자동화 시켜야 합니다.
=>데이터 분석가 및 데이터 과학자가 들어오는 프로젝트마다 별도로 데이터를 찾고 조달할 필요가 없도록 파이프라인을 구성할 필요가 있음
파이프라인을 제대로 구축하지 않으면 데이터 처리와 다양한 정보 출처 그리고 데이터 준비에 분석 재능을 낭비할 위험이 큼
3)Data Engineer
=> 클라우드 컴퓨팅하고 SaaS(Software as a Service)가 대중화 되면서 조직해서 파악해야할 데이터 소스가 폭발적으로 증가하고 있고 머신러닝 모델과 데이터 과학 연구 그리고 시간에 민감한 통찰력을 제공하는 데이터에 대한 수요도 높아지면서 이에 대응하기 위한 전문가가 필요
=>데이터 엔지니어링은 단순히 데이터를 데이터 웨어하우스에 로드하는 것이 아니며 데이터 과학자 및 분석가와 긴밀히 협력해서 데이터를 어떻게 처리해야 하는지 파악하고 요구 사항을 확장 가능한 프로덕션 상태로 전환하도록 도움을 주어야 함
=>갖추어야 할 기술
SQL과 데이터 웨어하우징 기술: 필요한 데이터를 추출하는 기술
Python,Java,Go 등의 프로그래밍 언어
분산컴퓨팅
기본시스템 관리 - 리눅스
목표 지향적 사고 방식
4)최신 데이터 인프라의 핵심 구성 요소
=>클라우드 데이터 웨어하우스 와 데이터 레이크
=>데이터 수집 도구
=>워크플로우 오케스트레이션 플랫폼: 스케쥴링
=>모델링 도구 및 프레임워크
5)Data Source 의 다양성
=>소스 시스템 소유권
-분석 팀은 조직이 구축하고 소유한 소스 시스템과 타사 도구 및 공급업체에서 데이터를 수집한 데이터를 사용하는 것이 일반적
-전자상거래 업체의 경우 고객 장바구니 데이터를 애플리케이션과 연결해서 내부 데이터베이스에 저장할 수 있고 Google Analytics와 같은 타사 웹 분석 도구를 이용하기도 함
-내부 데이터의 경우는 엑세스 제한이 없는 경우가 많지만 외부 데이터의 경우는 엑세스 제한을 거는 경우가 많은데 예를 들면 REST API를 통한 접근은 가능하지만 SQL 데이터베이스 형태의 데이터에 까지 직접 엑세스 할 수 있게 해주는 공급업체는 거의 없음
-데이터를 제공할 때 점진적으로 로드시켜주어도 되는지에 대해서 확인
=>수집 인터페이스
-RDBMS
-NoSQL
-REST API 와 같은 시스템 상단의 추상화 계층
-Apache Kafka 와 같은 스트림 처리 플랫폼
-CSV 와 같은 플랫 파일을 포함하는 공유 네트워크나 클라우드 스토리지
-데이터 웨어하우스 또는 데이터 레이크
=>데이터 구조
-RDBMS
-REST API 나 NoSQL 들이 제공하는 JSON
-반정형화된 로그 데이터:csv 와 같은 플랫 파일
-Kafka 와 같은 도구의 스트림 출력
=>데이터 사이즈
-데이터 엔지니어와 분석가들은 대규모 데이터 세트를 좋아하지만 실제로 분석을 수행할 때는 작은 데이터가 중요
-데이터 파이프라인에서는 데이터를 대용량과 소용량으로만 나누지 않고 스펙트럼 측면에서 생각하는 것이 중요
=>데이터 클렌징 작업과 유효성 검사
-지저분한 데이터의 특성
중복되거나 모호한 레코드
고립된 데이터
불완전하거나 누락된 데이터
텍스트 인코딩 오류
형식이 일치하지 않은 경우
레이블이 없거나 레이블이 일치하지 않은 경우
-접근 방식
최악을 가정하고 최상을 기대
가장 적합한 시스템에서 데이터를 정리하고 검증
예전에는 ETL (추출 - 변환 - 로드) 를 권장했지만 지금은 ELT(추출 - 로드 - 변환)을 권장
지연 시간이나 대역폭을 고려해야 함
6)클라우드 데이터 웨어하우스 및 데이터 레이크
=>퍼블릭 클라우드의 등장이 변화시킨 데이터 웨어하우스 환경을 변화시킨 요소
-클라우드 공급업체에서 관리해주는 관리 서비스가 주류로 등장
-지속적인 클라우드 내 스토리지 비용 감소
-Amazon Redshift, Snowflake 및 Google Big Query 와 같은 확장성이 뛰어나 열 기반의 데이터베이스의 등장
=>데이터 웨어하우스
-사용자가 원하는 질문에 대답할 수 있는 데이터 분석 활동을 지원하기 위해 서로 다른 시스템의 데이터가 모델링되서 저장되는 데이터베이스
=>데이터 레이크
-데이터 웨어하우스처럼 데이터 구조나 쿼리 최적화가 필요없는 저장소
-서로 다른 종류의 여러 데이터가 같이 존재할 수 있음
7)데이터 수집 도구
=>Singer
=>Stitch
=>Fivetan
8)데이터 변환 및 모델링 도구
=>변환은 Python과 같은 프로그래밍 언어나 SQL 을 사용
9)workflow 오케스트레이션 플랫폼
=>파이프라인에서 스케쥴링 및 흐름을 관리
직접 프로그래밍 언어만으로 수행하는 것이 어려워서 등장
=>Apache Airflow, Luigi, AWS Glue 와 같은 플랫폼은 일반적인 용도로 설계되어 다양한 데이터 파이프라인에 사용
=>Kubeflow Pipeline 은 Docker 컨테이너에 구축된 머신러닝 워크플로우
2.Kafka 개요
1)등장 배경
=>Linked IN에서 파편화된 데이터 수집 및 분배 아키텍쳐를 운영하기 위해서 생성
데이터를 전송하는 라인이 기하 급수적으로 증가하면서 복잡해지면 소스코드 및 버전 관리에서 이슈가 발생
=>카프카는 각각의 애플리케이션 까지 연결해서 데이터를 처리하는 것이 아니고 한곳에 모아 처리할 수 있는 중앙 집중화 방식을 사용하는 것이 가능
-웹사이트, 애플리케이션, 센서 등에서 취합한 데이터 스트림을 한곳에서 실시간으로 관리할 수 있음
=>카프카를 중앙에 재치해서 소스 애플리케이션과 타깃 애플리케이션 사이의 의존도를 최소화하여 커플링을 완화
=>소스 애플리케이션은 카프카에게 데이터를 보내기만 하고 카프카 내부에서 타깃으로 보내는 부분을 수행
2)기본 구조
프로듀서 <-> 토픽(파티션 - 메세지가 다수 존재) <-> 컨슈머
3)특징
=>데이터 포맷에 제한이 없음
=>직렬화, 역직렬화를 통해서 ByteArray 로 통신하기 때문에 자바에서 지원하는 모든 객체를 지원
=>카프카는 서버가 3대 이상으로 구성된 클러스터를 이용
=>카프카는 오픈 소스라서 https://github.com/apache/kafka 에 소스코드를 공개
=>기능을 추가하거나 이슈를 알려주고자 하는 경우 KIP를 통해서 가능
4) 빅데이터 파이프라인에서 카프카의 역할
=>데이터 파이프라인을 안정적이고 확장성 높게 운영하기 위한 방법
5) 사용 이유
=>높은 처리량
=>확장성이 뛰어남
=>데이터를 파일로 저장 - 영속성
=>고가용성: 일반적으로 3대 이상의 서버로 운영
3.설치
1)Docker를 이용하는 방법
=>현재 이미지 다운로드가 잘 안됨(업데이트 중이라고 알려짐)
2)Linux Server에 직접 설치: ubuntu EC2에 설치
=>EC2의 인바운드 규칙에 2181(zookeeper: 코디네이터 - 여러 개의 카프카 서버를 관리) 번 과 9092 번 포트(카프카가 사용하는 포트)를 외부에서 접속할 수 있도록 허용
=>JDK 설치(Kafka가 Java로 개발되서 Java Runtime Environment 가 설치되어 있어야 합니다.)
sudo apt-get update
sudo apt-get install openjdk-17-jdk
java -version(실행 환경 확인이며 javac 는 개발 환경 확인)
=>카프카 설치
- 바이너리 다운로드: wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
- 압축 해제: tar xvf kafka_2.13-3.6.0.tgz
- 압축 해제 확인(파일 이름으로 된 디렉토리 확인): ll
- kafka를 관리자 계정으로 이동: sudo mv kafka_2.13-3.6.0 /opt/kafka
- 환경 변수를 추가: kafka 의 bin 디렉토리를 PATH에 추가해서 명령어를 단순하게 사용하기 위해서
nano ~/.bashrc 로 파일을 열어서
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
를 작성하고 저장한 후 빠져 나와서 source ~/.bashrc
- 카프카 브로커 힙 메모리 설정
카프카 브로커는 메시지의 내용을 페이지 캐시로 시스템 메모리를 상요해서 저장합니다.
기본적으로 카프카 브로커는 1GB 그리고 주키퍼는 512MB를 사용하는데 EC2 기본 인스턴스의 메모리는 1GB 인데 이렇게 되면 힙 메모리 할당 오류(Cannot allocate memory)가 발생
기본 메모리가 2GB가 안되는 경우에는 힙 메모리 크기를 수정해야 합니다.
nano ~/.bashrc 명령으로 텍스트 에디터로 접속해서
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
라고 입력한 후 source ./bashrc로 적용한 후 echo $KAFKA_HEAP_OPTS 명령으로 확인
- 외부에서 접속할 수 있도록 설정을 변경: kafka 디렉토리에 있는 config/server.properties에서 작업
nano /opt/kafka/config/server.properties 명령으로 에디터로 문서를 열고
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://IP주소:9092
delete.topic.enable=true
auto.create.topics.enable=true
=>실행
- 주키퍼 실행
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
jps -vm
- 카프카 실행
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
jps -m
3)카프카 브로커 실행
=>로그 확인
tail -f /opt/kafka/logs/server.log
=>토픽 생성
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ec2topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
=>토픽에 메시지 전송
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ec2topic
=>토픽에서 메시지 받기
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ec2topic --from-beginning
4.기본 개념
1)브로커, 클러스터, 주키퍼
=>브로커: 카프카 서버 1대
=>클러스터: 동일한 기능을 수행하는 브로커의 모임
=>주키퍼: 클러스터를 관리하는 애플리케이션
=>하나의 브로커에 메시지를 전송하면 클러스터로 묶인 모든 브로커에 메시지는 복제가 되고 컨슈머에게 전달할 때 2개의 메시지가 전달되지 않도록 주키퍼가 관리를 합니다.
=>하나의 리더 와 팔로워로 브로커들을 구성해서 기본적으로 리더가 작업하고 리더에 문제가 발생하면 다른 팔로워가 리더로 변경되서 장애에 대비합니다.
2)토픽 과 파티션
=>토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위
=>파티션은 프로듀서가 보낸 데이터들이 저장된 것으로 레코드라고 부르고 1개의 토픽은 0개 이상의 파티션으로 구성됩니다.
=>카프카의 브로커는 컨슈머가 어떤 파티션까지 데이터를 가져갔는지 오프셋을 저장하고 있습니다.
=>파티션은 Queue 자료구조 와 유사합니다.
순서대로 처리됩니다.