Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Auth/NoAuth with/without Encryption in Kafka with integration tests #3042

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions data-prepper-plugins/kafka-plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ log-pipeline:

- `session_timeout` (Optional) : The timeout used to detect client failures when using Kafka's group management. It is used for the rebalance.

- `max_retry_delay` (Optional) : By default the Kafka source will retry for every 1 second when there is a buffer write error. Defaults to `1s`.
- `max_retry_delay` (Optional) : By default the Kafka source will retry for every 1 second when there is a buffer write error. Defaults to `1s`.

- `auto_offset_reset` (Optional) : automatically reset the offset to the earliest or latest offset. Defaults to `earliest`.

Expand All @@ -89,7 +89,7 @@ Defaults to `4s`.

- `buffer_default_timeout` (Optional) : The maximum time to write data to the buffer. Defaults to `1s`.

- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker.
- `fetch_max_bytes` (Optional) : The maximum record batch size accepted by the broker.
Defaults to `52428800`.

- `fetch_max_wait` (Optional) : The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement. Defaults to `500`.
Expand Down Expand Up @@ -127,7 +127,7 @@ Defaults to `52428800`.
- `oauth_login_grant_type` (Optional) : This grant type refers to the way an application gets an access token.

- `oauth_login_scope` (Optional) : This scope limit an application's access to a user's account.

- `oauth_introspect_server` (Optional) : The URL of the introspect server. Most of the cases it should be similar to the oauth_login_server URL (Eg:https://dev.okta.com)

- `oauth_introspect_endpoint` (Optional) : The end point of the introspect server URL.(Eg: /oauth2/default/v1/introspect)
Expand All @@ -140,9 +140,42 @@ Defaults to `52428800`.

- `oauth_jwks_endpoint_url` (Optional) : The absolute URL for the oauth token refresh.

## Integration Tests

Before running the integration tests, make sure Kafka server is started
1. Start Zookeeper
```
bin/zookeeper-server-start.sh config/zookeeper.properties
```
2. Start Kafka Server with the following configuration
Configuration in config/server.properties
```
isteners=SASL_SSL://localhost:9093,PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=<location of truststore>
ssl.truststore.password=<password of truststore>
ssl.keystore.location=<location of keystore>
ssl.keystore.password=<password of keystore>
```
The truststore must have "localhost" certificates in them.

Command to start kafka server
```
bin/kafka-server-start.sh config/server.properties
```

3. Command to run integration tests

```
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers="localhost:9092" -Dtests.kafka.trust_store_location="/home/krishkdk/kafka/kafka-3.4.1-src/sec/client.truststore.jks" -Dtests.kafka.trust_store_password="kafkaks" -Dtests.kafka.saslssl_bootstrap_servers="localhost:9093" -Dtests.kafka.ssl_bootstrap_servers="localhost:9094" -Dtests.kafka.saslplain_bootstrap_servers="localhost:9095" -Dtests.kafka.username="admin" -Dtests.kafka.password="admin1" --tests "*KafkaSourceMultipleAuthTypeIT*"
```


## Developer Guide

This plugin is compatible with Java 11. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
11 changes: 9 additions & 2 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
testImplementation 'org.apache.kafka:kafka_2.13:3.4.0:test'
testImplementation 'org.apache.curator:curator-test:5.5.0'
testImplementation 'io.confluent:kafka-schema-registry:7.4.0'
testImplementation 'junit:junit:4.13.1'
testImplementation testLibs.junit.vintage
testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
testImplementation 'org.apache.kafka:connect-json:3.4.0'
}
Expand All @@ -52,7 +52,6 @@ sourceSets {
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
//resources.srcDir file('src/integrationTest/resources')
}
}

Expand All @@ -67,6 +66,14 @@ task integrationTest(type: Test) {

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.kafka.bootstrap_servers', System.getProperty('tests.kafka.bootstrap_servers')
systemProperty 'tests.kafka.saslssl_bootstrap_servers', System.getProperty('tests.kafka.saslssl_bootstrap_servers')
systemProperty 'tests.kafka.ssl_bootstrap_servers', System.getProperty('tests.kafka.ssl_bootstrap_servers')
systemProperty 'tests.kafka.saslplain_bootstrap_servers', System.getProperty('tests.kafka.saslplain_bootstrap_servers')
systemProperty 'tests.kafka.username', System.getProperty('tests.kafka.username')
systemProperty 'tests.kafka.password', System.getProperty('tests.kafka.password')

filter {
includeTestsMatching '*IT'
}
Expand Down
Loading
Loading