diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle
index 831c1c0963..f20e4928a8 100644
--- a/data-prepper-plugins/kafka-plugins/build.gradle
+++ b/data-prepper-plugins/kafka-plugins/build.gradle
@@ -16,10 +16,51 @@ dependencies {
     implementation 'org.apache.commons:commons-lang3:3.12.0'
     implementation 'io.confluent:kafka-avro-serializer:7.3.3'
     implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
+    implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
     testImplementation 'org.mockito:mockito-inline:4.1.0'
     testImplementation 'org.yaml:snakeyaml:2.0'
     testImplementation testLibs.spring.test
     testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
     testImplementation project(':data-prepper-test-common')
     testImplementation project(':data-prepper-plugins:blocking-buffer')
+    testImplementation 'org.apache.kafka:kafka_2.13:3.4.0'
+    testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
+    testImplementation 'org.apache.curator:curator-test:5.5.0'
+    testImplementation 'io.confluent:kafka-schema-registry:7.3.3'
+    testImplementation 'junit:junit:4.13.1'
+    testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
+    testImplementation 'org.apache.kafka:connect-json:3.4.0'
+}
+
+test {
+    useJUnitPlatform()
+}
+
+sourceSets {
+    integrationTest {
+        java {
+            compileClasspath += main.output + test.output
+            runtimeClasspath += main.output + test.output
+            srcDir file('src/integrationTest/java')
+        }
+        //resources.srcDir file('src/integrationTest/resources')
+    }
 }
+
+configurations {
+    integrationTestImplementation.extendsFrom testImplementation
+    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
+}
+
+// The integration tests below use JUnit 4 rules (@ClassRule), so this task runs on the
+// default JUnit 4 runner rather than useJUnitPlatform().
+task integrationTest(type: Test) {
+    group = 'verification'
+    testClassesDirs = sourceSets.integrationTest.output.classesDirs
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    filter {
+        includeTestsMatching '*IT'
+    }
+}
+
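The new task can be run on its own through the Gradle wrapper; only classes matching the '*IT' filter above are executed. A typical invocation from the repository root would be:

    ./gradlew :data-prepper-plugins:kafka-plugins:integrationTest
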
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaClusterSingleNode.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaClusterSingleNode.java
new file mode 100644
index 0000000000..ec791c221f
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaClusterSingleNode.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import io.confluent.kafka.schemaregistry.RestApp;
+import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
+import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
+import kafka.server.KafkaConfig$;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance, 1 Kafka broker, and 1
+ * Confluent Schema Registry instance.
+ */
+public class EmbeddedKafkaClusterSingleNode extends ExternalResource {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaClusterSingleNode.class);
+    private static final int DEFAULT_BROKER_PORT = 0;
+    private static final String KAFKA_SCHEMAS_TOPIC = "_schemas";
+    private static final String AVRO_COMPATIBILITY_TYPE = AvroCompatibilityLevel.NONE.name;
+    private static final String KAFKASTORE_OPERATION_TIMEOUT_MS = "60000";
+    private static final String KAFKASTORE_DEBUG = "true";
+    private static final String KAFKASTORE_INIT_TIMEOUT = "90000";
+
+    private EmbeddedZooKeeperServer zookeeper;
+    private EmbeddedKafkaServer broker;
+    private RestApp schemaRegistry;
+    private final Properties brokerConfig;
+    private boolean running;
+
+    public EmbeddedKafkaClusterSingleNode() {
+        this(new Properties());
+    }
+
+    public EmbeddedKafkaClusterSingleNode(final Properties brokerConfig) {
+        this.brokerConfig = new Properties();
+        this.brokerConfig.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
+        this.brokerConfig.putAll(brokerConfig);
+    }
+
+    /**
+     * Creates and starts the cluster.
+     */
+    public void start() throws Exception {
+        log.debug("Initiating embedded Kafka cluster startup");
+        log.debug("Starting a ZooKeeper instance...");
+        zookeeper = new EmbeddedZooKeeperServer();
+        log.debug("ZooKeeper instance is running at {}", zookeeper.connectString());
+
+        final Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper);
+        log.debug("Starting a Kafka instance on {} ...",
+                effectiveBrokerConfig.getProperty(KafkaConfig$.MODULE$.ZkConnectProp()));
+        broker = new EmbeddedKafkaServer(effectiveBrokerConfig);
+        log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
+                broker.brokerList(), broker.zookeeperConnect());
+
+        final Properties schemaRegistryProps = new Properties();
+
+        schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, KAFKASTORE_OPERATION_TIMEOUT_MS);
+        schemaRegistryProps.put(SchemaRegistryConfig.DEBUG_CONFIG, KAFKASTORE_DEBUG);
+        schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG, KAFKASTORE_INIT_TIMEOUT);
+        schemaRegistryProps.put(SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG, broker.brokerList());
+
+        schemaRegistry = new RestApp(0, zookeeperConnect(), KAFKA_SCHEMAS_TOPIC, AVRO_COMPATIBILITY_TYPE, schemaRegistryProps);
+        schemaRegistry.start();
+        running = true;
+    }
+
+    private Properties effectiveBrokerConfigFrom(final Properties brokerConfig, final EmbeddedZooKeeperServer zookeeper) {
+        final Properties effectiveConfig = new Properties();
+        effectiveConfig.putAll(brokerConfig);
+        effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zookeeper.connectString());
+        effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 30 * 1000);
+        effectiveConfig.put(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(), 60 * 1000);
+        effectiveConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        effectiveConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        effectiveConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+        effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
+        effectiveConfig.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 1);
+        effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+        return effectiveConfig;
+    }
+
+    @Override
+    protected void before() throws Exception {
+        start();
+    }
+
+    @Override
+    protected void after() {
+        stop();
+    }
+
+    /**
+     * Stops the cluster.
+     */
+    public void stop() {
+        log.info("Stopping the embedded Kafka cluster");
+        try {
+            try {
+                if (schemaRegistry != null) {
+                    schemaRegistry.stop();
+                }
+            } catch (final Exception fatal) {
+                throw new RuntimeException(fatal);
+            }
+            if (broker != null) {
+                broker.stop();
+            }
+            try {
+                if (zookeeper != null) {
+                    zookeeper.stop();
+                }
+            } catch (final IOException fatal) {
+                throw new RuntimeException(fatal);
+            }
+        } finally {
+            running = false;
+        }
+        log.info("Embedded Kafka cluster stopped");
+    }
+
+    public String bootstrapServers() {
+        return broker.brokerList();
+    }
+
+    public String zookeeperConnect() {
+        return zookeeper.connectString();
+    }
+
+    public String schemaRegistryUrl() {
+        return schemaRegistry.restConnect;
+    }
+
+    public void createTopic(final String topic) {
+        createTopic(topic, 1, (short) 1, Collections.emptyMap());
+    }
+
+    public void createTopic(final String topic, final int partitions, final short replication) {
+        createTopic(topic, partitions, replication, Collections.emptyMap());
+    }
+
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final short replication,
+                            final Map<String, String> topicConfig) {
+        broker.createTopic(topic, partitions, replication, topicConfig);
+    }
+
+}
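For reference, a minimal sketch of how this rule is consumed from a JUnit 4 test; the class and topic names are illustrative only, not part of the change:

    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.Test;

    import static org.junit.Assert.assertNotNull;

    public class ExampleClusterUsageIT {

        // Starts ZooKeeper, the Kafka broker, and the schema registry before the first
        // test method, and tears everything down after the last one.
        @ClassRule
        public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();

        @BeforeClass
        public static void createTopics() {
            CLUSTER.createTopic("example-topic");
        }

        @Test
        public void clusterEndpointsAreAvailable() {
            // Endpoints are resolved at runtime, so tests should always read them from the rule.
            assertNotNull(CLUSTER.bootstrapServers());
            assertNotNull(CLUSTER.schemaRegistryUrl());
            assertNotNull(CLUSTER.zookeeperConnect());
        }
    }
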
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaServer.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaServer.java
new file mode 100644
index 0000000000..1bb6953ed9
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedKafkaServer.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
+ * default.
+ *
+ * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance
+ * running at `127.0.0.1:2181`.
+ */
+public class EmbeddedKafkaServer {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
+
+    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
+
+    private final Properties effectiveConfig;
+    private final File logDir;
+    private final TemporaryFolder tmpFolder;
+    private final KafkaServer kafka;
+
+    public EmbeddedKafkaServer(final Properties config) throws IOException {
+        tmpFolder = new TemporaryFolder();
+        tmpFolder.create();
+        logDir = tmpFolder.newFolder();
+        effectiveConfig = effectiveConfigFrom(config);
+        final boolean loggingEnabled = true;
+
+        final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+        log.info("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
+                logDir, zookeeperConnect());
+        kafka = TestUtils.createServer(kafkaConfig, Time.SYSTEM);
+        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+                brokerList(), zookeeperConnect());
+    }
+
+    private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+        final Properties effectiveConfig = new Properties();
+        effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 1);
+        effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
+        effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+        effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
+        effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
+
+        effectiveConfig.putAll(initialConfig);
+        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
+        return effectiveConfig;
+    }
+
+    /**
+     * The broker address (e.g. `127.0.0.1:9092`), suitable for use as `bootstrap.servers`.
+     */
+    public String brokerList() {
+        // Resolve the port the broker actually bound on its PLAINTEXT listener.
+        return "127.0.0.1:" + kafka.boundPort(new ListenerName("PLAINTEXT"));
+    }
+
+    public String zookeeperConnect() {
+        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
+    }
+
+    public void stop() {
+        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
+                brokerList(), zookeeperConnect());
+        kafka.shutdown();
+        kafka.awaitShutdown();
+        log.debug("Removing temp folder {} with logs.dir at {} ...", tmpFolder, logDir);
+        tmpFolder.delete();
+        log.debug("Shutdown of embedded Kafka broker completed (with ZK ensemble at {}) ...",
+                zookeeperConnect());
+    }
+
+    public void createTopic(final String topic) {
+        createTopic(topic, 1, (short) 1, Collections.emptyMap());
+    }
+
+    public void createTopic(final String topic, final int partitions, final short replication) {
+        createTopic(topic, partitions, replication, Collections.emptyMap());
+    }
+
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final short replication,
+                            final Map<String, String> topicConfig) {
+        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
+                topic, partitions, replication, topicConfig);
+
+        final Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
+
+        try (final AdminClient adminClient = AdminClient.create(properties)) {
+            final NewTopic newTopic = new NewTopic(topic, partitions, replication);
+            newTopic.configs(topicConfig);
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        } catch (final InterruptedException | ExecutionException fatal) {
+            throw new RuntimeException(fatal);
+        }
+    }
+
+    public void deleteTopic(final String topic) {
+        log.debug("Deleting topic {}", topic);
+        final Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
+
+        try (final AdminClient adminClient = AdminClient.create(properties)) {
+            adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    KafkaServer kafkaServer() {
+        return kafka;
+    }
+}
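A quick way to smoke-test the broker wrapper in isolation is an AdminClient round trip; this is a sketch, and it assumes the EmbeddedZooKeeperServer class introduced below:

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class EmbeddedKafkaServerSmokeCheck {

        public static void main(final String[] args) throws Exception {
            // Start ZooKeeper first; the broker config only needs zookeeper.connect.
            final EmbeddedZooKeeperServer zookeeper = new EmbeddedZooKeeperServer();
            final Properties brokerConfig = new Properties();
            brokerConfig.put("zookeeper.connect", zookeeper.connectString());
            final EmbeddedKafkaServer broker = new EmbeddedKafkaServer(brokerConfig);

            final Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.brokerList());
            try (AdminClient adminClient = AdminClient.create(adminProps)) {
                // describeCluster() round-trips to the broker; a non-empty node list proves liveness.
                System.out.println("Live nodes: " + adminClient.describeCluster().nodes().get());
            }

            broker.stop();
            zookeeper.stop();
        }
    }
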
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedZooKeeperServer.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedZooKeeperServer.java
new file mode 100644
index 0000000000..e4e3d9fdd9
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/EmbeddedZooKeeperServer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Runs an in-memory, "embedded" instance of a ZooKeeper server.
+ *
+ * The ZooKeeper server instance is automatically started when you create a new instance of this class.
+ */
+public class EmbeddedZooKeeperServer {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedZooKeeperServer.class);
+
+    private final TestingServer server;
+
+    public EmbeddedZooKeeperServer() throws Exception {
+        log.debug("Starting embedded ZooKeeper server...");
+        this.server = new TestingServer();
+        log.debug("Embedded ZooKeeper server at {} uses the temp directory at {}",
+                server.getConnectString(), server.getTempDirectory());
+    }
+
+    public void stop() throws IOException {
+        log.debug("Shutting down embedded ZooKeeper server at {} ...", server.getConnectString());
+        server.close();
+        log.debug("Shutdown of embedded ZooKeeper server at {} completed", server.getConnectString());
+    }
+
+    public String connectString() {
+        return server.getConnectString();
+    }
+
+    public String hostname() {
+        return connectString().substring(0, connectString().lastIndexOf(':'));
+    }
+
+}
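Standalone usage is minimal, since the server starts in its constructor; a brief lifecycle sketch:

    // The server starts in the constructor, so the lifecycle is just construct/use/stop.
    final EmbeddedZooKeeperServer zookeeper = new EmbeddedZooKeeperServer();
    System.out.println("ZooKeeper at " + zookeeper.connectString() + " (host " + zookeeper.hostname() + ")");
    zookeeper.stop(); // closes the underlying Curator TestingServer and its temp directory
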
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java
new file mode 100644
index 0000000000..1fab0d7ac1
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.json.JsonSerializer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class JSONConsumerIT {
+
+    private PluginMetrics pluginMetrics;
+    private TopicConfig topicConfig;
+    private SchemaConfig schemaConfig;
+    private KafkaSourceConfig kafkaSourceConfig;
+
+    private KafkaSource kafkaSource;
+    private Buffer<Record<Object>> buffer;
+
+    @ClassRule
+    public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
+
+    @BeforeClass
+    public static void createTopics() {
+        CLUSTER.createTopic("test-IT-topic-1");
+    }
+
+    @Before
+    public void configure() throws IOException {
+        final Yaml yaml = new Yaml();
+        final FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-int.yaml").getFile());
+        final Object data = yaml.load(fileReader);
+        if (data instanceof Map) {
+            final Map<String, Object> propertyMap = (Map<String, Object>) data;
+            final Map<String, Object> logPipelineMap = (Map<String, Object>) propertyMap.get("log-pipeline");
+            final Map<String, Object> sourceMap = (Map<String, Object>) logPipelineMap.get("source");
+            final Map<String, Object> kafkaConfigMap = (Map<String, Object>) sourceMap.get("kafka");
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JavaTimeModule());
+            final String json = mapper.writeValueAsString(kafkaConfigMap);
+            final Reader reader = new StringReader(json);
+            kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class);
+            final List<TopicConfig> topicConfigList = kafkaSourceConfig.getTopics();
+            topicConfig = topicConfigList.get(0);
+            schemaConfig = kafkaSourceConfig.getSchemaConfig();
+        }
+        pluginMetrics = PluginMetrics.fromNames("kafka", "log-pipeline");
+        buffer = new BlockingBuffer<>("log-pipeline");
+        kafkaSource = new KafkaSource(kafkaSourceConfig, pluginMetrics);
+    }
+
+    @Test
+    public void testKafkaMessagesForJsonConsumer() throws JsonProcessingException {
+        produceTestMessages();
+        kafkaSource.start(buffer);
+    }
+
+    private void produceTestMessages() throws JsonProcessingException {
+        final String value = "{\"writebuffer\":\"true\",\"buffertype\":\"json\"}";
+        final JsonNode testMessage = new ObjectMapper().readTree(value);
+
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                JsonSerializer.class);
+        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+        try (KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(props)) {
+            for (long i = 0; i < 10; i++) {
+                producer.send(new ProducerRecord<>("test-IT-topic-1", testMessage));
+                Thread.sleep(1000L);
+            }
+            producer.flush();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
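The test above only starts the source and asserts nothing about delivery; one possible follow-up is a plain consumer poll against the same topic. A sketch, with an illustrative group id and timeout:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import static org.junit.Assert.assertFalse;

    // ... inside the test class above:
    private void verifyMessagesWereProduced() {
        final Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "json-it-verifier");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singleton("test-IT-topic-1"));
            // JsonSerializer wrote plain JSON bytes, so the values come back as JSON strings.
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            assertFalse(records.isEmpty());
        }
    }
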
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java
new file mode 100644
index 0000000000..a2f10eeba7
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class PlainTextConsumerIT {
+
+    private PluginMetrics pluginMetrics;
+    private TopicConfig topicConfig;
+    private SchemaConfig schemaConfig;
+    private KafkaSourceConfig kafkaSourceConfig;
+
+    private KafkaSource kafkaSource;
+    private Buffer<Record<Object>> buffer;
+
+    @ClassRule
+    public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
+
+    @BeforeClass
+    public static void createTopics() {
+        CLUSTER.createTopic("test-IT-topic");
+    }
+
+    @Before
+    public void configure() throws IOException {
+        final Yaml yaml = new Yaml();
+        final FileReader fileReader = new FileReader(getClass().getClassLoader().getResource("sample-pipelines-int.yaml").getFile());
+        final Object data = yaml.load(fileReader);
+        if (data instanceof Map) {
+            final Map<String, Object> propertyMap = (Map<String, Object>) data;
+            final Map<String, Object> logPipelineMap = (Map<String, Object>) propertyMap.get("log-pipeline");
+            final Map<String, Object> sourceMap = (Map<String, Object>) logPipelineMap.get("source");
+            final Map<String, Object> kafkaConfigMap = (Map<String, Object>) sourceMap.get("kafka");
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.registerModule(new JavaTimeModule());
+            final String json = mapper.writeValueAsString(kafkaConfigMap);
+            final Reader reader = new StringReader(json);
+            kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class);
+            final List<TopicConfig> topicConfigList = kafkaSourceConfig.getTopics();
+            topicConfig = topicConfigList.get(0);
+            schemaConfig = kafkaSourceConfig.getSchemaConfig();
+        }
+        pluginMetrics = PluginMetrics.fromNames("kafka", "log-pipeline");
+        buffer = new BlockingBuffer<>("log-pipeline");
+        kafkaSource = new KafkaSource(kafkaSourceConfig, pluginMetrics);
+    }
+
+    @Test
+    public void consumeKafkaMessages_should_return_at_least_one_message() {
+        produceTestMessages();
+        kafkaSource.start(buffer);
+    }
+
+    private void produceTestMessages() {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class);
+        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            for (long i = 0; i < 10; i++) {
+                producer.send(new ProducerRecord<>("test-IT-topic",
+                        "hello" + i));
+                Thread.sleep(1000L);
+            }
+            producer.flush();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
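Both ITs hand the source a BlockingBuffer but never read it back; a follow-up assertion could drain the buffer through the data-prepper Buffer API, which returns the batched records together with a checkpoint handle. A sketch, with an arbitrary 5-second timeout:

    import java.util.Collection;
    import java.util.Map;

    import org.opensearch.dataprepper.model.CheckpointState;
    import org.opensearch.dataprepper.model.record.Record;

    import static org.junit.Assert.assertFalse;

    // ... after kafkaSource.start(buffer) in a test method:
    final Map.Entry<Collection<Record<Object>>, CheckpointState> readResult = buffer.read(5000);
    assertFalse(readResult.getKey().isEmpty());
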
diff --git a/data-prepper-plugins/kafka-plugins/src/main/resources/sample-pipelines-int.yaml b/data-prepper-plugins/kafka-plugins/src/main/resources/sample-pipelines-int.yaml
new file mode 100644
index 0000000000..88efc7b2e5
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/main/resources/sample-pipelines-int.yaml
@@ -0,0 +1,34 @@
+log-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - 127.0.0.1:9092
+      auth_type: plaintext
+      topics:
+        - name: my-topic-2
+          group_name: kafka-consumer-group-2
+          group_id: DPKafkaProj-2
+          workers: 10 #optional and default is 10
+          autocommit: false #optional and default is false
+          autocommit_interval: 5 #optional and default is 5s
+          session_timeout: 45 #optional and default is 45s
+          max_retry_attempts: 1000 #optional and default is 5
+          max_retry_delay: 1 #optional and default is 5
+          auto_offset_reset: earliest #optional and default is earliest
+          thread_waiting_time: 1 #optional and default is 1s
+          max_record_fetch_time: 4 #optional and default is 4s
+          heart_beat_interval: 3 #optional and default is 3s
+          buffer_default_timeout: 5 #optional and default is 5s
+          fetch_max_bytes: 52428800 #optional and default is 52428800
+          fetch_max_wait: 500 #optional and default is 500
+          fetch_min_bytes: 1 #optional and default is 1
+          retry_backoff: 100 #optional and default is 10s
+          max_poll_interval: 300000 #optional and default is 300000s
+          consumer_max_poll_records: 500 #optional and default is 500
+        - name: my-topic-1
+          group_id: DPKafkaProj-1
+      schema:
+        registry_url: http://localhost:8081/
+        version: 1
+  sink:
+    - stdout:
\ No newline at end of file
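The YAML pins bootstrap_servers to a fixed port, while the embedded broker resolves its port at runtime; the ITs' configure() methods could rewrite the parsed map before binding it to KafkaSourceConfig. A sketch against the parsing code shown above:

    // Inside configure(), before the map is serialized back to JSON:
    kafkaConfigMap.put("bootstrap_servers", Collections.singletonList(CLUSTER.bootstrapServers()));
    final String json = mapper.writeValueAsString(kafkaConfigMap);
    kafkaSourceConfig = mapper.readValue(new StringReader(json), KafkaSourceConfig.class);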