diff --git a/data-prepper-plugins/kafka-plugins/README.md b/data-prepper-plugins/kafka-plugins/README.md
index 626df4d54b..30fc97f876 100644
--- a/data-prepper-plugins/kafka-plugins/README.md
+++ b/data-prepper-plugins/kafka-plugins/README.md
@@ -76,7 +76,7 @@ log-pipeline:
 
 - `session_timeout` (Optional) : The timeout used to detect client failures when using Kafka's group management. It is used for the rebalance.
 
-- `max_retry_delay` (Optional) : By default the Kafka source will retry for every 1 second when there is a buffer write error. Defaults to `1s`. 
+- `max_retry_delay` (Optional) : By default, the Kafka source retries every 1 second when there is a buffer write error. Defaults to `1s`.
 
 - `auto_offset_reset` (Optional) : automatically reset the offset to the earliest or latest offset. Defaults to `earliest`.
@@ -89,7 +89,7 @@ Defaults to `4s`.
 
 - `buffer_default_timeout` (Optional) : The maximum time to write data to the buffer. Defaults to `1s`.
 
-- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker.
+- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker. Defaults to `52428800`.
 
 - `fetch_max_wait` (Optional) : The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement. Defaults to `500`.
@@ -127,7 +127,7 @@ Defaults to `52428800`.
 
 - `oauth_login_grant_type` (Optional) : This grant type refers to the way an application gets an access token.
 
 - `oauth_login_scope` (Optional) : This scope limit an application's access to a user's account.
- 
+
 - `oauth_introspect_server` (Optional) : The URL of the introspect server. Most of the cases it should be similar to the oauth_login_server URL (Eg:https://dev.okta.com)
 
 - `oauth_introspect_endpoint` (Optional) : The end point of the introspect server URL.(Eg: /oauth2/default/v1/introspect)
@@ -140,9 +140,42 @@ Defaults to `52428800`.
 
 - `oauth_jwks_endpoint_url` (Optional) : The absolute URL for the oauth token refresh.
 
+## Integration Tests
+
+Before running the integration tests, make sure the Kafka server is started:
+1. Start ZooKeeper:
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+2. Start the Kafka server with the following configuration in `config/server.properties`:
+```
+listeners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
+security.inter.broker.protocol=SASL_SSL
+sasl.mechanism.inter.broker.protocol=PLAIN
+sasl.enabled.mechanisms=PLAIN
+ssl.truststore.location=
+ssl.truststore.password=
+ssl.keystore.location=
+ssl.keystore.password=
+```
+The truststore must contain certificates for "localhost".
+
+Command to start the Kafka server:
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Run the integration tests:
+
+```
+./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers="localhost:9092" -Dtests.kafka.trust_store_location="/home/krishkdk/kafka/kafka-3.4.1-src/sec/client.truststore.jks" -Dtests.kafka.trust_store_password="kafkaks" -Dtests.kafka.saslssl_bootstrap_servers="localhost:9093" -Dtests.kafka.ssl_bootstrap_servers="localhost:9094" -Dtests.kafka.saslplain_bootstrap_servers="localhost:9095" -Dtests.kafka.username="admin" -Dtests.kafka.password="admin1" --tests "*KafkaSourceMultipleAuthTypeIT*"
+```
+
+
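+The SASL listeners above also require a broker-side JAAS configuration, which `config/server.properties` alone does not provide. The sketch below is a minimal example for the PLAIN mechanism; the `admin`/`admin1` credentials are only assumptions that mirror the test command above:
+```
+KafkaServer {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin1"
+    user_admin="admin1";
+};
+```
+Point the broker at this file before starting it, for example: `export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"`.
+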
 ## Developer Guide
 This plugin is compatible with Java 11. See
-- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) 
+- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
 - [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle
index 5d7825b631..f636a9a40f 100644
--- a/data-prepper-plugins/kafka-plugins/build.gradle
+++ b/data-prepper-plugins/kafka-plugins/build.gradle
@@ -36,7 +36,7 @@ dependencies {
     testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
     testImplementation 'org.apache.curator:curator-test:5.5.0'
     testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
-    testImplementation 'junit:junit:4.13.1'
+    testImplementation testLibs.junit.vintage
    testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
    testImplementation 'org.apache.kafka:connect-json:3.4.0'
 }
@@ -52,7 +52,6 @@ sourceSets {
         runtimeClasspath += main.output + test.output
         srcDir file('src/integrationTest/java')
     }
-    //resources.srcDir file('src/integrationTest/resources')
 }
@@ -67,6 +66,14 @@ task integrationTest(type: Test) {
 
     useJUnitPlatform()
 
+    classpath = sourceSets.integrationTest.runtimeClasspath
+    systemProperty 'tests.kafka.bootstrap_servers', System.getProperty('tests.kafka.bootstrap_servers')
+    systemProperty 'tests.kafka.saslssl_bootstrap_servers', System.getProperty('tests.kafka.saslssl_bootstrap_servers')
+    systemProperty 'tests.kafka.ssl_bootstrap_servers', System.getProperty('tests.kafka.ssl_bootstrap_servers')
+    systemProperty 'tests.kafka.saslplain_bootstrap_servers', System.getProperty('tests.kafka.saslplain_bootstrap_servers')
+    systemProperty 'tests.kafka.username', System.getProperty('tests.kafka.username')
+    systemProperty 'tests.kafka.password', System.getProperty('tests.kafka.password')
+
     filter {
         includeTestsMatching '*IT'
     }
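One note on the task wiring above: `integrationTest` only forwards the `tests.kafka.*` system properties that are set on the Gradle command line; any property that is omitted reaches the tests as `null`. In practice, all of the `-Dtests.kafka.*` flags shown in the README command should be supplied together.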
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java
new file mode 100644
index 0000000000..2088f43335
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
+import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.configuration.PipelineDescription;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.CoreMatchers.equalTo;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import io.micrometer.core.instrument.Counter;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+
+import java.time.Duration;
+
+public class KafkaSourceMultipleAuthTypeIT {
+    @Mock
+    private KafkaSourceConfig sourceConfig;
+
+    @Mock
+    private PluginMetrics pluginMetrics;
+
+    @Mock
+    private AcknowledgementSetManager acknowledgementSetManager;
+
+    @Mock
+    private PipelineDescription pipelineDescription;
+
+    @Mock
+    private Buffer<Record<Event>> buffer;
+
+    private List<TopicConfig> topicList;
+
+    @Mock
+    private TopicConfig plainTextTopic;
+
+    @Mock
+    private AuthConfig authConfig;
+
+    @Mock
+    private AuthConfig.SaslAuthConfig saslAuthConfig;
+
+    @Mock
+    private AuthConfig.SslAuthConfig sslAuthConfig;
+
+    @Mock
+    private PlainTextAuthConfig plainTextAuthConfig;
+
+    private TopicConfig jsonTopic;
+    private TopicConfig avroTopic;
+
+    private KafkaSource kafkaSource;
+
+    private Counter counter;
+
+    private List<Record<Event>> receivedRecords;
+
+    private String bootstrapServers;
+    private String saslsslBootstrapServers;
+    private String saslplainBootstrapServers;
+    private String sslBootstrapServers;
+    private String kafkaUsername;
+    private String kafkaPassword;
+
+    public KafkaSource createObjectUnderTest() {
+        return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription);
+    }
+
+    @BeforeEach
+    public void setup() {
+        sourceConfig = mock(KafkaSourceConfig.class);
+        pluginMetrics = mock(PluginMetrics.class);
+        counter = mock(Counter.class);
+        buffer = mock(Buffer.class);
+        receivedRecords = new ArrayList<>();
+        acknowledgementSetManager = mock(AcknowledgementSetManager.class);
+        pipelineDescription = mock(PipelineDescription.class);
+        when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
+        when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT);
+        when(sourceConfig.getSchemaConfig()).thenReturn(null);
+        when(pluginMetrics.counter(anyString())).thenReturn(counter);
+        when(pipelineDescription.getPipelineName()).thenReturn("testPipeline");
+        try {
+            doAnswer(args -> {
+                Collection<Record<Event>> bufferedRecords = (Collection<Record<Event>>) args.getArgument(0);
+                receivedRecords.addAll(bufferedRecords);
+                return null;
+            }).when(buffer).writeAll(any(Collection.class), any(Integer.class));
+        } catch (Exception e) {
+            // writeAll declares a checked exception; stubbing the mock cannot actually throw here
+        }
+
+        final String testGroup = "TestGroup_" + RandomStringUtils.randomAlphabetic(6);
+        final String testTopic = "TestTopic_" + RandomStringUtils.randomAlphabetic(5);
+        plainTextTopic = mock(TopicConfig.class);
+        when(plainTextTopic.getName()).thenReturn(testTopic);
+        when(plainTextTopic.getGroupId()).thenReturn(testGroup);
+        when(plainTextTopic.getWorkers()).thenReturn(1);
+        when(plainTextTopic.getAutoCommit()).thenReturn(false);
+        when(plainTextTopic.getAutoOffsetReset()).thenReturn("earliest");
+        when(plainTextTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
+        bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
+        saslsslBootstrapServers = System.getProperty("tests.kafka.saslssl_bootstrap_servers");
+        saslplainBootstrapServers = System.getProperty("tests.kafka.saslplain_bootstrap_servers");
+        sslBootstrapServers = System.getProperty("tests.kafka.ssl_bootstrap_servers");
+        kafkaUsername = System.getProperty("tests.kafka.username");
+        kafkaPassword = System.getProperty("tests.kafka.password");
+        when(sourceConfig.getBootStrapServers()).thenReturn(bootstrapServers);
+    }
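+
+    // Each test below follows the same pattern: create the topic with an AdminClient, start
+    // the KafkaSource under the auth/encryption combination being tested, produce records over
+    // the PLAINTEXT listener, poll until the mocked buffer has received them, then delete the topic.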
+
+    @Test
+    public void TestPlainTextWithNoAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() throws Exception {
+        final int numRecords = 1;
+        when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.PLAINTEXT);
+        when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
+        when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
+        when(sourceConfig.getAuthConfig()).thenReturn(null);
+        kafkaSource = createObjectUnderTest();
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        AtomicBoolean created = new AtomicBoolean(false);
+        final String topicName = plainTextTopic.getName();
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.createTopics(
+                        Collections.singleton(new NewTopic(topicName, 1, (short) 1)))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(true);
+        }
+        while (!created.get()) {
+            Thread.sleep(1000);
+        }
+        kafkaSource.start(buffer);
+        produceKafkaRecords(bootstrapServers, topicName, numRecords);
+        int numRetries = 0;
+        while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
+            Thread.sleep(1000);
+        }
+        assertThat(receivedRecords.size(), equalTo(numRecords));
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.deleteTopics(Collections.singleton(topicName))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(false);
+        }
+        while (created.get()) {
+            Thread.sleep(1000);
+        }
+    }
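+
+    // SASL/PLAIN without TLS: the consumer authenticates against the SASL_PLAINTEXT listener
+    // with the username/password passed in via system properties, while records are still
+    // produced over the PLAINTEXT listener.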
+
+    @Test
+    public void TestPlainTextWithAuthKafkaNoEncryptionWithNoAuthSchemaRegistry() throws Exception {
+        final int numRecords = 1;
+        authConfig = mock(AuthConfig.class);
+        saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
+        plainTextAuthConfig = mock(PlainTextAuthConfig.class);
+        when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.PLAINTEXT);
+        when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
+        when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
+        when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername);
+        when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword);
+        when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
+        when(authConfig.getInsecure()).thenReturn(true);
+        when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
+        when(sourceConfig.getBootStrapServers()).thenReturn(saslplainBootstrapServers);
+        when(authConfig.getSslAuthConfig()).thenReturn(null);
+        kafkaSource = createObjectUnderTest();
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        AtomicBoolean created = new AtomicBoolean(false);
+        final String topicName = plainTextTopic.getName();
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.createTopics(
+                        Collections.singleton(new NewTopic(topicName, 1, (short) 1)))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(true);
+        }
+        while (!created.get()) {
+            Thread.sleep(1000);
+        }
+        kafkaSource.start(buffer);
+        produceKafkaRecords(bootstrapServers, topicName, numRecords);
+        int numRetries = 0;
+        while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
+            Thread.sleep(1000);
+        }
+        assertThat(receivedRecords.size(), equalTo(numRecords));
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.deleteTopics(Collections.singleton(topicName))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(false);
+        }
+        while (created.get()) {
+            Thread.sleep(1000);
+        }
+    }
+
+    @Test
+    public void TestPlainTextWithNoAuthKafkaEncryptionWithNoAuthSchemaRegistry() throws Exception {
+        final int numRecords = 1;
+        authConfig = mock(AuthConfig.class);
+        saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
+        sslAuthConfig = mock(AuthConfig.SslAuthConfig.class);
+        plainTextAuthConfig = mock(PlainTextAuthConfig.class);
+        when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername);
+        when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword);
+        when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getSaslAuthConfig()).thenReturn(null);
+        when(authConfig.getInsecure()).thenReturn(true);
+        when(authConfig.getSslAuthConfig()).thenReturn(sslAuthConfig);
+        when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
+        when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
+        when(sourceConfig.getBootStrapServers()).thenReturn(sslBootstrapServers);
+        when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
+        kafkaSource = createObjectUnderTest();
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        AtomicBoolean created = new AtomicBoolean(false);
+        final String topicName = plainTextTopic.getName();
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.createTopics(
+                        Collections.singleton(new NewTopic(topicName, 1, (short) 1)))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(true);
+        }
+        while (!created.get()) {
+            Thread.sleep(1000);
+        }
+        kafkaSource.start(buffer);
+        produceKafkaRecords(bootstrapServers, topicName, numRecords);
+        int numRetries = 0;
+        while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
+            Thread.sleep(1000);
+        }
+        assertThat(receivedRecords.size(), equalTo(numRecords));
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.deleteTopics(Collections.singleton(topicName))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(false);
+        }
+        while (created.get()) {
+            Thread.sleep(1000);
+        }
+    }
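+
+    // SASL_SSL: PLAIN authentication over TLS. Because insecure is stubbed to true, the source
+    // installs the trust-all InsecureSslEngineFactory added in this PR instead of verifying the
+    // broker certificate against a truststore.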
+
+    @Test
+    public void TestPlainTextWithAuthKafkaEncryptionWithNoAuthSchemaRegistry() throws Exception {
+        final int numRecords = 1;
+        authConfig = mock(AuthConfig.class);
+        saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class);
+        sslAuthConfig = mock(AuthConfig.SslAuthConfig.class);
+        plainTextAuthConfig = mock(PlainTextAuthConfig.class);
+        when(plainTextAuthConfig.getUsername()).thenReturn(kafkaUsername);
+        when(plainTextAuthConfig.getPassword()).thenReturn(kafkaPassword);
+        when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getInsecure()).thenReturn(true);
+        when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
+        when(authConfig.getSslAuthConfig()).thenReturn(sslAuthConfig);
+        when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
+        when(sourceConfig.getEncryptionType()).thenReturn(EncryptionType.SSL);
+        when(plainTextTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
+        when(sourceConfig.getBootStrapServers()).thenReturn(saslsslBootstrapServers);
+        when(sourceConfig.getTopics()).thenReturn(List.of(plainTextTopic));
+        kafkaSource = createObjectUnderTest();
+
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        AtomicBoolean created = new AtomicBoolean(false);
+        final String topicName = plainTextTopic.getName();
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.createTopics(
+                        Collections.singleton(new NewTopic(topicName, 1, (short) 1)))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(true);
+        }
+        while (!created.get()) {
+            Thread.sleep(1000);
+        }
+        kafkaSource.start(buffer);
+        produceKafkaRecords(bootstrapServers, topicName, numRecords);
+        int numRetries = 0;
+        while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
+            Thread.sleep(1000);
+        }
+        assertThat(receivedRecords.size(), equalTo(numRecords));
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            try {
+                adminClient.deleteTopics(Collections.singleton(topicName))
+                        .all().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            created.set(false);
+        }
+        while (created.get()) {
+            Thread.sleep(1000);
+        }
+    }
+
+    public void produceKafkaRecords(final String servers, final String topicName, final int numRecords) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class);
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+        for (int i = 0; i < numRecords; i++) {
+            String key = RandomStringUtils.randomAlphabetic(5);
+            String value = RandomStringUtils.randomAlphabetic(10);
+            ProducerRecord<String, String> record =
+                    new ProducerRecord<>(topicName, key, value);
+            producer.send(record);
+            try {
+                Thread.sleep(100);
+            } catch (Exception e) {
+                // ignore interruptions between sends
+            }
+        }
+        producer.close();
+    }
+}
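A note on the `insecure` option that the next change introduces: when `encryption` is `ssl`, setting `insecure: true` under the source's `authentication` block makes the consumer accept any broker certificate via a trust-all `SSLEngine`, so no truststore needs to be configured on the client. That is convenient for the self-signed certificates used in the integration tests above, but it should not be used in production.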
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java
index 5cc768cc99..ae037ba13e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java
@@ -61,6 +61,9 @@ public SslAuthConfig() {
     @JsonProperty("sasl")
     private SaslAuthConfig saslAuthConfig;
 
+    @JsonProperty("insecure")
+    private Boolean insecure = false;
+
     public SslAuthConfig getSslAuthConfig() {
         return sslAuthConfig;
     }
@@ -69,6 +72,10 @@ public SaslAuthConfig getSaslAuthConfig() {
         return saslAuthConfig;
     }
 
+    public Boolean getInsecure() {
+        return insecure;
+    }
+
     @AssertTrue(message = "Only one of SSL or SASL auth config must be specified")
     public boolean hasSaslOrSslConfig() {
         return Stream.of(sslAuthConfig, saslAuthConfig).filter(n -> n!=null).count() == 1;
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/InsecureSslEngineFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/InsecureSslEngineFactory.java
new file mode 100644
index 0000000000..39415dc370
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/InsecureSslEngineFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.source;
+
+import org.apache.kafka.common.security.auth.SslEngineFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.Map;
+import java.util.Set;
+
+public class InsecureSslEngineFactory implements SslEngineFactory {
+
+    private final TrustManager INSECURE_TRUST_MANAGER = new X509TrustManager() {
+
+        public X509Certificate[] getAcceptedIssuers() {
+            return null;
+        }
+
+        public void checkClientTrusted(X509Certificate[] certs, String authType) {
+            // intentionally empty: trust every client certificate
+        }
+
+        public void checkServerTrusted(X509Certificate[] certs, String authType) {
+            // intentionally empty: trust every server certificate
+        }
+    };
+
+    @Override
+    public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
+        TrustManager[] trustManagers = new TrustManager[]{ INSECURE_TRUST_MANAGER };
+        try {
+            SSLContext sslContext = SSLContext.getInstance("SSL");
+            sslContext.init(null, trustManagers, new SecureRandom());
+            SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+            sslEngine.setUseClientMode(true);
+            return sslEngine;
+        } catch (NoSuchAlgorithmException | KeyManagementException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
+        return null;
+    }
+
+    @Override
+    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
+        return false;
+    }
+
+    @Override
+    public Set<String> reconfigurableConfigs() {
+        return null;
+    }
+
+    @Override
+    public KeyStore keystore() {
+        return null;
+    }
+
+    @Override
+    public KeyStore truststore() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+}
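To see what the factory does in isolation, here is a minimal sketch of a standalone consumer wired the same way `KafkaSource` wires it below; the topic name, group id, and port are placeholders, not values from this PR:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.opensearch.dataprepper.plugins.kafka.source.InsecureSslEngineFactory;

public class InsecureSslConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094"); // placeholder SSL listener
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "insecure-ssl-example");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class);
        // The same two properties KafkaSource sets when encryption is SSL and insecure is true:
        props.put("security.protocol", "SSL");
        props.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("TestTopic")); // placeholder topic
            consumer.poll(Duration.ofSeconds(5))
                    .forEach(r -> System.out.println(r.key() + " => " + r.value()));
        }
    }
}
```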
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 797bcb4e37..e11f00d81b 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -23,11 +23,13 @@
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.source.Source;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
-import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
-import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
@@ -238,23 +240,26 @@ private Properties getConsumerProperties(final TopicConfig topicConfig) {
             AuthConfig.SaslAuthConfig saslAuthConfig = sourceConfig.getAuthConfig().getSaslAuthConfig();
             if (saslAuthConfig != null) {
                 awsIamAuthConfig = saslAuthConfig.getAwsIamAuthConfig();
+                PlainTextAuthConfig plainTextAuthConfig = saslAuthConfig.getPlainTextAuthConfig();
+
                 if (awsIamAuthConfig != null) {
                     if (encryptionType == EncryptionType.PLAINTEXT) {
                         throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism");
                     }
                     setAwsIamAuthProperties(properties, awsIamAuthConfig, awsConfig);
                 } else if (saslAuthConfig.getOAuthConfig() != null) {
-                } else if (saslAuthConfig.getPlainTextAuthConfig() != null) {
-                    setPlainTextAuthProperties(properties);
+                } else if (plainTextAuthConfig != null) {
+                    setPlainTextAuthProperties(properties, plainTextAuthConfig);
                 } else {
                     throw new RuntimeException("No SASL auth config specified");
                 }
+            } else if (encryptionType == EncryptionType.SSL) {
+                properties.put("security.protocol", "SSL");
+                if (sourceConfig.getAuthConfig().getInsecure()) {
+                    properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
+                }
             }
         }
-        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                topicConfig.getAutoCommitInterval().toSecondsPart());
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                topicConfig.getAutoOffsetReset());
         String bootstrapServers = sourceConfig.getBootStrapServers();
         if (Objects.nonNull(awsIamAuthConfig)) {
             bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig);
@@ -263,13 +268,19 @@ private Properties getConsumerProperties(final TopicConfig topicConfig) {
             throw new RuntimeException("Bootstrap servers are not specified");
         }
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, topicConfig.getAutoCommit());
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+                topicConfig.getAutoCommitInterval().toSecondsPart());
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                topicConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
                 topicConfig.getConsumerMaxPollRecords());
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
-        if (sourceConfig.getSchemaConfig() != null) {
-            schemaType = getSchemaType(sourceConfig.getSchemaConfig().getRegistryURL(), topicConfig.getName(), sourceConfig.getSchemaConfig().getVersion());
+        SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
+        if (Objects.nonNull(schemaConfig)) {
+            schemaType = getSchemaType(schemaConfig.getRegistryURL(), topicConfig.getName(), schemaConfig.getVersion());
         }
         if (schemaType.isEmpty()) {
             schemaType = MessageFormat.PLAINTEXT.toString();
@@ -333,13 +344,19 @@ private void setAwsIamAuthProperties(Properties properties, final AwsIamAuthConf
         }
     }
 
-    private void setPlainTextAuthProperties(Properties properties) {
-
-        String username = sourceConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig().getUsername();
-        String password = sourceConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig().getPassword();
+    private void setPlainTextAuthProperties(Properties properties, final PlainTextAuthConfig plainTextAuthConfig) {
+        String username = plainTextAuthConfig.getUsername();
+        String password = plainTextAuthConfig.getPassword();
         properties.put("sasl.mechanism", "PLAIN");
         properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
-        properties.put("security.protocol", "SASL_PLAINTEXT");
+        if (encryptionType == EncryptionType.PLAINTEXT) {
+            properties.put("security.protocol", "SASL_PLAINTEXT");
+        } else { // EncryptionType.SSL
+            properties.put("security.protocol", "SASL_SSL");
+        }
+        if (sourceConfig.getAuthConfig().getInsecure()) {
+            properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
+        }
     }
 
     private static String getSchemaType(final String registryUrl, final String topicName, final int schemaVersion) {