forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Ajeesh Gopalakrishnakurup <[email protected]>
- Loading branch information
Showing
10 changed files
with
564 additions
and
0 deletions.
There are no files selected for viewing
39 changes: 39 additions & 0 deletions
39
...fka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/avro/AvroConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.avro; | ||
|
||
import io.confluent.kafka.serializers.KafkaAvroDeserializer; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
|
||
import java.util.Collections; | ||
import java.util.Properties; | ||
|
||
public class AvroConsumer { | ||
public static void main(String[] args) { | ||
// Load the properties file | ||
Properties props = new Properties(); | ||
|
||
props.put("bootstrap.servers", "localhost:9092"); | ||
props.put("schema.registry.url", "http://localhost:8085"); | ||
props.put("group.id", "avro"); | ||
props.put("key.deserializer", StringDeserializer.class); | ||
props.put("value.deserializer", KafkaAvroDeserializer.class); | ||
props.put("security.protocol", "PLAINTEXT"); | ||
|
||
KafkaConsumer<String, Example> consumer = new KafkaConsumer<>(props); | ||
consumer.subscribe(Collections.singletonList("kafka-avro")); | ||
|
||
try { | ||
while (true) { | ||
ConsumerRecords<String, Example> records = consumer.poll(1000); | ||
for (ConsumerRecord<String, Example> record : records) { | ||
System.out.println(record.value()); | ||
} | ||
} | ||
} finally { | ||
consumer.close(); | ||
System.out.println("Done processing"); | ||
} | ||
} | ||
} |
55 changes: 55 additions & 0 deletions
55
...fka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/avro/AvroProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.avro; | ||
|
||
import io.confluent.kafka.serializers.KafkaAvroSerializer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Properties; | ||
|
||
public class AvroProducer { | ||
|
||
public static void main(String[] args) { | ||
// Load the properties file | ||
Properties props = new Properties(); | ||
|
||
/*try { | ||
props.load(new FileReader("producer.properties")); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
}*/ | ||
|
||
props.put("bootstrap.servers", "localhost:9092"); | ||
props.put("schema.registry.url", "http://localhost:8085"); | ||
props.put("key.serializer", StringSerializer.class); | ||
props.put("value.serializer", KafkaAvroSerializer.class); | ||
props.put("security.protocol", "PLAINTEXT"); | ||
|
||
|
||
// Create the producer from the properties | ||
KafkaProducer<String, Example> producer = new KafkaProducer<>(props); | ||
|
||
// Create some OrderEvents to produce | ||
ArrayList<Example> examples = new ArrayList<>(); | ||
examples.add(new Example("Black Gloves- New-2")); | ||
examples.add(new Example("Black Hat - New")); | ||
examples.add(new Example("Gold Hat- New")); | ||
examples.add(new Example("shoes- New")); | ||
|
||
|
||
// Turn each OrderEvent into a ProducerRecord for the orders topic, and send them | ||
for (Example example : examples) { | ||
/*ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", orderEvent); | ||
producer.send(record);*/ | ||
|
||
final ProducerRecord<String, Example> record = new ProducerRecord("kafka-avro", example); | ||
producer.send(record); | ||
} | ||
|
||
// Ensure all messages get sent to Kafka | ||
producer.flush(); | ||
System.out.println("Done Processing from producer"); | ||
} | ||
|
||
} |
234 changes: 234 additions & 0 deletions
234
...ns/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/avro/Example.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,234 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.avro; | ||
|
||
import org.apache.avro.specific.SpecificData; | ||
import org.apache.avro.message.BinaryMessageEncoder; | ||
import org.apache.avro.message.BinaryMessageDecoder; | ||
import org.apache.avro.message.SchemaStore; | ||
|
||
@SuppressWarnings("all") | ||
@org.apache.avro.specific.AvroGenerated | ||
public class Example extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { | ||
private static final long serialVersionUID = 963322053341084311L; | ||
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"org.opensearch.dataprepper.plugins.kafka.avro\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}"); | ||
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } | ||
|
||
private static SpecificData MODEL$ = new SpecificData(); | ||
|
||
private static final BinaryMessageEncoder<Example> ENCODER = | ||
new BinaryMessageEncoder<Example>(MODEL$, SCHEMA$); | ||
|
||
private static final BinaryMessageDecoder<Example> DECODER = | ||
new BinaryMessageDecoder<Example>(MODEL$, SCHEMA$); | ||
|
||
/** | ||
* Return the BinaryMessageDecoder instance used by this class. | ||
*/ | ||
public static BinaryMessageDecoder<Example> getDecoder() { | ||
return DECODER; | ||
} | ||
|
||
/** | ||
* Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. | ||
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint | ||
*/ | ||
public static BinaryMessageDecoder<Example> createDecoder(SchemaStore resolver) { | ||
return new BinaryMessageDecoder<Example>(MODEL$, SCHEMA$, resolver); | ||
} | ||
|
||
/** Serializes this Example to a ByteBuffer. */ | ||
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { | ||
return ENCODER.encode(this); | ||
} | ||
|
||
/** Deserializes a Example from a ByteBuffer. */ | ||
public static Example fromByteBuffer( | ||
java.nio.ByteBuffer b) throws java.io.IOException { | ||
return DECODER.decode(b); | ||
} | ||
|
||
@Deprecated public CharSequence field1; | ||
|
||
/** | ||
* Default constructor. Note that this does not initialize fields | ||
* to their default values from the schema. If that is desired then | ||
* one should use <code>newBuilder()</code>. | ||
*/ | ||
public Example() {} | ||
|
||
/** | ||
* All-args constructor. | ||
* @param field1 The new value for field1 | ||
*/ | ||
public Example(CharSequence field1) { | ||
this.field1 = field1; | ||
} | ||
|
||
public org.apache.avro.Schema getSchema() { return SCHEMA$; } | ||
// Used by DatumWriter. Applications should not call. | ||
public Object get(int field$) { | ||
switch (field$) { | ||
case 0: return field1; | ||
default: throw new org.apache.avro.AvroRuntimeException("Bad index"); | ||
} | ||
} | ||
|
||
// Used by DatumReader. Applications should not call. | ||
@SuppressWarnings(value="unchecked") | ||
public void put(int field$, Object value$) { | ||
switch (field$) { | ||
case 0: field1 = (CharSequence)value$; | ||
break; | ||
default: throw new org.apache.avro.AvroRuntimeException("Bad index"); | ||
} | ||
} | ||
|
||
/** | ||
* Gets the value of the 'field1' field. | ||
* @return The value of the 'field1' field. | ||
*/ | ||
public CharSequence getField1() { | ||
return field1; | ||
} | ||
|
||
/** | ||
* Sets the value of the 'field1' field. | ||
* @param value the value to set. | ||
*/ | ||
public void setField1(CharSequence value) { | ||
this.field1 = value; | ||
} | ||
|
||
/** | ||
* Creates a new Example RecordBuilder. | ||
* @return A new Example RecordBuilder | ||
*/ | ||
public static org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder newBuilder() { | ||
return new org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder(); | ||
} | ||
|
||
/** | ||
* Creates a new Example RecordBuilder by copying an existing Builder. | ||
* @param other The existing builder to copy. | ||
* @return A new Example RecordBuilder | ||
*/ | ||
public static org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder newBuilder(org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder other) { | ||
return new org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder(other); | ||
} | ||
|
||
/** | ||
* Creates a new Example RecordBuilder by copying an existing Example instance. | ||
* @param other The existing instance to copy. | ||
* @return A new Example RecordBuilder | ||
*/ | ||
public static org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder newBuilder(org.opensearch.dataprepper.plugins.kafka.avro.Example other) { | ||
return new org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder(other); | ||
} | ||
|
||
/** | ||
* RecordBuilder for Example instances. | ||
*/ | ||
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Example> | ||
implements org.apache.avro.data.RecordBuilder<Example> { | ||
|
||
private CharSequence field1; | ||
|
||
/** Creates a new Builder */ | ||
private Builder() { | ||
super(SCHEMA$); | ||
} | ||
|
||
/** | ||
* Creates a Builder by copying an existing Builder. | ||
* @param other The existing Builder to copy. | ||
*/ | ||
private Builder(org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder other) { | ||
super(other); | ||
if (isValidValue(fields()[0], other.field1)) { | ||
this.field1 = data().deepCopy(fields()[0].schema(), other.field1); | ||
fieldSetFlags()[0] = true; | ||
} | ||
} | ||
|
||
/** | ||
* Creates a Builder by copying an existing Example instance | ||
* @param other The existing instance to copy. | ||
*/ | ||
private Builder(org.opensearch.dataprepper.plugins.kafka.avro.Example other) { | ||
super(SCHEMA$); | ||
if (isValidValue(fields()[0], other.field1)) { | ||
this.field1 = data().deepCopy(fields()[0].schema(), other.field1); | ||
fieldSetFlags()[0] = true; | ||
} | ||
} | ||
|
||
/** | ||
* Gets the value of the 'field1' field. | ||
* @return The value. | ||
*/ | ||
public CharSequence getField1() { | ||
return field1; | ||
} | ||
|
||
/** | ||
* Sets the value of the 'field1' field. | ||
* @param value The value of 'field1'. | ||
* @return This builder. | ||
*/ | ||
public org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder setField1(CharSequence value) { | ||
validate(fields()[0], value); | ||
this.field1 = value; | ||
fieldSetFlags()[0] = true; | ||
return this; | ||
} | ||
|
||
/** | ||
* Checks whether the 'field1' field has been set. | ||
* @return True if the 'field1' field has been set, false otherwise. | ||
*/ | ||
public boolean hasField1() { | ||
return fieldSetFlags()[0]; | ||
} | ||
|
||
|
||
/** | ||
* Clears the value of the 'field1' field. | ||
* @return This builder. | ||
*/ | ||
public org.opensearch.dataprepper.plugins.kafka.avro.Example.Builder clearField1() { | ||
field1 = null; | ||
fieldSetFlags()[0] = false; | ||
return this; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public Example build() { | ||
try { | ||
Example record = new Example(); | ||
record.field1 = fieldSetFlags()[0] ? this.field1 : (CharSequence) defaultValue(fields()[0]); | ||
return record; | ||
} catch (Exception e) { | ||
throw new org.apache.avro.AvroRuntimeException(e); | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private static final org.apache.avro.io.DatumWriter<Example> | ||
WRITER$ = (org.apache.avro.io.DatumWriter<Example>)MODEL$.createDatumWriter(SCHEMA$); | ||
|
||
@Override public void writeExternal(java.io.ObjectOutput out) | ||
throws java.io.IOException { | ||
WRITER$.write(this, SpecificData.getEncoder(out)); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private static final org.apache.avro.io.DatumReader<Example> | ||
READER$ = (org.apache.avro.io.DatumReader<Example>)MODEL$.createDatumReader(SCHEMA$); | ||
|
||
@Override public void readExternal(java.io.ObjectInput in) | ||
throws java.io.IOException { | ||
READER$.read(this, SpecificData.getDecoder(in)); | ||
} | ||
|
||
} |
35 changes: 35 additions & 0 deletions
35
...a-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/avro/SampleConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package org.opensearch.dataprepper.plugins.kafka.avro; | ||
|
||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.Properties; | ||
|
||
import static org.opensearch.dataprepper.plugins.kafka.avro.SampleProducer.loadConfig; | ||
|
||
public class SampleConsumer { | ||
public static void main(String args[]) throws IOException { | ||
|
||
final Properties props = loadConfig("D:\\Projects\\kafka-source-demo\\data-prepper\\data-prepper-plugins\\kafka-plugins\\src\\main\\java\\org\\opensearch\\dataprepper\\plugins\\kafka\\avro\\client.properties"); | ||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-getting-started"); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); | ||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); | ||
consumer.subscribe(Arrays.asList("kafka-topic")); | ||
while (true) { | ||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); | ||
for (ConsumerRecord<String, String> record : records) { | ||
System.out.printf("key = %s, value = %s%n", record.key(), record.value()); | ||
} | ||
} | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.