From 8ae09fa4cf7f9a904f452ffd1dfb055c50418629 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 29 Jun 2023 20:06:09 +0000 Subject: [PATCH] Fixes for failing build/tests Signed-off-by: Krishna Kondaka --- .../dataprepper/plugins/kafka/source/JSONConsumerIT.java | 5 +++-- .../plugins/kafka/source/PlainTextConsumerIT.java | 5 +++-- .../plugins/kafka/configuration/TopicConfig.java | 2 +- .../plugins/kafka/configuration/TopicConfigTest.java | 6 +++--- .../kafka-plugins/src/test/resources/sample-pipelines.yaml | 6 +++--- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java index 1fab0d7ac1..cc777b25df 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/JSONConsumerIT.java @@ -25,6 +25,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -49,7 +50,7 @@ public class JSONConsumerIT { private KafkaSourceConfig kafkaSourceConfig; private KafkaSource kafkaSource; - private Buffer> buffer; + private Buffer> buffer; @ClassRule public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode(); @@ -111,4 +112,4 @@ private void produceTestMessages() throws JsonProcessingException { throw new RuntimeException(e); } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java index a2f10eeba7..a5118e64c5 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/PlainTextConsumerIT.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; @@ -44,7 +45,7 @@ public class PlainTextConsumerIT { private KafkaSourceConfig kafkaSourceConfig; private KafkaSource kafkaSource; - private Buffer> buffer; + private Buffer> buffer; @ClassRule public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode(); @@ -101,4 +102,4 @@ private void produceTestMessages() { throw new RuntimeException(e); } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java index a6d6292f4d..e9f2168604 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java @@ -21,7 +21,7 @@ public class TopicConfig { private static final Duration SESSION_TIMEOUT = Duration.ofSeconds(45); private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE; private static final String AUTO_OFFSET_RESET = "earliest"; - private static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5); + public static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5); private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5); private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java index 3e60bb2771..6c26666adf 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java @@ -69,11 +69,11 @@ void testConfigValues_default() { assertEquals("my-topic-2", topicConfig.getName()); assertEquals("DPKafkaProj-2", topicConfig.getGroupId()); assertEquals("kafka-consumer-group-2", topicConfig.getGroupName()); - assertEquals("false", topicConfig.getAutoCommit()); + assertEquals(false, topicConfig.getAutoCommit()); assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut()); assertEquals("earliest", topicConfig.getAutoOffsetReset()); - assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime()); + assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime()); assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout()); assertEquals(52428800L, topicConfig.getFetchMaxBytes().longValue()); @@ -93,7 +93,7 @@ void testConfigValues_from_yaml() { assertEquals("my-topic-1", topicConfig.getName()); assertEquals("DPKafkaProj-2", topicConfig.getGroupId()); assertEquals("kafka-consumer-group-2", topicConfig.getGroupName()); - assertEquals("false", topicConfig.getAutoCommit()); + assertEquals(false, topicConfig.getAutoCommit()); assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut()); assertEquals("earliest", topicConfig.getAutoOffsetReset()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index 2c398a99dc..efd860a8a9 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -8,8 +8,8 @@ log-pipeline: group_name: kafka-consumer-group-2 group_id: DPKafkaProj-2 workers: 10 #optional and default is 10 - autocommit: false #optional and dafault is false - autocommit_interval: 5 #optional and dafault is 5s + auto_commit: false #optional and dafault is false + auto_commit_interval: 5 #optional and dafault is 5s session_timeout: 45 #optional and dafault is 45s max_retry_attempts: 1000 #optional and dafault is 5 max_retry_delay: 1 #optional and dafault is 5 @@ -48,4 +48,4 @@ log-pipeline: oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler oauth_jwks_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/keys sink: - - stdout: \ No newline at end of file + - stdout: