diff --git a/build.gradle b/build.gradle
index 32587d3a..12ef234a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -56,6 +56,8 @@ dependencies {
     implementation 'org.springframework.boot:spring-boot-starter-data-mongodb:3.1.8'
     // Spring Boot WebSocket
     implementation 'org.springframework.boot:spring-boot-starter-websocket:3.1.8'
+    // Spring Kafka
+    implementation 'org.springframework.kafka:spring-kafka:3.0.16'
     // Spring Retry
     implementation 'org.springframework.retry:spring-retry'
     // Flyway
@@ -70,6 +72,8 @@ dependencies {
     testImplementation 'org.testcontainers:mysql:1.19.3'
     // Spring Security Test
     testImplementation 'org.springframework.security:spring-security-test:6.1.3'
+    // Spring Kafka Test
+    testImplementation 'org.springframework.kafka:spring-kafka-test:3.0.16'
 }

 test {
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 00000000..1a8309ac
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,70 @@
+version: '3.8'
+services:
+  kafka00:
+    image: bitnami/kafka:3.7.0
+    restart: unless-stopped
+    container_name: kafka00
+    ports:
+      - '49092:9094'
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_BROKER_ID=0
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_KRAFT_CLUSTER_ID=nqz1LDJrSoyO52xjjwg9Dg
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      # Listeners
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka00:9092,EXTERNAL://127.0.0.1:49092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering (single broker: internal topics need replication factor 1)
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
+  kafka-ui:
+    image: provectuslabs/kafka-ui:latest
+    restart: unless-stopped
+    container_name: kafka-ui
+    ports:
+      - '8081:8080'
+    environment:
+      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
+      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka00:9092
+      - DYNAMIC_CONFIG_ENABLED=true
+      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
+      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
+    depends_on:
+      - kafka00
+
+# zookeeper:
+#   image: confluentinc/cp-zookeeper
+#   ports:
+#     - '2181:2181'
+#   environment:
+#     ZOOKEEPER_CLIENT_PORT: 2181
+#     ZOOKEEPER_TICK_TIME: 2000
+# kafka-1:
+#   image: confluentinc/cp-kafka
+#   ports:
+#     - '9092:9092'
+#   depends_on:
+#     - zookeeper
+#   environment:
+#     KAFKA_BROKER_ID: 1
+#     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+#     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+#     KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+#     KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
+# kafka-ui:
+#   image: provectuslabs/kafka-ui
+#   container_name: kafka-ui
+#   ports:
+#     - '9000:8080'
+#   restart: always
+#   environment:
+#     - KAFKA_CLUSTERS_0_NAME=local
+#     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092
+#     - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
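The Kafka configs added below read their connection settings from Spring properties via `@Value`. For reference, a minimal application.yml sketch that satisfies those placeholders, assuming the app connects through the EXTERNAL listener published above (this file is not part of the diff, and the values are assumptions):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:49092   # EXTERNAL listener from docker-compose.yml
    consumer:
      group-id: my-group                 # matches the groupId in MessageConsumer
      auto-offset-reset: earliest
      enable-auto-commit: true
```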
diff --git a/src/main/java/io/oeid/mogakgo/core/configuration/KafkaConsumerConfig.java b/src/main/java/io/oeid/mogakgo/core/configuration/KafkaConsumerConfig.java
new file mode 100644
index 00000000..61f0d9ac
--- /dev/null
+++ b/src/main/java/io/oeid/mogakgo/core/configuration/KafkaConsumerConfig.java
@@ -0,0 +1,55 @@
+package io.oeid.mogakgo.core.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+@EnableKafka
+@Configuration
+public class KafkaConsumerConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String BOOTSTRAP_ADDRESS;
+
+    @Value("${spring.kafka.consumer.group-id}")
+    private String GROUP_ID;
+
+    @Value("${spring.kafka.consumer.auto-offset-reset}")
+    private String AUTO_OFFSET_RESET;
+
+    @Value("${spring.kafka.consumer.enable-auto-commit}")
+    private boolean AUTO_COMMIT;
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    /**
+     * Injected wherever a method is annotated with {@code @KafkaListener}.
+     * Creates message listener containers that can process messages concurrently.
+     */
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        return factory;
+    }
+
+}
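The `MessageConsumer` added later in this diff carries a commented-out `acknowledgment.acknowledge()` call. For that to work, `enable-auto-commit` would have to be false and the container switched to manual acknowledgment. A minimal sketch, assuming a hypothetical `manualAckContainerFactory` bean that is not part of this PR:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical variant of kafkaListenerContainerFactory() above, only needed
// when enable-auto-commit=false and the listener commits offsets explicitly.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualAckContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // reuses the factory defined above
    // MANUAL ack mode injects an Acknowledgment parameter into the listener method
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
```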
diff --git a/src/main/java/io/oeid/mogakgo/core/configuration/KafkaProducerConfig.java b/src/main/java/io/oeid/mogakgo/core/configuration/KafkaProducerConfig.java
new file mode 100644
index 00000000..2d2091c6
--- /dev/null
+++ b/src/main/java/io/oeid/mogakgo/core/configuration/KafkaProducerConfig.java
@@ -0,0 +1,60 @@
+package io.oeid.mogakgo.core.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+/*
+ * Spring Cloud Stream's annotation-based model is deprecated in favor of the
+ * functional programming model:
+ * @EnableBinding
+ * @Input
+ * @Output
+ * @StreamListener
+ * @StreamMessageConverter
+ * ...
+ */
+
+@Configuration
+public class KafkaProducerConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String BOOTSTRAP_SERVERS;
+
+    /**
+     * BOOTSTRAP_SERVERS_CONFIG
+     * The Kafka broker address the producer connects to first.
+     * ---
+     * KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG
+     * Kafka sends data over the network, so objects must be serialized to byte arrays.
+     * These settings choose the serialization mechanism the producer uses to convert
+     * key and value data to byte arrays before sending them to the Kafka broker.
+     */
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+
+    /**
+     * Wraps the Kafka producer provided by Spring Kafka and
+     * offers convenience methods for publishing messages to Kafka.
+     */
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+}
diff --git a/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageConsumer.java b/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageConsumer.java
new file mode 100644
index 00000000..a5d9064f
--- /dev/null
+++ b/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageConsumer.java
@@ -0,0 +1,33 @@
+package io.oeid.mogakgo.core.properties.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.oeid.mogakgo.domain.event.Event;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MessageConsumer {
+
+    private final ObjectMapper objectMapper;
+    private final List<Event> eventRepo = new ArrayList<>();
+
+    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
+    protected void consume(@Payload String payload) throws Exception {
+        log.info("receive event: {}", payload);
+        Event event = objectMapper.readValue(payload, Event.class);
+        eventRepo.add(event);
+
+        // Process
+        // acknowledgment.acknowledge();
+    }
+
+    public List<Event> getEventRepo() {
+        return eventRepo;
+    }
+
+}
diff --git a/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageProducer.java b/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageProducer.java
new file mode 100644
index 00000000..aeba57b8
--- /dev/null
+++ b/src/main/java/io/oeid/mogakgo/core/properties/kafka/MessageProducer.java
@@ -0,0 +1,19 @@
+package io.oeid.mogakgo.core.properties.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MessageProducer {
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    public void sendMessage(String topic, String message) {
+        kafkaTemplate.send(topic, message);
+    }
+
+}
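`MessageProducer` sends pre-serialized JSON strings, so every caller must run the event through `ObjectMapper` first (as the test at the end of this diff does). An alternative sketch, assuming a hypothetical `eventKafkaTemplate` bean that is not part of this PR, letting Spring Kafka's `JsonSerializer` do that step instead:

```java
import java.util.HashMap;
import java.util.Map;
import io.oeid.mogakgo.domain.event.Event;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical typed template: Event instances are serialized to JSON by the
// producer itself, so callers skip the manual ObjectMapper.writeValueAsString step.
@Bean
public KafkaTemplate<String, Event> eventKafkaTemplate(
        @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

// Usage: eventKafkaTemplate.send("my-topic", Event.idOf(1L));
```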
diff --git a/src/main/java/io/oeid/mogakgo/domain/event/Event.java b/src/main/java/io/oeid/mogakgo/domain/event/Event.java
new file mode 100644
index 00000000..ac5ec598
--- /dev/null
+++ b/src/main/java/io/oeid/mogakgo/domain/event/Event.java
@@ -0,0 +1,26 @@
+package io.oeid.mogakgo.domain.event;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.LocalDateTime;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+public class Event {
+
+    private Long id;
+    private LocalDateTime eventCreatedAt;
+
+    @JsonCreator
+    public Event(@JsonProperty("id") Long id,
+        @JsonProperty("eventCreatedAt") LocalDateTime eventCreatedAt) {
+        this.id = id;
+        this.eventCreatedAt = eventCreatedAt;
+    }
+
+    public static Event idOf(Long id) {
+        return new Event(id, LocalDateTime.now());
+    }
+
+}
diff --git a/src/test/java/io/oeid/mogakgo/core/kafka/producer/SimpleKafkaTest.java b/src/test/java/io/oeid/mogakgo/core/kafka/producer/SimpleKafkaTest.java
new file mode 100644
index 00000000..c9c6bc0d
--- /dev/null
+++ b/src/test/java/io/oeid/mogakgo/core/kafka/producer/SimpleKafkaTest.java
@@ -0,0 +1,42 @@
+package io.oeid.mogakgo.core.kafka.producer;
+
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.oeid.mogakgo.core.properties.kafka.MessageConsumer;
+import io.oeid.mogakgo.core.properties.kafka.MessageProducer;
+import io.oeid.mogakgo.domain.event.Event;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+@SpringBootTest
+// The broker port is pinned to 59092 so it matches the configured
+// spring.kafka.bootstrap-servers used by the application context under test.
+@EmbeddedKafka(partitions = 3,
+    brokerProperties = {
+        "listeners=PLAINTEXT://localhost:59092"
+    },
+    ports = { 59092 })
+public class SimpleKafkaTest {
+
+    @Autowired
+    private MessageProducer messageProducer;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Autowired
+    private MessageConsumer messageConsumer;
+
+    @Test
+    void sendAndConsumeEvent() throws Exception {
+        // given
+        Event event = Event.idOf(1L);
+        String payload = objectMapper.writeValueAsString(event);
+
+        // when
+        messageProducer.sendMessage("my-topic", payload);
+        Thread.sleep(2000); // give the async listener time to consume
+
+        // then
+        assertNotEquals(0, messageConsumer.getEventRepo().size());
+    }
+}
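A note on the fixed `Thread.sleep(2000)` above: it makes the test timing-sensitive on slow machines. A minimal poll-until-deadline replacement using only the JDK (a sketch, assuming no new test dependency is wanted):

```java
// Hypothetical replacement for the fixed sleep: poll until the listener has
// consumed the event or a 10-second deadline passes, then assert once.
long deadline = System.currentTimeMillis() + 10_000;
while (messageConsumer.getEventRepo().isEmpty() && System.currentTimeMillis() < deadline) {
    Thread.sleep(100); // brief back-off between polls
}
assertNotEquals(0, messageConsumer.getEventRepo().size());
```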