Skip to content

Commit

Permalink
Kafka source Confluent schema registry connectivity and OAuth impleme…
Browse files Browse the repository at this point in the history
…ntation (#3037)

* Schema registry connectivity with the oauth configurations

Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>

* Junit fixes

Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>

* Defect fixes

Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>

* Updated the review comments for the PR3037

Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>

---------

Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>
  • Loading branch information
ajeeshakd committed Jul 24, 2023
1 parent 03c4819 commit 808e239
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 158 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation 'software.amazon.awssdk:sts:2.20.103'
implementation 'software.amazon.awssdk:auth:2.20.103'
implementation 'software.amazon.awssdk:kafka:2.20.103'
implementation 'io.confluent:kafka-json-schema-serializer:7.4.0'
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation testLibs.spring.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -51,6 +52,19 @@ public class KafkaSourceConfig {
@JsonProperty("acknowledgments_timeout")
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;

@JsonProperty("serde_format")
private String serdeFormat= MessageFormat.PLAINTEXT.toString();

@JsonProperty("client_dns_lookup")
private String clientDnsLookup;

public String getClientDnsLookup() {
return clientDnsLookup;
}

public String getSerdeFormat() {
return serdeFormat;
}
public Boolean getAcknowledgementsEnabled() {
return acknowledgementsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ public class OAuthConfig {
@JsonProperty("oauth_jwks_endpoint_url")
private String oauthJwksEndpointURL = "";

@JsonProperty("extension_logicalCluster")
private String extensionLogicalCluster;

@JsonProperty("extension_identityPoolId")
private String extensionIdentityPoolId;

public String getOauthAuthorizationToken() {
return oauthAuthorizationToken;
}

public String getOauthIntrospectAuthorizationToken() {
return oauthIntrospectAuthorizationToken;
}

public String getExtensionLogicalCluster() {
return extensionLogicalCluster;
}

public String getExtensionIdentityPoolId() {
return extensionIdentityPoolId;
}

public String getOauthJwksEndpointURL() {
return oauthJwksEndpointURL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public class PlainTextAuthConfig {
@JsonProperty("password")
private String password;

@JsonProperty("security_protocol")
private String securityProtocol;

public String getSecurityProtocol() {
return securityProtocol;
}

public String getUsername() {
return username;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,47 @@

public class SchemaConfig {

private static final int SESSION_TIME_OUT = 45000;

@JsonProperty("registry_url")
private String registryURL;

@JsonProperty("version")
private int version;

public String getRegistryURL() {
return registryURL;
@JsonProperty("schema_registry_api_key")
private String schemaRegistryApiKey;

@JsonProperty("schema_registry_api_secret")
private String schemaRegistryApiSecret;

@JsonProperty("session_timeout_ms")
private int sessionTimeoutms = SESSION_TIME_OUT;

@JsonProperty("basic_auth_credentials_source")
private String basicAuthCredentialsSource;

public int getSessionTimeoutms() {
return sessionTimeoutms;
}

public void setRegistryURL(String registryURL) {
this.registryURL = registryURL;
public String getBasicAuthCredentialsSource() {
return basicAuthCredentialsSource;
}

public String getRegistryURL() {
return registryURL;
}

public int getVersion() {
return version;
}

public void setVersion(int version) {
this.version = version;
public String getSchemaRegistryApiKey() {
return schemaRegistryApiKey;
}

public String getSchemaRegistryApiSecret() {
return schemaRegistryApiSecret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
public class TopicConfig {
private static final String AUTO_COMMIT = "false";
private static final Duration AUTOCOMMIT_INTERVAL = Duration.ofSeconds(5);
private static final Duration SESSION_TIMEOUT = Duration.ofSeconds(45);
private static final Integer SESSION_TIMEOUT = 45000;
private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
private static final String AUTO_OFFSET_RESET = "earliest";
static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1);
private static final Long FETCH_MAX_BYTES = 52428800L;
private static final Long FETCH_MAX_WAIT = 500L;
private static final Long FETCH_MIN_BYTES = 1L;
private static final Integer FETCH_MAX_BYTES = 52428800;
private static final Integer FETCH_MAX_WAIT = 500;
private static final Integer FETCH_MIN_BYTES = 1;
private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100);
private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
private static final Integer CONSUMER_MAX_POLL_RECORDS = 500;
private static final Integer NUM_OF_WORKERS = 10;
private static final Integer NUM_OF_WORKERS = 5;
private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3);

@JsonProperty("name")
Expand Down Expand Up @@ -70,7 +70,7 @@ public class TopicConfig {
@JsonProperty("session_timeout")
@Valid
@Size(min = 1)
private Duration sessionTimeOut = SESSION_TIMEOUT;
private Integer sessionTimeOut = SESSION_TIMEOUT;

@JsonProperty("auto_offset_reset")
private String autoOffsetReset = AUTO_OFFSET_RESET;
Expand All @@ -94,17 +94,17 @@ public class TopicConfig {
@JsonProperty("fetch_max_bytes")
@Valid
@Size(min = 1, max = 52428800)
private Long fetchMaxBytes = FETCH_MAX_BYTES;
private Integer fetchMaxBytes = FETCH_MAX_BYTES;

@JsonProperty("fetch_max_wait")
@Valid
@Size(min = 1)
private Long fetchMaxWait = FETCH_MAX_WAIT;
private Integer fetchMaxWait = FETCH_MAX_WAIT;

@JsonProperty("fetch_min_bytes")
@Size(min = 1)
@Valid
private Long fetchMinBytes = FETCH_MIN_BYTES;
private Integer fetchMinBytes = FETCH_MIN_BYTES;

@JsonProperty("retry_backoff")
private Duration retryBackoff = RETRY_BACKOFF;
Expand Down Expand Up @@ -144,14 +144,10 @@ public void setAutoCommitInterval(Duration autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}

public Duration getSessionTimeOut() {
public Integer getSessionTimeOut() {
return sessionTimeOut;
}

public void setSessionTimeOut(Duration sessionTimeOut) {
this.sessionTimeOut = sessionTimeOut;
}

public String getAutoOffsetReset() {
return autoOffsetReset;
}
Expand Down Expand Up @@ -192,34 +188,22 @@ public void setBufferDefaultTimeout(Duration bufferDefaultTimeout) {
this.bufferDefaultTimeout = bufferDefaultTimeout;
}

public Long getFetchMaxBytes() {
public Integer getFetchMaxBytes() {
return fetchMaxBytes;
}

public void setFetchMaxBytes(Long fetchMaxBytes) {
this.fetchMaxBytes = fetchMaxBytes;
}

public void setAutoCommit(Boolean autoCommit) {
this.autoCommit = autoCommit;
}

public Long getFetchMaxWait() {
public Integer getFetchMaxWait() {
return fetchMaxWait;
}

public void setFetchMaxWait(Long fetchMaxWait) {
this.fetchMaxWait = fetchMaxWait;
}

public Long getFetchMinBytes() {
public Integer getFetchMinBytes() {
return fetchMinBytes;
}

public void setFetchMinBytes(Long fetchMinBytes) {
this.fetchMinBytes = fetchMinBytes;
}

public Duration getRetryBackoff() {
return retryBackoff;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,15 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord) {
if (schema == MessageFormat.JSON || schema == MessageFormat.AVRO) {
value = new HashMap<>();
try {
final JsonParser jsonParser = jsonFactory.createParser((String)consumerRecord.value().toString());
value = objectMapper.readValue(jsonParser, Map.class);
if(schema == MessageFormat.JSON){
value = consumerRecord.value();
}else if(schema == MessageFormat.AVRO) {
final JsonParser jsonParser = jsonFactory.createParser((String)consumerRecord.value().toString());
value = objectMapper.readValue(jsonParser, Map.class);
}
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record");
return null;
data.put(key, value);
}
} else {
value = (String)consumerRecord.value();
Expand Down
Loading

0 comments on commit 808e239

Please sign in to comment.