Skip to content

Commit

Permalink
Updated the review comments for the PR3037
Browse files Browse the repository at this point in the history
Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>
  • Loading branch information
ajeeshakd committed Jul 24, 2023
1 parent 225f962 commit 3eff153
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -52,18 +53,11 @@ public class KafkaSourceConfig {
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;

@JsonProperty("serde_format")
private String serdeFormat;
private String serdeFormat= MessageFormat.PLAINTEXT.toString();

@JsonProperty("client_dns_lookup")
private String clientDnsLookup;

@JsonProperty("ssl_endpoint_identification_algorithm")
private String sslEndpointIdentificationAlgorithm;

public String getSslEndpointIdentificationAlgorithm() {
return sslEndpointIdentificationAlgorithm;
}

public String getClientDnsLookup() {
return clientDnsLookup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,13 @@ public class PlainTextAuthConfig {
@JsonProperty("password")
private String password;

@JsonProperty("sasl_mechanism")
private String saslMechanism;

@JsonProperty("security_protocol")
private String securityProtocol;

public String getSecurityProtocol() {
return securityProtocol;
}

public String getSaslMechanism() {
return saslMechanism;
}
public String getUsername() {
return username;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
Expand Down Expand Up @@ -79,7 +80,6 @@
import java.util.Objects;
import java.util.Comparator;
import java.util.Properties;
import java.util.Optional;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -300,15 +300,22 @@ private Properties getConsumerProperties(final TopicConfig topicConfig) {
throw new RuntimeException("Bootstrap servers are not specified");
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
/* if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
throw new RuntimeException("Can't be able to connect to the given Kafka brokers... ");
}

}*/
if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
properties.put("client.dns.lookup", sourceConfig.getClientDnsLookup());
}
if (StringUtils.isNotEmpty(sourceConfig.getSslEndpointIdentificationAlgorithm())) {
properties.put("ssl.endpoint.identification.algorithm", sourceConfig.getSslEndpointIdentificationAlgorithm());
ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
switch (dnsLookupType) {
case USE_ALL_DNS_IPS:
properties.put("client.dns.lookup", ClientDNSLookupType.USE_ALL_DNS_IPS.toString());
break;
case CANONICAL_BOOTSTRAP:
properties.put("client.dns.lookup", ClientDNSLookupType.CANONICAL_BOOTSTRAP.toString());
break;
case DEFAULT:
properties.put("client.dns.lookup", ClientDNSLookupType.DEFAULT.toString());
break;
}
}
setConsumerTopicProperties(properties, topicConfig);
setSchemaRegistryProperties(properties, topicConfig);
Expand Down Expand Up @@ -437,15 +444,19 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
}

private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties) {
Optional<String> schema = Optional.of(Optional.ofNullable(sourceConfig.getSerdeFormat()).orElse(MessageFormat.PLAINTEXT.toString()));
schemaType = schema.get();
MessageFormat dataFormat = MessageFormat.getByMessageFormatByName(sourceConfig.getSerdeFormat());
schemaType = dataFormat.toString();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
if (schemaType.equalsIgnoreCase(MessageFormat.JSON.toString())) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
} else if (schemaType.equalsIgnoreCase(MessageFormat.PLAINTEXT.toString())) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
switch (dataFormat) {
case JSON:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceJsonDeserializer.class);
break;
default:
case PLAINTEXT:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
break;
}
}

Expand Down Expand Up @@ -506,10 +517,8 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
PlainTextAuthConfig plainTextAuthConfig = authConfig.getSaslAuthConfig().getPlainTextAuthConfig();
OAuthConfig oAuthConfig = authConfig.getSaslAuthConfig().getOAuthConfig();
if (plainTextAuthConfig != null) {
String sasl_mechanism = plainTextAuthConfig.getSaslMechanism();
String protocol = plainTextAuthConfig.getSecurityProtocol();
properties.put("sasl.mechanism", sasl_mechanism);
properties.put("security.protocol", protocol);
properties.put("sasl.mechanism", "PLAIN");
properties.put("security.protocol", plainTextAuthConfig.getSecurityProtocol());
} else if (oAuthConfig != null) {
properties.put("sasl.mechanism", oAuthConfig.getOauthSaslMechanism());
properties.put("security.protocol", oAuthConfig.getOauthSecurityProtocol());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum ClientDNSLookupType {

DEFAULT("default"),
USE_ALL_DNS_IPS("use_all_dns_ips"),
CANONICAL_BOOTSTRAP("resolve_canonical_bootstrap_servers_only");
private static final Map<String, ClientDNSLookupType> DNS_LOOKUP_TYPE_MAP = Arrays.stream(ClientDNSLookupType.values())
.collect(Collectors.toMap(ClientDNSLookupType::toString, Function.identity()));

private final String dnsLookupType;

ClientDNSLookupType(String dnsLookupType) {
this.dnsLookupType = dnsLookupType;
}

@Override
public String toString() {
return this.dnsLookupType;
}

public static ClientDNSLookupType getDnsLookupType(final String name) {
return DNS_LOOKUP_TYPE_MAP.get(name.toLowerCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ log-pipeline:
bootstrap_servers:
- 127.0.0.1:9093
client_dns_lookup: use_all_dns_ips
ssl_endpoint_identification_algorithm: https
encryption: plaintext
topics:
- name: my-topic-1
Expand Down Expand Up @@ -40,7 +39,6 @@ log-pipeline:
sasl:
aws_iam: role
plaintext:
sasl_mechanism: PLAIN
security_protocol: SASL_SSL
username: 5UH4NID4OENKDIBI
password: jCmncn77F9asfox3yhgZLCEwQ5fx8pKiXnszMqdt0y1GLrdZO1V1iz95aIe1UubX
Expand Down

0 comments on commit 3eff153

Please sign in to comment.