
Support for Sink Codecs - Follow up PR to 2881 #2898

Closed · 13 commits
AvroOutputCodec.java
@@ -5,51 +5,95 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

/**
 * An implementation of {@link OutputCodec} which serializes Data Prepper events
 * and writes them to the output stream as Avro data.
 */
@DataPrepperPlugin(name = "avro", pluginType = OutputCodec.class, pluginConfigurationType = AvroOutputCodecConfig.class)
public class AvroOutputCodec implements OutputCodec {

    private static final Logger LOG = LoggerFactory.getLogger(AvroOutputCodec.class);
    private static final String AVRO = "avro";

    private final AvroOutputCodecConfig config;
    private DataFileWriter<GenericRecord> dataFileWriter;
    private Schema schema;

    @DataPrepperPluginConstructor
    public AvroOutputCodec(final AvroOutputCodecConfig config) {
        Objects.requireNonNull(config);
        this.config = config;
    }

    @Override
    public void start(final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(outputStream);
        if (config.getSchema() != null) {
            schema = parseSchema(config.getSchema());
        } else if (config.getFileLocation() != null) {
            schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
Review comment (Contributor): There is an implicit assumption here that the schema is always provided as a local file. What if the schema is provided as a file in S3?

Reply (Contributor Author): Fetching the schema from a file present in S3 or elsewhere wasn't part of the initial requirements.
        } else {
            LOG.error("Schema not provided.");
        }
        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, outputStream);
    }

    @Override
    public void complete(final OutputStream outputStream) throws IOException {
        dataFileWriter.close();
        outputStream.close();
    }

    @Override
    public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(event);
        final GenericRecord avroRecord = new GenericData.Record(schema);
        final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys());
        for (final String key : event.toMap().keySet()) {
            if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) {
                continue;
            }
            avroRecord.put(key, event.toMap().get(key));
        }
        dataFileWriter.append(avroRecord);
    }

    @Override
    public String getExtension() {
        return AVRO;
    }

    Schema parseSchema(final String schemaString) throws IOException {
        try {
            Objects.requireNonNull(schemaString);
            return new Schema.Parser().parse(schemaString);
        } catch (Exception e) {
            LOG.error("Unable to parse Schema from Schema String provided.");
            throw new IOException("Can't proceed without schema.");
        }
    }
}
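For context on the lifecycle these methods implement, a minimal sketch of driving the codec by hand: start once per stream, writeEvent once per record, complete at the end. JacksonEvent is Data Prepper's stock Event implementation; the schema string, event data, and ByteArrayOutputStream below are illustrative assumptions, not part of this PR.

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.io.ByteArrayOutputStream;
import java.util.Map;

public class AvroOutputCodecExample {
    public static void main(String[] args) throws Exception {
        // Illustrative Avro record schema with a single string field.
        final String schemaJson = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                + "[{\"name\":\"message\",\"type\":\"string\"}]}";
        final AvroOutputCodecConfig config = new AvroOutputCodecConfig();
        config.setSchema(schemaJson);
        final AvroOutputCodec codec = new AvroOutputCodec(config);

        final Event event = JacksonEvent.builder()
                .withEventType("event")
                .withData(Map.of("message", "hello"))
                .build();

        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        codec.start(outputStream);             // writes the Avro container header
        codec.writeEvent(event, outputStream); // appends one record
        codec.complete(outputStream);          // closes the writer and the stream
    }
}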


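On the review question above about S3-hosted schemas (out of scope for this PR, per the author), a minimal sketch of what fetching the schema string from S3 with the AWS SDK for Java v2 could look like. The S3SchemaLoader class and its parameters are hypothetical; the returned string would be handed to parseSchema().

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of this PR.
public class S3SchemaLoader {
    public static String fetchSchemaString(final S3Client s3Client, final String bucket, final String key) {
        final GetObjectRequest request = GetObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build();
        // getObjectAsBytes buffers the whole object, acceptable for a small schema file.
        return s3Client.getObjectAsBytes(request).asString(StandardCharsets.UTF_8);
    }
}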
AvroOutputCodecConfig.java
@@ -5,18 +5,32 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
 * Configuration class for {@link AvroOutputCodec}.
 */
public class AvroOutputCodecConfig {

    @JsonProperty("schema")
    private String schema;

    @JsonProperty("schema_file_location")
    private String fileLocation;

    @JsonProperty("exclude_keys")
Review comment (Contributor): Fix indentation.

Reply (Contributor Author): done
    private List<String> excludeKeys;

    public List<String> getExcludeKeys() {
        return excludeKeys;
    }

    public String getSchema() {
        return schema;
    }

    public String getFileLocation() {
        return fileLocation;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }
}
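Since the fields are bound by name through @JsonProperty, a quick sketch of how Jackson would populate this class. The JSON snippet and file path are illustrative; in practice Data Prepper performs this binding itself when it reads the pipeline configuration.

import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigBindingExample {
    public static void main(String[] args) throws Exception {
        // Illustrative codec settings, keyed by the @JsonProperty names above.
        final String json = "{"
                + "\"schema_file_location\": \"/etc/data-prepper/avro-schema.json\","
                + "\"exclude_keys\": [\"internal_id\"]"
                + "}";
        final AvroOutputCodecConfig config = new ObjectMapper()
                .readValue(json, AvroOutputCodecConfig.class);
        System.out.println(config.getFileLocation()); // /etc/data-prepper/avro-schema.json
        System.out.println(config.getExcludeKeys());  // [internal_id]
    }
}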
AvroSchemaParser.java
@@ -0,0 +1,41 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class AvroSchemaParser {

    private static final ObjectMapper mapper = new ObjectMapper();
    // Logger should reference this class, not AvroOutputCodec.
    private static final Logger LOG = LoggerFactory.getLogger(AvroSchemaParser.class);

    public static Schema parseSchemaFromJsonFile(final String location) throws IOException {
        final Map<?, ?> jsonMap;
        try {
            jsonMap = mapper.readValue(Paths.get(location).toFile(), Map.class);
        } catch (FileNotFoundException e) {
            LOG.error("Schema file not found, Error: {}", e.getMessage());
            throw new IOException("Can't proceed without schema.");
        }
        final Map<Object, Object> schemaMap = new HashMap<>();
        for (Map.Entry<?, ?> entry : jsonMap.entrySet()) {
            schemaMap.put(entry.getKey(), entry.getValue());
        }
        try {
            return new Schema.Parser().parse(mapper.writeValueAsString(schemaMap));
        } catch (Exception e) {
            LOG.error("Unable to parse schema from the provided schema file, Error: {}", e.getMessage());
            throw new IOException("Can't proceed without schema.");
        }
    }
}
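A short usage sketch for the parser above, writing an illustrative schema to a temporary file and reading it back (Files.writeString requires Java 11+):

import org.apache.avro.Schema;

import java.nio.file.Files;
import java.nio.file.Path;

public class AvroSchemaParserExample {
    public static void main(String[] args) throws Exception {
        final Path schemaFile = Files.createTempFile("avro-schema", ".json");
        // Illustrative record schema with one string field.
        Files.writeString(schemaFile,
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                        + "[{\"name\":\"message\",\"type\":\"string\"}]}");

        final Schema schema = AvroSchemaParser.parseSchemaFromJsonFile(schemaFile.toString());
        System.out.println(schema.getName()); // Example
    }
}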