
Kafka(2)

by 왕방개 2024. 5. 12.

Kafka - Message Broker
=> A publish/subscribe system: one side subscribes to a topic, and when a particular event occurs on the other side, a message is published and delivered to every subscriber.
=> Using messages as data, Kafka can collect data from multiple sources and deliver it to targets, or it can be used to run tasks in sequence whenever a specific event occurs.
=> In a project built on CQRS, where one application performs data updates and another performs data queries, Kafka is used to resolve the data inconsistency between them: whenever an update occurs, the updated data is sent to the query application so its copy stays in sync.

 

1. Starting the Kafka Server

=> Start ZooKeeper
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

jps -vm

=> Start Kafka
kafka-server-start.sh -daemon /opt/kafka/config/server.properties

jps -m

=> Check the logs
tail -f /opt/kafka/logs/server.log


=> Create a topic
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic sampletopic

=> Send messages to the topic (console producer)
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic sampletopic

=> Receive messages from the topic (console consumer)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampletopic --from-beginning
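Before writing application code, it can help to confirm the broker is reachable from Python as well. A minimal sketch using kafka-python's KafkaAdminClient (the localhost address assumes the script runs on the broker host; substitute your own address otherwise):

from kafka import KafkaAdminClient

# Assumes this runs on the broker host; otherwise use the broker's public host:port
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
print(admin.list_topics())  # "sampletopic" should appear in the list
admin.close()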

 

2. Exchanging Messages with Kafka from Python

1) Install the package
pip install kafka-python

 

2) Create and run a file that sends messages

=> Create the pythonproducer.py file

from kafka import KafkaProducer
import json

class MessageProducer:
    def __init__(self, broker, topic):
        self.broker = broker
        self.topic = topic

        self.producer = KafkaProducer(
            bootstrap_servers=self.broker,
            value_serializer=lambda x: json.dumps(x).encode("utf-8"),
            acks=0,  # fire-and-forget: do not wait for broker acknowledgment
            api_version=(2, 5, 0),
            key_serializer=str.encode,
            retries=3,
        )
        print(self.producer)

    def send_message(self, msg, auto_close=True):
        try:
            # Send the message
            future = self.producer.send(self.topic, value=msg, key="key")
            # Flush buffered messages out to the broker
            self.producer.flush()
            # Wait for the send result before closing the producer
            future.get(timeout=2)
            if auto_close:
                self.producer.close()
            return {"status_code": 200, "error": None}
        except Exception as exc:
            raise exc

# Create the producer
broker = ["43.203.210.222:9092"]
topic = "sampletopic"
pd = MessageProducer(broker, topic)

# Send a message
msg = {"name": "군계", "age": 34}
res = pd.send_message(msg)
print(res)
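If you do not want to block on future.get(), kafka-python also lets you attach callbacks to the future returned by send(). A minimal sketch under the same broker and topic assumptions as above:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=["43.203.210.222:9092"],  # assumed broker, as above
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)

def on_success(record_metadata):
    # Called when the send succeeds
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_error(exc):
    print("send failed:", exc)

producer.send("sampletopic", value={"name": "test", "age": 1}) \
        .add_callback(on_success) \
        .add_errback(on_error)
producer.flush()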

 

3) Create and run a file that receives messages

from kafka import KafkaConsumer
import json

class MessageConsumer:
    def __init__(self, broker, topic):
        self.broker = broker

        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.broker,
            value_deserializer=lambda x: x.decode("utf-8"),
            group_id="my-group",
            auto_offset_reset="earliest",
            enable_auto_commit=True
        )

    def receive_message(self):
        try:
            for message in self.consumer:
                print(message.value)
                '''
                # Parse the JSON string and access individual fields
                result = json.loads(message.value)
                for k, v in result.items():
                    print(k, ":", v)
                print(result["name"])
                print(result["age"])
                '''
        except Exception as exc:
            raise exc

broker = ["43.203.210.222:9092"]
topic = "sampletopic"
consumer = MessageConsumer(broker, topic)
consumer.receive_message()
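Since the producer serializes values as JSON, the consumer can also deserialize each message straight into a dict instead of decoding it to a string first. A minimal variation, with the same assumed broker and topic:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "sampletopic",
    bootstrap_servers=["43.203.210.222:9092"],  # assumed broker, as above
    # Deserialize each message directly into a dict
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    group_id="my-group",
    auto_offset_reset="earliest",
)

for message in consumer:
    print(message.value["name"], message.value["age"])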

 

3. Kafka in Spring Boot

1) Create the project
=> Create it with the Lombok, Web, and Kafka dependencies



2) Rename the application.properties file to application.yml and fill it in

spring:
  kafka:
    bootstrap-servers: 43.203.210.222:9092
    consumer:
      group-id: adamsoft
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

3) Add a Kafka configuration class - KafkaConfiguration
=> The producer settings below mirror what application.yml already declares; writing them in code shows how to register the ProducerFactory and KafkaTemplate beans explicitly.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;


// A configuration class: when the project starts, its contents are read first,
// the instances are created, and the setup is performed
@Configuration
public class KafkaConfiguration {
    // Instead of hard-coding, read the value from the configuration file
    // and have it injected from outside: dependency injection
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // The instance returned by this method is registered as a Bean
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

 

4) Create a Producer class that sends messages - KafkaProducer

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private static final String TOPIC = "sampletopic";

    // Injected through the constructor that @RequiredArgsConstructor generates;
    // @Slf4j already generates the log field, so no manual Logger is needed
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String name, int age) {
        log.info("Produce message : {} {}", name, age);
        String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age + "}";
        this.kafkaTemplate.send(TOPIC, message);
    }
}

 

5) The data is a JSON string, so add a JSON parsing library - add to the dependencies block of the build.gradle file

implementation 'org.json:json:20190722'

 

6) Create a Consumer class that runs when a message arrives - KafkaConsumer

import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {
    // @Slf4j generates the log field, so no manual Logger is needed

    @KafkaListener(topics = "sampletopic", groupId = "adamsoft")
    public void consume(String message) {
        log.info("Consumed message : {}", message);
        JSONObject messageObj = new JSONObject(message);
        log.info(messageObj.getString("name"));
        log.info(String.valueOf(messageObj.getInt("age")));
    }
}

 

7) Create a Controller class that responds to client requests - KafkaController

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaController {

    // Injected through the constructor that @RequiredArgsConstructor generates
    private final KafkaProducer producer;

    // @ResponseBody is unnecessary here: @RestController already implies it
    @PostMapping
    public String sendMessage(@RequestParam("name") String name,
            @RequestParam("age") int age) {
        this.producer.sendMessage(name, age);
        return "success";
    }
}

 

8) Run the Spring Boot project and test
=> No server port was configured, so it listens on the default 8080: send a POST request to localhost:8080/kafka with name and age set, as in the Python sketch below.
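A minimal way to fire that request from Python, assuming the requests package is installed and the application is running locally:

import requests

# Send name and age as request parameters, matching @RequestParam in the controller
res = requests.post("http://localhost:8080/kafka", params={"name": "군계", "age": 34})
print(res.text)  # "success"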

Once any program sends messages to Kafka, any other program that knows only the topic name and the data format can receive them, without the sending program being modified.
Compared with applications exchanging messages directly, going through Kafka makes maintenance easier.
