Skip to content

Commit

Permalink
Feat/#376 kafka setting (#377)
Browse files Browse the repository at this point in the history
* [CHORE] build.gradle에 Spring Kafka 의존성 추가

* [FEAT] Kafka Producer · Consumer Configuration 파일 작성

* [FEAT] 테스트용 간단한 Event 클래스 작성

* [FEAT] 메시지 발행 및 소비를 위한 간단한 MessageProducer · Consumer 컴포넌트 개발

* [TEST] MessageProducer · Consumer를 이용한 간단한 메시지 발행 및 소비에 대한 테스트 코드 작성

* [FEAT] 변수 위치 변동

* [FEAT] Kafka KRaft 모드 실행을 위한 docker-compose.yml 작성 (추후 빌드 이미지 변경 예정)

* [FIX] 줄바꿈 제거 및 ci 테스트

* [FIX] 주석 처리 및 ci 테스트

* [FIX] port conflict로 인한 포트 변경

* [REFACT] kafka kraft 모드를 위한 설정 변경 및 kafka-ui 추가

* [FIX] 줄바꿈 제거 및 CI 테스트

* [FIX] port conflict로 인한 포트 변경
  • Loading branch information
JIN-076 committed Jun 9, 2024
1 parent 1fa86d4 commit 8bea07d
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 0 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb:3.1.8'
// Spring Boot WebSocket
implementation 'org.springframework.boot:spring-boot-starter-websocket:3.1.8'
// Spring Kafka
implementation 'org.springframework.kafka:spring-kafka:3.0.16'
// Spring Retry
implementation 'org.springframework.retry:spring-retry'
// Flyway
Expand All @@ -70,6 +72,8 @@ dependencies {
testImplementation 'org.testcontainers:mysql:1.19.3'
// Spring Security Test
testImplementation 'org.springframework.security:spring-security-test:6.1.3'
// Spring Kafka Test
testImplementation 'org.springframework.kafka:spring-kafka-test:3.0.16'
}

test {
Expand Down
70 changes: 70 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
version: '3.8'
services:
  kafka00:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka00
    ports:
      - '49092:9094'
    environment:
      # KRaft settings: single node acting as both controller and broker
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=nqz1LDJrSoyO52xjjwg9Dg
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners: PLAINTEXT for in-network clients, CONTROLLER for KRaft quorum,
      # EXTERNAL for host access via the published 49092 port
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka00:9092,EXTERNAL://127.0.0.1:49092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering: replication factors must not exceed the broker count.
      # This compose runs a single broker, so a factor of 3 would make the
      # internal __consumer_offsets / transaction-state topics uncreatable
      # (InvalidReplicationFactorException). Raise these again when scaling
      # out to 3 brokers.
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    restart: unless-stopped
    container_name: kafka-ui
    ports:
      - '8081:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka00:9092
      # was DYNAMIC_CONFIG_ENABLES (typo) — kafka-ui reads DYNAMIC_CONFIG_ENABLED
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
    depends_on:
      - kafka00

# Legacy ZooKeeper-based setup, kept for reference while migrating to KRaft.
#  zookeeper:
#    image: confluentinc/cp-zookeeper
#    ports:
#      - '2181:2181'
#    environment:
#      ZOOKEEPER_CLIENT_PORT: 2181
#      ZOOKEEPER_TICK_TIME: 2000
#  kafka-1:
#    image: confluentinc/cp-kafka
#    ports:
#      - '9092:9092'
#    depends_on:
#      - zookeeper
#    environment:
#      KAFKA_BROKER_ID: 1
#      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
#      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
#      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
#  kafka-ui:
#    image: provectuslabs/kafka-ui
#    container_name: kafka-ui
#    ports:
#      - '9000:8080'
#    restart: always
#    environment:
#      - KAFKA_CLUSTERS_0_NAME=local
#      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092
#      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.oeid.mogakgo.core.configuration;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/**
 * Kafka consumer-side configuration.
 *
 * <p>Builds a String/String {@link ConsumerFactory} from {@code spring.kafka.*}
 * properties and exposes the listener container factory that
 * {@code @KafkaListener} methods are wired to.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // lowerCamelCase: injected instance fields, not compile-time constants,
    // so UPPER_SNAKE_CASE naming does not apply.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * Consumer factory using String deserialization for both key and value;
     * broker address, group id, offset-reset policy and auto-commit behavior
     * are taken from application properties.
     *
     * @return factory used by the listener container factory below
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Container factory injected into {@code @KafkaListener}-annotated methods;
     * creates listener containers that can process messages concurrently.
     *
     * @return concurrent listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.oeid.mogakgo.core.configuration;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/*
 * NOTE(review): the Spring Cloud Stream annotation model (@EnableBinding,
 * @Input, @Output, @StreamListener, @StreamMessageConverter, ...) is
 * deprecated in favor of the functional programming model — this config
 * intentionally uses plain Spring Kafka instead.
 */

/**
 * Kafka producer-side configuration: builds the {@link ProducerFactory} and
 * the {@link KafkaTemplate} used to publish messages.
 */
@Configuration
public class KafkaProducerConfig {

    // lowerCamelCase: injected instance field, not a compile-time constant.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory.
     *
     * <p>BOOTSTRAP_SERVERS_CONFIG — the broker address the producer connects
     * to first. KEY/VALUE_SERIALIZER_CLASS_CONFIG — Kafka sends data over the
     * network as byte arrays, so keys and values must be serialized before
     * being sent to the broker; String serialization is used for both here.
     *
     * @return factory backing the {@link KafkaTemplate} bean below
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Spring Kafka's wrapper around the Kafka producer; provides the send
     * operations used to publish messages.
     *
     * @return template configured with {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.oeid.mogakgo.core.properties.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.oeid.mogakgo.domain.event.Event;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {

private final ObjectMapper objectMapper;
private List<Event> eventRepo = new ArrayList<>();

@KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
protected void consume(@Payload String payload) throws Exception {
log.info("receive event: {}", payload);
Event event = objectMapper.readValue(payload, Event.class);
eventRepo.add(event);

// Process
// acknowledgment.acknowledge();
}

public List<Event> getEventRepo() { return eventRepo; }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.oeid.mogakgo.core.properties.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Thin publishing facade over {@link KafkaTemplate} for String payloads.
 */
@Slf4j
@Component
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /** Explicit constructor (equivalent to Lombok's @RequiredArgsConstructor). */
    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes {@code message} to {@code topic}; delegates to the template's
     * asynchronous send.
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

}
26 changes: 26 additions & 0 deletions src/main/java/io/oeid/mogakgo/domain/event/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.oeid.mogakgo.domain.event;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Minimal event payload used to exercise Kafka publish/consume round-trips.
 *
 * <p>NOTE(review): serializing {@code eventCreatedAt} presumably relies on the
 * Jackson java-time module being registered on the ObjectMapper — confirm the
 * Spring-managed mapper is used everywhere this class is (de)serialized.
 */
@Getter
@NoArgsConstructor
public class Event {

    private Long id;
    private LocalDateTime eventCreatedAt;

    /** Jackson creator binding {@code id} and {@code eventCreatedAt} from JSON. */
    @JsonCreator
    public Event(
        @JsonProperty("id") Long id,
        @JsonProperty("eventCreatedAt") LocalDateTime eventCreatedAt
    ) {
        this.id = id;
        this.eventCreatedAt = eventCreatedAt;
    }

    /** Static factory: an event for {@code id}, stamped with the current time. */
    public static Event idOf(Long id) {
        return new Event(id, LocalDateTime.now());
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.oeid.mogakgo.core.kafka.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.oeid.mogakgo.core.properties.kafka.MessageConsumer;
import io.oeid.mogakgo.core.properties.kafka.MessageProducer;
import io.oeid.mogakgo.domain.event.Event;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(partitions = 3,
brokerProperties = {
"listeners=PLAINTEXT://localhost:59092"
},
ports = { 59092 })
public class SimpleKafkaTest {

@Autowired
private MessageProducer messageProducer;

@Autowired
private ObjectMapper objectMapper;

@Autowired
private MessageConsumer messageConsumer;

@Test
void name() throws Exception {
// given
Event event = Event.idOf(1L);
String payload = objectMapper.writeValueAsString(event);

// when
messageProducer.sendMessage("my-topic", payload);
Thread.sleep(2000);

// then
org.junit.jupiter.api.Assertions.assertNotEquals(0, messageConsumer.getEventRepo().size());
}
}

0 comments on commit 8bea07d

Please sign in to comment.