
Feat/#376 kafka setting #377

Merged
14 commits merged on Jun 9, 2024
Commits
Changes from all commits (14 commits)
2d3e51f
[CHORE] Add Spring Kafka dependency to build.gradle
JIN-076 Jun 5, 2024
cd021ae
[FEAT] Write Kafka Producer/Consumer configuration files
JIN-076 Jun 5, 2024
11e0609
[FEAT] Write a simple Event class for testing
JIN-076 Jun 5, 2024
9d23a01
[FEAT] Develop simple MessageProducer/Consumer components for publishing and consuming messages
JIN-076 Jun 5, 2024
96e631f
[TEST] Write test code for simple message publishing and consumption with MessageProducer/Consumer
JIN-076 Jun 5, 2024
dc81356
[FEAT] Change variable position
JIN-076 Jun 6, 2024
659a11c
[FEAT] Write docker-compose.yml for running Kafka in KRaft mode (build image to be changed later)
JIN-076 Jun 6, 2024
784eada
Merge remote-tracking branch 'origin/develop' into feat/#376-kafka_se…
JIN-076 Jun 6, 2024
235658f
[FIX] Remove line breaks and test CI
JIN-076 Jun 6, 2024
a8a16db
[FIX] Comment out code and test CI
JIN-076 Jun 6, 2024
2026983
[FIX] Change port due to port conflict
JIN-076 Jun 7, 2024
224b39b
[REFACT] Change settings for Kafka KRaft mode and add kafka-ui
JIN-076 Jun 7, 2024
18a84af
[FIX] Remove line breaks and test CI
JIN-076 Jun 7, 2024
ed13f8a
[FIX] Change port due to port conflict
JIN-076 Jun 7, 2024
4 changes: 4 additions & 0 deletions build.gradle
@@ -56,6 +56,8 @@ dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-mongodb:3.1.8'
    // Spring Boot WebSocket
    implementation 'org.springframework.boot:spring-boot-starter-websocket:3.1.8'
    // Spring Kafka
    implementation 'org.springframework.kafka:spring-kafka:3.0.16'
    // Spring Retry
    implementation 'org.springframework.retry:spring-retry'
    // Flyway
@@ -70,6 +72,8 @@ dependencies {
    testImplementation 'org.testcontainers:mysql:1.19.3'
    // Spring Security Test
    testImplementation 'org.springframework.security:spring-security-test:6.1.3'
    // Spring Kafka Test
    testImplementation 'org.springframework.kafka:spring-kafka-test:3.0.16'
}

test {
70 changes: 70 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,70 @@
version: '3.8'
services:
  kafka00:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka00
    ports:
      - '49092:9094'
    environment:
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=nqz1LDJrSoyO52xjjwg9Dg
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka00:9092,EXTERNAL://127.0.0.1:49092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    restart: unless-stopped
    container_name: kafka-ui
    ports:
      - '8081:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka00:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
    depends_on:
      - kafka00

#  zookeeper:
#    image: confluentinc/cp-zookeeper
#    ports:
#      - '2181:2181'
#    environment:
#      ZOOKEEPER_CLIENT_PORT: 2181
#      ZOOKEEPER_TICK_TIME: 2000
#  kafka-1:
#    image: confluentinc/cp-kafka
#    ports:
#      - '9092:9092'
#    depends_on:
#      - zookeeper
#    environment:
#      KAFKA_BROKER_ID: 1
#      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
#      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
#      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
#  kafka-ui:
#    image: provectuslabs/kafka-ui
#    container_name: kafka-ui
#    ports:
#      - '9000:8080'
#    restart: always
#    environment:
#      - KAFKA_CLUSTERS_0_NAME=local
#      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092
#      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
55 changes: 55 additions & 0 deletions src/main/java/io/oeid/mogakgo/core/configuration/KafkaConsumerConfig.java
@@ -0,0 +1,55 @@
package io.oeid.mogakgo.core.configuration;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String BOOTSTRAP_ADDRESS;

    @Value("${spring.kafka.consumer.group-id}")
    private String GROUP_ID;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String AUTO_OFFSET_RESET;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean AUTO_COMMIT;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Injected into and used by methods annotated with @KafkaListener.
     * Creates message listener containers that can process messages concurrently.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
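Note: the Javadoc on kafkaListenerContainerFactory mentions containers that process messages concurrently. A minimal, hedged sketch of how that could be made explicit via Spring Kafka's setConcurrency; this bean variant is hypothetical and not part of the PR, and the value 3 is an assumption matching the partition count used in the test below.

    // Hypothetical variant, not part of this PR: set listener concurrency explicitly.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // one consumer thread per partition, given 3 partitions
        return factory;
    }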
60 changes: 60 additions & 0 deletions src/main/java/io/oeid/mogakgo/core/configuration/KafkaProducerConfig.java
@@ -0,0 +1,60 @@
package io.oeid.mogakgo.core.configuration;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/*
 * Deprecated annotation-based programming -> switch to functional programming
 * @EnableBinding
 * @Input
 * @Output
 * @StreamListener
 * @StreamMessageConverter
 * ...
 */
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String BOOTSTRAP_SERVERS;

    /**
     * BOOTSTRAP_SERVERS_CONFIG
     * Sets the address of the Kafka broker the producer connects to first.
     * ---
     * KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG
     * Because Kafka sends data over the network, objects must be serialized into byte arrays.
     * These settings choose the serialization mechanism the producer uses to convert key and
     * value data into byte arrays before sending them to the Kafka broker.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Wraps the Kafka producer provided by Spring Kafka.
     * Exposes several methods for publishing messages to Kafka.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
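Note: the Javadoc above says KafkaTemplate exposes several methods for publishing messages. A minimal, hedged usage sketch, not part of this PR: with spring-kafka 3.x, send() returns a CompletableFuture<SendResult>, so a hypothetical method like sendWithLogging could be added to the MessageProducer component defined later in this PR to log the acknowledged partition and offset (log assumes that component's @Slf4j logger).

    // Hypothetical variant of MessageProducer#sendMessage, not part of this PR.
    public void sendWithLogging(String topic, String message) {
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                // SendResult carries the partition and offset assigned by the broker
                log.info("sent to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            } else {
                log.error("send failed", ex);
            }
        });
    }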
33 changes: 33 additions & 0 deletions src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageConsumer.java
@@ -0,0 +1,33 @@
package io.oeid.mogakgo.core.properties.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.oeid.mogakgo.domain.event.Event;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {

    private final ObjectMapper objectMapper;
    private List<Event> eventRepo = new ArrayList<>();

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    protected void consume(@Payload String payload) throws Exception {
        log.info("receive event: {}", payload);
        Event event = objectMapper.readValue(payload, Event.class);
        eventRepo.add(event);

        // Process
        // acknowledgment.acknowledge();
    }

    public List<Event> getEventRepo() { return eventRepo; }

}
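Note: the commented-out acknowledgment.acknowledge() call hints at manual offset commits. A hedged sketch of that variant, not part of this PR; it assumes enable-auto-commit=false, ContainerProperties.AckMode.MANUAL on the container factory, and an import of org.springframework.kafka.support.Acknowledgment.

    // Hypothetical manual-acknowledgment listener, not part of this PR.
    @KafkaListener(topics = "my-topic", groupId = "my-group",
        containerFactory = "kafkaListenerContainerFactory")
    protected void consumeWithManualAck(@Payload String payload, Acknowledgment acknowledgment) throws Exception {
        Event event = objectMapper.readValue(payload, Event.class);
        eventRepo.add(event);
        acknowledgment.acknowledge(); // commit the offset only after processing succeeds
    }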
19 changes: 19 additions & 0 deletions src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageProducer.java
@@ -0,0 +1,19 @@
package io.oeid.mogakgo.core.properties.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

}
26 changes: 26 additions & 0 deletions src/main/java/io/oeid/mogakgo/domain/event/Event.java
@@ -0,0 +1,26 @@
package io.oeid.mogakgo.domain.event;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
public class Event {

    private Long id;
    private LocalDateTime eventCreatedAt;

    @JsonCreator
    public Event(@JsonProperty("id") Long id, @JsonProperty("eventCreatedAt") LocalDateTime eventCreatedAt) {
        this.id = id;
        this.eventCreatedAt = eventCreatedAt;
    }

    public static Event idOf(Long id) {
        return new Event(id, LocalDateTime.now());
    }

}
42 changes: 42 additions & 0 deletions src/test/java/io/oeid/mogakgo/core/kafka/producer/SimpleKafkaTest.java
@@ -0,0 +1,42 @@
package io.oeid.mogakgo.core.kafka.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.oeid.mogakgo.core.properties.kafka.MessageConsumer;
import io.oeid.mogakgo.core.properties.kafka.MessageProducer;
import io.oeid.mogakgo.domain.event.Event;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(partitions = 3,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:59092"
    },
    ports = { 59092 })
public class SimpleKafkaTest {

    @Autowired
    private MessageProducer messageProducer;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private MessageConsumer messageConsumer;

    @Test
    void name() throws Exception {
        // given
        Event event = Event.idOf(1L);
        String payload = objectMapper.writeValueAsString(event);

        // when
        messageProducer.sendMessage("my-topic", payload);
        Thread.sleep(2000);

        // then
        org.junit.jupiter.api.Assertions.assertNotEquals(0, messageConsumer.getEventRepo().size());
    }
}
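Note: the fixed Thread.sleep(2000) makes the test timing-dependent. A minimal alternative sketch, not part of this PR, that polls the in-memory repository up to a deadline instead of sleeping a fixed amount; the 5-second timeout in the usage comment is an arbitrary assumption.

    // Hypothetical polling helper, not part of this PR: wait until the consumer has
    // recorded the expected number of events or the timeout elapses.
    private static void awaitEvents(MessageConsumer consumer, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (consumer.getEventRepo().size() < expected && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
    }

    // Usage inside the test:
    // messageProducer.sendMessage("my-topic", payload);
    // awaitEvents(messageConsumer, 1, 5000);
    // org.junit.jupiter.api.Assertions.assertNotEquals(0, messageConsumer.getEventRepo().size());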