Skip to content

Commit

Permalink
Fixes for failing build/tests
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jun 29, 2023
1 parent 7d28868 commit 8ae09fa
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
Expand All @@ -49,7 +50,7 @@ public class JSONConsumerIT {
private KafkaSourceConfig kafkaSourceConfig;

private KafkaSource kafkaSource;
private Buffer<Record<Object>> buffer;
private Buffer<Record<Event>> buffer;

@ClassRule
public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
Expand Down Expand Up @@ -111,4 +112,4 @@ private void produceTestMessages() throws JsonProcessingException {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
Expand All @@ -44,7 +45,7 @@ public class PlainTextConsumerIT {
private KafkaSourceConfig kafkaSourceConfig;

private KafkaSource kafkaSource;
private Buffer<Record<Object>> buffer;
private Buffer<Record<Event>> buffer;

@ClassRule
public static final EmbeddedKafkaClusterSingleNode CLUSTER = new EmbeddedKafkaClusterSingleNode();
Expand Down Expand Up @@ -101,4 +102,4 @@ private void produceTestMessages() {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class TopicConfig {
private static final Duration SESSION_TIMEOUT = Duration.ofSeconds(45);
private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
private static final String AUTO_OFFSET_RESET = "earliest";
private static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
public static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ void testConfigValues_default() {
assertEquals("my-topic-2", topicConfig.getName());
assertEquals("DPKafkaProj-2", topicConfig.getGroupId());
assertEquals("kafka-consumer-group-2", topicConfig.getGroupName());
assertEquals("false", topicConfig.getAutoCommit());
assertEquals(false, topicConfig.getAutoCommit());
assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval());
assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut());
assertEquals("earliest", topicConfig.getAutoOffsetReset());
assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime());
assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime());
assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime());
assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout());
assertEquals(52428800L, topicConfig.getFetchMaxBytes().longValue());
Expand All @@ -93,7 +93,7 @@ void testConfigValues_from_yaml() {
assertEquals("my-topic-1", topicConfig.getName());
assertEquals("DPKafkaProj-2", topicConfig.getGroupId());
assertEquals("kafka-consumer-group-2", topicConfig.getGroupName());
assertEquals("false", topicConfig.getAutoCommit());
assertEquals(false, topicConfig.getAutoCommit());
assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval());
assertEquals(Duration.ofSeconds(45), topicConfig.getSessionTimeOut());
assertEquals("earliest", topicConfig.getAutoOffsetReset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ log-pipeline:
group_name: kafka-consumer-group-2
group_id: DPKafkaProj-2
workers: 10 #optional and default is 10
autocommit: false #optional and dafault is false
autocommit_interval: 5 #optional and dafault is 5s
auto_commit: false #optional and dafault is false
auto_commit_interval: 5 #optional and dafault is 5s
session_timeout: 45 #optional and dafault is 45s
max_retry_attempts: 1000 #optional and dafault is 5
max_retry_delay: 1 #optional and dafault is 5
Expand Down Expand Up @@ -48,4 +48,4 @@ log-pipeline:
oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
oauth_jwks_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/keys
sink:
- stdout:
- stdout:

0 comments on commit 8ae09fa

Please sign in to comment.