Support for Sink Codecs - Follow up PR to 2881 #2898

Closed
wants to merge 13 commits
70 changes: 70 additions & 0 deletions data-prepper-plugins/avro-codecs/README.md
@@ -0,0 +1,70 @@
# Avro Sink/Output Codec

This is an implementation of the Avro sink codec, which converts Data Prepper events into Avro records and writes them to the underlying OutputStream.

## Usages

Avro Output Codec can be configured with sink plugins (e.g. S3 Sink) in the Pipeline file.

## Configuration Options

```
pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
          sts_header_overrides:
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 2000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          avro:
            # Provide exactly one of schema, schema_file_location, or schema_registry_url.
            schema: >
              {
                "namespace": "org.example.test",
                "type": "record",
                "name": "TestMessage",
                "fields": [
                  {"name": "name", "type": "string"},
                  {"name": "age", "type": "int"}
                ]
              }
            schema_file_location: "C:\\Users\\OM20254233\\Downloads\\schema.json"
            schema_registry_url: https://your.schema.registry.url.com
            exclude_keys:
              - s3
        buffer_type: in_memory
```

## Codec Configuration

1) `schema`: A JSON string defining the Avro schema, provided directly in the pipeline YAML file. The codec parses the schema from this string (see the example after the note below).
2) `schema_file_location`: Path to a JSON file from which the schema is read.
3) `exclude_keys`: Keys of an event that should be excluded when converting it to an Avro record.
4) `schema_registry_url`: URL of a schema registry from which the schema is fetched.

### Note:

The user can provide the schema through only one of the options above at a time.
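
Whichever option is used, the resulting value must be a JSON document that Avro's `Schema.Parser` accepts. Below is a minimal sketch of what the codec does with the configured schema string; the class name is illustrative and not part of this plugin.

```
import org.apache.avro.Schema;

public class SchemaStringExample {
    public static void main(String[] args) {
        // The same JSON that would be placed under `schema:` in the pipeline YAML.
        final String schemaJson = "{\"namespace\": \"org.example.test\", \"type\": \"record\", "
                + "\"name\": \"TestMessage\", \"fields\": ["
                + "{\"name\": \"name\", \"type\": \"string\"}, "
                + "{\"name\": \"age\", \"type\": \"int\"}]}";

        // The codec parses the configured string into an Avro Schema before writing records.
        final Schema schema = new Schema.Parser().parse(schemaJson);
        System.out.println(schema.getFullName()); // org.example.test.TestMessage
    }
}
```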

## Developer Guide

This plugin is compatible with Java 11. See the following for development and monitoring guidance:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```
@@ -5,51 +5,142 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * An implementation of {@link OutputCodec} which serializes Data Prepper events
 * and writes them to the output stream as Avro data.
*/
@DataPrepperPlugin(name = "avro", pluginType = OutputCodec.class, pluginConfigurationType = AvroOutputCodecConfig.class)
public class AvroOutputCodec implements OutputCodec {

private final AvroOutputCodecConfig config;
private static final List<String> nonComplexTypes = Arrays.asList("int", "long", "string", "float", "double", "bytes");
private static final Logger LOG = LoggerFactory.getLogger(AvroOutputCodec.class);

@DataPrepperPluginConstructor
public AvroOutputCodec(final AvroOutputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}


private DataFileWriter<GenericRecord> dataFileWriter;

private Schema schema;

private static final String AVRO = "avro";


@Override
public void start(final OutputStream outputStream) throws IOException {
Objects.requireNonNull(outputStream);
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
        } else if (config.getFileLocation() != null) {
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
Review comment (Contributor): There is an implicit assumption here that the schema is always provided as a local file. What if the schema is provided as a file in S3?

Reply (Contributor Author): Fetching the schema from a file present in S3 or elsewhere wasn't part of the initial requirements.
        } else if (config.getSchemaRegistryUrl() != null) {
            schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
        } else {
            LOG.error("Schema not provided.");
            throw new IOException("Can't proceed without Schema.");
        }
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, outputStream);
}

    @Override
    public void complete(final OutputStream outputStream) throws IOException {
        dataFileWriter.close();
        outputStream.close();
    }

    @Override
    public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(event);
        final GenericRecord avroRecord = buildAvroRecord(schema, event.toMap());
        dataFileWriter.append(avroRecord);
    }

@Override
public String getExtension() {
return AVRO;
}

    Schema parseSchema(final String schemaString) throws IOException {
        try {
            Objects.requireNonNull(schemaString);
            return new Schema.Parser().parse(schemaString);
        } catch (Exception e) {
            LOG.error("Unable to parse Schema from Schema String provided.");
            throw new IOException("Can't proceed without schema.");
        }
}
private GenericRecord buildAvroRecord(final Schema schema, final Map<String, Object> eventData) {
final GenericRecord avroRecord = new GenericData.Record(schema);
final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys());
for (final String key : eventData.keySet()) {
if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) {
continue;
}
final Schema.Field field = schema.getField(key);
final Object value = schemaMapper(field, eventData.get(key));
avroRecord.put(key, value);
}
return avroRecord;
}

    private Object schemaMapper(final Schema.Field field, final Object rawValue) {
        Object finalValue = null;
        final String fieldType = field.schema().getType().name().toLowerCase();
        if (nonComplexTypes.contains(fieldType)) {
            switch (fieldType) {
                case "string":
                    finalValue = rawValue.toString();
                    break;
                case "int":
                    finalValue = Integer.parseInt(rawValue.toString());
                    break;
                case "float":
                    finalValue = Float.parseFloat(rawValue.toString());
                    break;
                case "double":
                    finalValue = Double.parseDouble(rawValue.toString());
                    break;
                case "long":
                    finalValue = Long.parseLong(rawValue.toString());
                    break;
                case "bytes":
                    finalValue = rawValue.toString().getBytes(StandardCharsets.UTF_8);
                    break;
                default:
                    LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType);
                    break;
            }
        } else {
            if (fieldType.equals("record") && rawValue instanceof Map) {
                finalValue = buildAvroRecord(field.schema(), (Map<String, Object>) rawValue);
            }
        }
        return finalValue;
    }

}
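
For reviewers, a rough usage sketch of the lifecycle above: `start` writes the Avro container header, `writeEvent` appends one record per event, and `complete` closes the writer and stream. The sketch assumes the config can be instantiated with a no-argument constructor (the schema-only constructor appears removed in this PR) and builds a test event with `JacksonEvent`; everything outside this PR's classes is illustrative.

```
import java.io.ByteArrayOutputStream;
import java.util.Map;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.codec.avro.AvroOutputCodec;
import org.opensearch.dataprepper.plugins.codec.avro.AvroOutputCodecConfig;

public class AvroOutputCodecSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: the config is created with its default constructor and an inline schema.
        final AvroOutputCodecConfig config = new AvroOutputCodecConfig();
        config.setSchema("{\"namespace\": \"org.example.test\", \"type\": \"record\", "
                + "\"name\": \"TestMessage\", \"fields\": ["
                + "{\"name\": \"name\", \"type\": \"string\"}, "
                + "{\"name\": \"age\", \"type\": \"int\"}]}");

        final AvroOutputCodec codec = new AvroOutputCodec(config);
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        codec.start(outputStream);                 // writes the Avro file header for the schema
        final Event event = JacksonEvent.builder() // test event; in a pipeline, events arrive from the source
                .withEventType("event")
                .withData(Map.of("name", "alice", "age", 30))
                .build();
        codec.writeEvent(event, outputStream);     // appends one Avro record built from the event map
        codec.complete(outputStream);              // closes the DataFileWriter and the stream

        System.out.println("Wrote " + outputStream.size() + " bytes of Avro data");
    }
}
```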


@@ -6,17 +6,42 @@

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
* Configuration class for {@link AvroOutputCodec}.
*/
public class AvroOutputCodecConfig {

    @JsonProperty("schema")
    private String schema;

    @JsonProperty("schema_file_location")
    private String fileLocation;

@JsonProperty("exclude_keys")
private List<String> excludeKeys;
@JsonProperty("schema_registry_url")
private String schemaRegistryUrl;

public List<String> getExcludeKeys() {
return excludeKeys;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public String getFileLocation() {
return fileLocation;
}

public String getSchemaRegistryUrl() {
return schemaRegistryUrl;
}

}
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroSchemaParser {
private static final ObjectMapper mapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(AvroSchemaParser.class);

public static Schema parseSchemaFromJsonFile(final String location) throws IOException {
final Map<?, ?> jsonMap;
try {
jsonMap = mapper.readValue(Paths.get(location).toFile(), Map.class);
} catch (FileNotFoundException e) {
LOG.error("Schema file not found, Error: {}", e.getMessage());
throw new IOException("Can't proceed without schema.");
}
final Map<Object,Object> schemaMap = new HashMap<Object,Object>();
for (Map.Entry<?, ?> entry : jsonMap.entrySet()) {
schemaMap.put(entry.getKey(), entry.getValue());
}
try{
return new Schema.Parser().parse(mapper.writeValueAsString(schemaMap));
}catch(Exception e) {
LOG.error("Unable to parse schema from the provided schema file, Error: {}", e.getMessage());
throw new IOException("Can't proceed without schema.");
}
}
}
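
A short usage sketch for this helper; the file path is hypothetical, and the method throws `IOException` if the file is missing or does not contain a valid Avro schema.

```
import org.apache.avro.Schema;
import org.opensearch.dataprepper.plugins.codec.avro.AvroSchemaParser;

public class AvroSchemaParserSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical path; in a pipeline this value comes from `schema_file_location`.
        final Schema schema = AvroSchemaParser.parseSchemaFromJsonFile("/tmp/schema.json");
        System.out.println(schema.getFields());
    }
}
```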
@@ -0,0 +1,66 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class AvroSchemaParserFromSchemaRegistry {
private static final ObjectMapper mapper = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(AvroSchemaParserFromSchemaRegistry.class);
static String getSchemaType(final String schemaRegistryUrl) {
final StringBuilder response = new StringBuilder();
String schemaType = "";
try {
final String urlPath = schemaRegistryUrl;
final URL url = new URL(urlPath);
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
final int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
final BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
final Object json = mapper.readValue(response.toString(), Object.class);
final String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
final JsonNode rootNode = mapper.readTree(indented);
if(rootNode.get("schema") != null ){
return rootNode.get("schema").toString();
}
} else {
final InputStream errorStream = connection.getErrorStream();
final String errorMessage = readErrorMessage(errorStream);
LOG.error("GET request failed while fetching the schema registry details : {}", errorMessage);
}
        } catch (IOException e) {
            LOG.error("An error occurred while fetching the schema registry details: ", e);
            throw new RuntimeException(e);
        }
return null;
}

private static String readErrorMessage(final InputStream errorStream) throws IOException {
if (errorStream == null) {
return null;
}
final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream));
final StringBuilder errorMessage = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
errorMessage.append(line);
}
reader.close();
errorStream.close();
return errorMessage.toString();
}
}
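
A usage sketch for this helper. `getSchemaType` is package-private, so the caller below sits in the same package; the URL is illustrative, and the registry response is expected to contain a top-level `schema` field (for example, a Confluent-style `/subjects/<subject>/versions/latest` response).

```
package org.opensearch.dataprepper.plugins.codec.avro;

import org.apache.avro.Schema;

public class SchemaRegistrySketch {
    public static void main(String[] args) {
        // Illustrative URL; the real value comes from `schema_registry_url` in the pipeline YAML.
        final String schemaString = AvroSchemaParserFromSchemaRegistry
                .getSchemaType("https://your.schema.registry.url.com/subjects/test-value/versions/latest");
        // getSchemaType returns null when the response has no `schema` field; handle that before parsing.
        if (schemaString != null) {
            final Schema schema = new Schema.Parser().parse(schemaString);
            System.out.println(schema.getName());
        }
    }
}
```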