**Kafka - Message Broker
=>구독 과 게시의 개념을 가지고 한쪽에서 구독 신청을 하면 다른 한쪽에 특정 이벤트가 발생할 때 메시지를 발행해서 게시를 신청한 쪽에 메시지를 전달하는 시스템
=>메시지를 데이터로 활용해서 여러 데이터 발생지로부터 데이터를 받아서 타겟에게 전달하는 용도로 사용할 수 있고 특정 이벤트가 발생했을 때 작업을 순차적으로 수행하도록 사용할 수 도 있습니다.
=>CQRS를 구현한 프로젝트에서 데이터 업데이트를 수행하는 애플리케이션 과 데이터 조회를 수행하는 애플리케이션 사이의 데이터 불일치 문제를 해결하기 위해서 데이터에 업데이트가 발생하면 업데이트 된 데이터를 조회 애플리케이션에게 전송해서 조회 애플리케이션의 데이터 불일치를 문제를 해결하기 위해서 사용
1.카프카 서버 구동
=>주키퍼 실행
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
jps -vm
=>카프카 실행
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
jps -m
=>로그 확인
tail -f /opt/kafka/logs/server.log
=>토픽 생성
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic sampletopic
=>토픽 전송
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic sampletopic
=>토픽 전송 받기
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampletopic --from-beginning
2.Python 에서 Kafka 와 메시지 주고 받기
1)패키지 설치
kafka-python
2)메시지를 전송하는 파일을 만들고 실행
=>pythonproducer.py 파일 생성
from kafka import KafkaProducer
import json
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
self.producer = KafkaProducer(
bootstrap_servers = self.broker,
value_serializer=lambda x:json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
print(self.producer)
def send_message(self, msg, auto_close=True):
try:
#메시지 전송
future = self.producer.send(self.topic, value=msg, key="key")
#프로듀서 초기화
self.producer.flush()
if auto_close:
self.producer.close()
future.get(timeout = 2)
return {"status_code":200, "error": None}
except Exception as exc:
raise exc
#Producer 생성
broker = ["43.203.210.222:9092"]
topic = "sampletopic"
pd = MessageProducer(broker, topic)
#메시지 전송
msg = {"name":"군계", "age":34}
res = pd.send_message(msg)
print(res)
3)메시지를 전송받는 파일을 작성하고 실행
from kafka import KafkaConsumer
import json
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic,
bootstrap_servers = self.broker,
value_deserializer=lambda x : x.decode("utf-8"),
group_id="my-group",
auto_offset_reset = "earliest",
enable_auto_commit=True
)
def receive_message(self):
try:
for message in self.consumer:
print(message.value)
'''
result = json.loads(message.value)
for k, v in result.items():
print(k, ":", result[k])
print(result["name"])
print(result["age"])
'''
except Exception as exc:
raise exc
broker = ["43.203.210.222:9092"]
topic = "sampletopic"
pd = MessageConsumer(broker, topic)
pd.receive_message()
3.Spring Boot 에서의 Kafka
1)프로젝트 생성
=>Lombok, Web, Kafka 의 의존성을 추가해서 생성
2)application.propertise 파일의 이름을 application.yml 파일로 변경하고 작성
spring:
kafka:
bootstrap-servers: 43.203.210.222:9092
consumer:
group-id: adamsoft
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3)KafKa 설정 클래스를 추가 - KafkaConfiguration
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
//설정 클래스로 만들어지고 프로젝트를 실행할 때 먼저 내용을 읽어서
//인스턴스를 만들고 작업을 수행
@Configuration
public class KafkaConfiguration {
//하드 코딩을 하지 않고 설정 파일을 이용하고
//외부에서 생성한 값을 대입받아서 사용: 의존성 주입
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
//이 메서드가 리턴한 인스턴스를 Bean으로 생성
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4)메시지를 전송하는 Producer 클래스 생성 - KafkaProducer
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "sampletopic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private final Logger log = LoggerFactory.getLogger(getClass());
public void sendMessage(String name, int age) {
log.info("Produce message : {}{}", name, age);
String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age + "}";
this.kafkaTemplate.send(TOPIC, message);
}
}
5)데이터가 JSON 문자열이라서 JSON 파싱을 위한 라이브러리를 추가 - build.gradle 파일의 dependencies 에 추가
implementation 'org.json:json:20190722'
6)메시지가 전송되었을 때 수행될 Consumer 클래스를 생성 - KafkaConsumer
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
@Slf4j
public class KafkaConsumer {
private final Logger log = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "sampletopic", groupId = "adamsoft")
public void consume(String message) throws IOException {
log.info("Consumed message : {}", message);
JSONObject messageObj = new JSONObject(message);
log.info(messageObj.getString("name"));
log.info(messageObj.getInt("age") + "");
}
}
7)클라이언트의 요청에 응답할 Controller 클래스를 생성 - KafkaController
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaController {
@Autowired
private KafkaProducer producer;
@PostMapping
@ResponseBody
public String sendMessage(@RequestParam("name") String name,
@RequestParam("age") int age) {
this.producer.sendMessage(name, age);
return "success";
}
}
8)Spring Boot 프로젝트를 실행하고 테스트
=>테스트 할 때는 포트 설정을 하지 않았으므로 localhost:8080/kafka 에 POST 방식으로 name 과 age를 설정해서 전송하면 됩니다.
어떤 프로그램이던 Kafka에게 메시지를 전송하면 다른 프로그램은 토픽 이름 과 데이터 포맷만 알면 메시지를 전송하는 프로그램을 수정하지 않고 메시지를 받을 수 있습니다.
애플리케이션끼리 직접 메시지를 주고받는 것 보다는 카프카를 이용하면 유지보수에 유리