diff --git a/data-prepper-plugins/parquet-codecs/README.md b/data-prepper-plugins/parquet-codecs/README.md
new file mode 100644
index 0000000000..e67af5e3af
--- /dev/null
+++ b/data-prepper-plugins/parquet-codecs/README.md
@@ -0,0 +1,90 @@
+# Parquet Sink/Output Codec
+
+This is an implementation of the Parquet Output Codec, which converts Data Prepper events into Parquet records and writes them to the underlying OutputStream.
+
+## Usage
+
+The Parquet Output Codec can be configured on sink plugins (e.g. the S3 sink) in the pipeline file.
+
+## Configuration Options
+
+```
+pipeline:
+  ...
+  sink:
+    - s3:
+        aws:
+          region: us-east-1
+          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
+          sts_header_overrides:
+        max_retries: 5
+        bucket: bucket_name
+        object_key:
+          path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/
+        threshold:
+          event_count: 2000
+          maximum_size: 50mb
+          event_collect_timeout: 15s
+        codec:
+          parquet:
+            schema: "{\"namespace\": \"org.example.test\", \"type\": \"record\", \"name\": \"TestMessage\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"
+            schema_file_location: "C:\\Path\\to\\your\\schema.json"
+            schema_registry_url: https://your.schema.registry.url.com
+            exclude_keys:
+              - s3
+            region:
+            bucket:
+            path_prefix:
+        buffer_type: in_memory
+```
+
+## Codec Configuration
+
+1) `schema`: A JSON string containing the Avro schema, provided inline in the pipeline YAML. The codec parses the schema object from this string.
+2) `schema_file_location`: Path to a schema JSON file from which the codec reads the schema.
+3) `exclude_keys`: Keys of the event to exclude when converting it to a Parquet record.
+4) `schema_registry_url`: URL of a schema registry from which the schema can be fetched, as an alternative to the options above.
+5) `region`: AWS region of the S3 bucket used as a buffer for records written by the Parquet Output Codec.
+6) `bucket`: Name of the S3 bucket used as a buffer for records written by the Parquet Output Codec.
+7) `path_prefix`: Path to the folder within the S3 bucket where the intermediate files are written.
+8) `schema_bucket`: Name of the S3 bucket in which the `schema.json` file is kept.
+9) `file_key`: Object key of the `schema.json` file in that S3 bucket.
+10) `schema_region`: AWS region of the S3 bucket in which the `schema.json` file is kept.
+
+### Notes:
+
+1) Only one schema source can be provided at a time, i.e. exactly one of the options listed above.
+2) If the tags should be part of the resulting Parquet data and `tagsTargetKey` has been set in the config file, the schema must also be modified to accommodate the tags. An additional field has to be added to the `schema.json` file:
+
+   `{
+     "name": "yourTagsTargetKey",
+     "type": { "type": "array",
+       "items": "string"
+     }
+   }`
+
+3) Valid `region`, `bucket` and `path_prefix` values must be provided for the codec to work.
+4) To load the schema from a `schema.json` file kept in S3, the corresponding `schema_region`, `schema_bucket` and `file_key` must be provided.
+
+
+## Developer Guide
+
+This plugin is compatible with Java 11. See the following for more information:
+
+- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
+- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
+
+The integration tests for this plugin do not run as part of the Data Prepper build.
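+
+For reference, the sketch below shows roughly how a sink drives this codec: configure it, call `start` once, call `writeEvent` for each event, and finish with `complete`. This is a simplified illustration rather than the S3 sink's actual wiring; the class name, method name, schema, region, bucket and prefix values are placeholders, and sink-side buffering, thresholds and error handling are omitted.
+
+```java
+import org.opensearch.dataprepper.model.event.Event;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+// Assumes the same package as ParquetOutputCodec / ParquetOutputCodecConfig.
+class ParquetCodecUsageSketch {
+
+    static void writeBatch(final List<Event> events, final OutputStream outputStream,
+                           final String schemaJson) throws IOException {
+        final ParquetOutputCodecConfig config = new ParquetOutputCodecConfig();
+        config.setSchema(schemaJson);            // alternatively setFileLocation(...) or setSchemaRegistryUrl(...)
+        config.setRegion("us-east-1");           // region of the S3 bucket used to buffer the Parquet file
+        config.setBucket("my-buffer-bucket");    // placeholder bucket name
+        config.setPathPrefix("parquet-buffer/"); // placeholder prefix for the intermediate object
+
+        final ParquetOutputCodec codec = new ParquetOutputCodec(config);
+        codec.start(outputStream, events.get(0), null);     // builds the schema and object key, opens the writer
+        for (final Event event : events) {
+            codec.writeEvent(event, outputStream, null);    // appends one event as a Parquet record
+        }
+        codec.complete(outputStream);                       // closes the writer and copies the object into outputStream
+    }
+}
+```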
+ +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region= -Dtests.s3sink.bucket= +``` diff --git a/data-prepper-plugins/parquet-codecs/build.gradle b/data-prepper-plugins/parquet-codecs/build.gradle index 4eecb88940..c1315eb11f 100644 --- a/data-prepper-plugins/parquet-codecs/build.gradle +++ b/data-prepper-plugins/parquet-codecs/build.gradle @@ -7,6 +7,8 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') implementation 'org.apache.avro:avro:1.11.0' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:apache-client' implementation 'org.apache.hadoop:hadoop-common:3.3.5' implementation 'org.apache.hadoop:hadoop-hdfs-client:3.3.5' implementation 'org.apache.hadoop:hadoop-yarn-client:3.3.5' diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java index c4a7d0f420..71e6fae4b5 100644 --- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java @@ -4,48 +4,317 @@ */ package org.opensearch.dataprepper.plugins.codec.parquet; - import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.OutputFile; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.fs.LocalInputFile; +import org.opensearch.dataprepper.plugins.fs.LocalOutputFile; +import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import java.io.File; import java.io.IOException; import java.io.OutputStream; - +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; @DataPrepperPlugin(name = "parquet", pluginType = OutputCodec.class, pluginConfigurationType = ParquetOutputCodecConfig.class) public class ParquetOutputCodec implements OutputCodec { + private static final Logger LOG = LoggerFactory.getLogger(ParquetOutputCodec.class); + + private final ParquetOutputCodecConfig config; + private static final String BASE_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"ParquetRecords\",\"fields\":["; + private static final String END_SCHEMA_STRING = "]}"; + private 
static Schema schema; + private ParquetWriter writer; + private final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder(); + private S3Client s3Client; + private static final String PARQUET = "parquet"; + + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION); + private String key; + private final String bucket; + private final String region; + private static final List primitiveTypes = Arrays.asList("int", "long", "string", "float", "double", "bytes"); + @DataPrepperPluginConstructor public ParquetOutputCodec(final ParquetOutputCodecConfig config) { - // TODO: implement initiate config + Objects.requireNonNull(config); + this.config = config; + this.region = config.getRegion(); + this.bucket = config.getBucket(); } - @Override - public void start(final OutputStream outputStream, Event event, String tagsTargetKey) throws IOException { - // TODO: do the initial wrapping + public synchronized void start(final OutputStream outputStream, final Event event, final String tagsTargetKey) throws IOException { + this.s3Client = buildS3Client(); + buildSchemaAndKey(event, tagsTargetKey); + final S3OutputFile s3OutputFile = new S3OutputFile(s3Client, bucket, key); + buildWriter(s3OutputFile); + } + + public synchronized void start(File file) throws IOException { + LocalOutputFile localOutputFile =new LocalOutputFile(file); + buildSchemaAndKey(null, null); + buildWriter(localOutputFile); + } + + void buildSchemaAndKey(final Event event, final String tagsTargetKey) throws IOException { + if (config.getSchema() != null) { + schema = parseSchema(config.getSchema()); + } else if(config.getFileLocation()!=null){ + schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation()); + }else if(config.getSchemaRegistryUrl()!=null){ + schema = parseSchema(ParquetSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl())); + }else if(checkS3SchemaValidity()){ + schema = ParquetSchemaParserFromS3.parseSchema(config); + } + else{ + schema = buildInlineSchemaFromEvent(event, tagsTargetKey); + } + key = generateKey(); + } + public Schema buildInlineSchemaFromEvent(final Event event, final String tagsTargetKey) throws IOException { + if(tagsTargetKey!=null){ + return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, tagsTargetKey).toMap(), false)); + }else{ + return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false)); + } + } + + private String buildSchemaStringFromEventMap(final Map eventData, boolean nestedRecordFlag) { + final StringBuilder builder = new StringBuilder(); + int nestedRecordIndex=1; + if(nestedRecordFlag==false){ + builder.append(BASE_SCHEMA_STRING); + }else{ + builder.append("{\"type\":\"record\",\"name\":\""+"NestedRecord"+nestedRecordIndex+"\",\"fields\":["); + nestedRecordIndex++; + } + String fields; + int index = 0; + for(final String key: eventData.keySet()){ + if(config.getExcludeKeys()==null){ + config.setExcludeKeys(new ArrayList<>()); + } + if(config.getExcludeKeys().contains(key)){ + continue; + } + if(index == 0){ + if(!(eventData.get(key) instanceof Map)){ + fields = "{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; + } + else{ + fields = "{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; + } + } + else{ + if(!(eventData.get(key) instanceof Map)){ + fields = ","+"{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; 
+ }else{ + fields = ","+"{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; + } + } + builder.append(fields); + index++; + } + builder.append(END_SCHEMA_STRING); + return builder.toString(); + } + + private String typeMapper(final Object value) { + if(value instanceof Integer || value.getClass().equals(int.class)){ + return "int"; + }else if(value instanceof Float || value.getClass().equals(float.class)){ + return "float"; + }else if(value instanceof Double || value.getClass().equals(double.class)){ + return "double"; + }else if(value instanceof Long || value.getClass().equals(long.class)){ + return "long"; + }else if(value instanceof Byte[]){ + return "bytes"; + }else if(value instanceof Map){ + return buildSchemaStringFromEventMap((Map) value, true); + } + else{ + return "string"; + } + } + + private void buildWriter(OutputFile outputFile) throws IOException { + writer = AvroParquetWriter.builder(outputFile) + .withSchema(schema) + .build(); } @Override - public void complete(final OutputStream outputStream) throws IOException { - // TODO: Close the output stream + public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException { + final GenericData.Record parquetRecord = new GenericData.Record(schema); + final Event modifiedEvent; + if (tagsTargetKey != null) { + modifiedEvent = addTagsToEvent(event, tagsTargetKey); + } else { + modifiedEvent = event; + } + for (final String key : modifiedEvent.toMap().keySet()) { + if (config.getExcludeKeys().contains(key)) { + continue; + } + final Schema.Field field = schema.getField(key); + final Object value = schemaMapper(field, modifiedEvent.toMap().get(key)); + parquetRecord.put(key, value); + } + writer.write(parquetRecord); } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { - // TODO: get the event data and write in output stream + public synchronized void complete(final OutputStream outputStream) throws IOException { + writer.close(); + final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build(); + final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference); + byte[] byteBuffer = inputFile.newStream().readAllBytes(); + outputStream.write(byteBuffer); + final DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + s3Client.deleteObject(deleteRequest); + } + public void closeWriter(final OutputStream outputStream, File file) throws IOException { + final LocalInputFile inputFile = new LocalInputFile(file); + byte[] byteBuffer = inputFile.newStream().readAllBytes(); + outputStream.write(byteBuffer); + writer.close(); } @Override public String getExtension() { - return null; + return PARQUET; } static Schema parseSchema(final String schemaString) { - return null; + return new Schema.Parser().parse(schemaString); } -} \ No newline at end of file + private S3Client buildS3Client() { + final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() + .addCredentialsProvider(DefaultCredentialsProvider.create()).build(); + return S3Client.builder() + .region(Region.of(region)) + .credentialsProvider(credentialsProvider) + .httpClientBuilder(apacheHttpClientBuilder) + .build(); + } + + /** + * Generate the s3 object path prefix and object file name. + * + * @return object key path. 
+ */ + protected String generateKey() { + final String pathPrefix = buildObjectPath(config.getPathPrefix()); + final String namePattern = buildObjectFileName(config.getNamePattern()); + return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; + } + + private static String buildObjectPath(final String pathPrefix) { + final StringBuilder s3ObjectPath = new StringBuilder(); + if (pathPrefix != null && !pathPrefix.isEmpty()) { + String[] pathPrefixList = pathPrefix.split("\\/"); + for (final String prefixPath : pathPrefixList) { + if (SIMPLE_DURATION_PATTERN.matcher(prefixPath).find()) { + s3ObjectPath.append(S3ObjectIndexUtility.getObjectPathPrefix(prefixPath)).append("/"); + } else { + s3ObjectPath.append(prefixPath).append("/"); + } + } + } + return s3ObjectPath.toString(); + } + + private String buildObjectFileName(final String configNamePattern) { + return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." + getExtension(); + } + private static Object schemaMapper(final Schema.Field field , final Object rawValue){ + Object finalValue = null; + final String fieldType = field.schema().getType().name().toString().toLowerCase(); + if (field.schema().getLogicalType() == null && primitiveTypes.contains(fieldType)){ + switch (fieldType){ + case "string": + finalValue = rawValue.toString(); + break; + case "int": + finalValue = Integer.parseInt(rawValue.toString()); + break; + case "float": + finalValue = Float.parseFloat(rawValue.toString()); + break; + case "double": + finalValue = Double.parseDouble(rawValue.toString()); + break; + case "long": + finalValue = Long.parseLong(rawValue.toString()); + break; + case "bytes": + finalValue = rawValue.toString().getBytes(StandardCharsets.UTF_8); + break; + default: + LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType); + break; + } + }else{ + final String logicalTypeName = field.schema().getLogicalType().getName(); + switch (logicalTypeName){ + case "date": + finalValue = Integer.parseInt(rawValue.toString()); + break; + case "time-millis": + case "timestamp-millis": + case "time-micros": + case "timestamp-micros": + finalValue = Long.parseLong(rawValue.toString()); + break; + case "decimal": + Double.parseDouble(rawValue.toString()); + break; + case "uuid": + finalValue = rawValue.toString().getBytes(StandardCharsets.UTF_8); + break; + default: + LOG.error("Unrecognised Logical Datatype for field : '{}' & type : '{}'", field.name(), logicalTypeName); + break; + } + } + return finalValue; + } + boolean checkS3SchemaValidity() throws IOException { + if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) { + return true; + } else { + LOG.error("Invalid S3 credentials, can't reach the schema file."); + throw new IOException("Can't proceed without schema."); + } + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java index d816a364ae..e1016496d4 100644 --- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java @@ -5,13 +5,145 @@ package org.opensearch.dataprepper.plugins.codec.parquet; import 
com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; + +import java.util.ArrayList; +import java.util.List; public class ParquetOutputCodecConfig { + + private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}"; + private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); + @JsonProperty("schema") private String schema; + @Valid + @Size(max = 0, message = "Schema from file is not supported.") + @JsonProperty("schema_file_location") + private String fileLocation; + + @Valid + @Size(max = 0, message = "Schema from file is not supported.") + @JsonProperty("schema_bucket") + private String schemaBucket; + + @Valid + @Size(max = 0, message = "Schema from file is not supported.") + @JsonProperty("file_key") + private String fileKey; + + @Valid + @Size(max = 0, message = "Schema from file is not supported.") + @JsonProperty("schema_region") + private String schemaRegion; + + @JsonProperty("region") + @NotNull + @Valid + private String region; + + @JsonProperty("bucket") + @NotNull + @Valid + private String bucket; + + @JsonProperty("path_prefix") + @NotNull + @Valid + private String pathPrefix; + @JsonProperty("exclude_keys") + private List excludeKeys = DEFAULT_EXCLUDE_KEYS; + + @Valid + @Size(max = 0, message = "Schema from Schema Registry is not supported.") + @JsonProperty("schema_registry_url") + private String schemaRegistryUrl; + + public List getExcludeKeys() { + return excludeKeys; + } + + public String getFileLocation() { + return fileLocation; + } + + public void setSchema(String schema) { + this.schema = schema; + } + public String getSchema() { return schema; } + + public String getSchemaRegistryUrl() { + return schemaRegistryUrl; + } + + public String getRegion() { + return region; + } + + public String getBucket() { + return bucket; + } + + public String getPathPrefix() { + return pathPrefix; + } + + /** + * Read s3 object index file pattern configuration. + * + * @return default object name pattern. 
+ */ + public String getNamePattern() { + return DEFAULT_OBJECT_NAME_PATTERN; + } + public void setRegion(String region) { + this.region = region; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public void setPathPrefix(String pathPrefix) { + this.pathPrefix = pathPrefix; + } + public String getSchemaBucket() { + return schemaBucket; + } + + public String getFileKey() { + return fileKey; + } + + public String getSchemaRegion() { + return schemaRegion; + } + + public void setFileKey(String fileKey) { + this.fileKey = fileKey; + } + public void setSchemaBucket(String schemaBucket) { + this.schemaBucket = schemaBucket; + } + + public void setSchemaRegion(String schemaRegion) { + this.schemaRegion = schemaRegion; + } + public void setFileLocation(String fileLocation) { + this.fileLocation = fileLocation; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) { + this.schemaRegistryUrl = schemaRegistryUrl; + } + public void setExcludeKeys(List excludeKeys) { + this.excludeKeys = excludeKeys; + } } diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParser.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParser.java new file mode 100644 index 0000000000..e523ef0679 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParser.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +public class ParquetSchemaParser { + private static final ObjectMapper mapper = new ObjectMapper(); + + public static Schema parseSchemaFromJsonFile(final String location) throws IOException { + final Map map = mapper.readValue(Paths.get(location).toFile(), Map.class); + return getSchema(map); + } + + public static Schema getSchema(Map map) throws JsonProcessingException { + final Map schemaMap = new HashMap(); + for (Map.Entry entry : map.entrySet()) { + schemaMap.put(entry.getKey(), entry.getValue()); + } + final String schemaJson = mapper.writeValueAsString(schemaMap); + return new Schema.Parser().parse(schemaJson); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromS3.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromS3.java new file mode 100644 index 0000000000..7eaad8dbfd --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromS3.java @@ -0,0 +1,55 @@ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import 
software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +import java.io.IOException; +import java.util.Map; + +public class ParquetSchemaParserFromS3 { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder(); + private static final Logger LOG = LoggerFactory.getLogger(ParquetSchemaParserFromS3.class); + + public static Schema parseSchema(final ParquetOutputCodecConfig config) throws IOException { + try{ + return new Schema.Parser().parse(getS3SchemaObject(config)); + }catch (Exception e){ + LOG.error("Unable to retrieve schema from S3. Error: "+e.getMessage()); + throw new IOException("Can't proceed without schema."); + } + } + + private static String getS3SchemaObject(ParquetOutputCodecConfig config) throws IOException { + S3Client s3Client = buildS3Client(config); + GetObjectRequest getObjectRequest = GetObjectRequest.builder() + .bucket(config.getSchemaBucket()) + .key(config.getFileKey()) + .build(); + ResponseInputStream s3Object = s3Client.getObject(getObjectRequest); + final Map stringObjectMap = objectMapper.readValue(s3Object, new TypeReference<>() {}); + return objectMapper.writeValueAsString(stringObjectMap); + } + + private static S3Client buildS3Client(ParquetOutputCodecConfig config) { + final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() + .addCredentialsProvider(DefaultCredentialsProvider.create()).build(); + return S3Client.builder() + .region(Region.of(config.getSchemaRegion())) + .credentialsProvider(credentialsProvider) + .httpClientBuilder(apacheHttpClientBuilder) + .build(); + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromSchemaRegistry.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromSchemaRegistry.java new file mode 100644 index 0000000000..7158c363c4 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromSchemaRegistry.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; + +public class ParquetSchemaParserFromSchemaRegistry { + private static final ObjectMapper mapper = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(ParquetSchemaParserFromSchemaRegistry.class); + static String getSchemaType(final String schemaRegistryUrl) { + final StringBuilder response = new StringBuilder(); + try { + final String urlPath = schemaRegistryUrl; + final URL url = new URL(urlPath); + final HttpURLConnection connection = 
(HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + final int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + final BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + while ((inputLine = reader.readLine()) != null) { + response.append(inputLine); + } + reader.close(); + final Object json = mapper.readValue(response.toString(), Object.class); + final String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json); + final JsonNode rootNode = mapper.readTree(indented); + if(rootNode.get("schema") != null ){ + return rootNode.get("schema").toString(); + } + } else { + final InputStream errorStream = connection.getErrorStream(); + final String errorMessage = readErrorMessage(errorStream); + LOG.error("GET request failed while fetching the schema registry details : {}", errorMessage); + } + } catch (IOException e) { + LOG.error("An error while fetching the schema registry details : ", e); + throw new RuntimeException(); + } + return null; + } + + private static String readErrorMessage(final InputStream errorStream) throws IOException { + if (errorStream == null) { + return null; + } + final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream)); + final StringBuilder errorMessage = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + errorMessage.append(line); + } + reader.close(); + errorStream.close(); + return errorMessage.toString(); + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputFile.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputFile.java new file mode 100644 index 0000000000..6b5630c342 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputFile.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import org.apache.parquet.io.SeekableInputStream; +import org.opensearch.dataprepper.model.io.InputFile; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import java.util.concurrent.atomic.LongAdder; + +public class S3InputFile implements InputFile { + + private S3Client s3Client; + + private S3ObjectReference s3ObjectReference; + + private LongAdder bytesCounter; + + private HeadObjectResponse metadata; + + public S3InputFile(final S3Client s3Client, final S3ObjectReference s3ObjectReference) { + this.s3Client = s3Client; + this.s3ObjectReference = s3ObjectReference; + } + + /** + * Note: this may be stale if file was deleted since metadata is cached for size/existence checks. + * + * @return content length + */ + @Override + public long getLength() { + return getMetadata().contentLength(); + } + + /** + * Create an input stream from the input file + * @return an implementation of a SeekableInputStream into the S3 object. 
+ */ + @Override + public SeekableInputStream newStream() { + bytesCounter = new LongAdder(); + + return new S3InputStream(s3Client, s3ObjectReference, getMetadata(), bytesCounter); + } + + /** + * Get the count of bytes read from the S3 object + * @return + */ + public long getBytesCount() { + if (bytesCounter == null) { + return 0; + } + + return bytesCounter.longValue(); + } + + /** + * Get the metadata of the S3 object. Cache the metadata to avoid subsequent headObject calls to S3 + * @return the metadata of the S3 object + */ + private synchronized HeadObjectResponse getMetadata() { + if (metadata == null) { + final HeadObjectRequest request = HeadObjectRequest.builder() + .bucket(s3ObjectReference.getBucketName()) + .key(s3ObjectReference.getKey()) + .build(); + metadata = s3Client.headObject(request); + } + + return metadata; + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputStream.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputStream.java new file mode 100644 index 0000000000..fceebe1eac --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3InputStream.java @@ -0,0 +1,605 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteStreams; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.http.Abortable; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.LongAdder; + + +class S3InputStream extends SeekableInputStream { + + private static final int COPY_BUFFER_SIZE = 8192; + + private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class); + + private static final int SKIP_SIZE = 1024 * 1024; + + private final S3Client s3Client; + + private final S3ObjectReference s3ObjectReference; + + private final HeadObjectResponse metadata; + + private final LongAdder bytesCounter; + + private final GetObjectRequest.Builder getObjectRequestBuilder; + + private InputStream stream; + + private final byte[] temp = new byte[COPY_BUFFER_SIZE]; + + private long pos = 0; + private long next = 0; + + private long mark = 0; + + private long markLimit = 0; + + private boolean closed = false; + + public S3InputStream( + final S3Client s3Client, + final S3ObjectReference s3ObjectReference, + final HeadObjectResponse metadata, + final LongAdder bytesCounter) { + this.s3Client = s3Client; + this.s3ObjectReference = s3ObjectReference; + this.metadata = metadata; + this.bytesCounter = bytesCounter; + + this.getObjectRequestBuilder = GetObjectRequest.builder() + .bucket(this.s3ObjectReference.getBucketName()) + .key(this.s3ObjectReference.getKey()); + } + + // Implement all InputStream methods first: + + /** + * Returns bytes available to read. 
+     *
+     * @throws IOException If the underlying stream throws IOException
+     */
+    @Override
+    public int available() throws IOException {
+        Preconditions.checkState(!closed, "Cannot read: already closed");
+        positionStream();
+
+        return stream.available();
+    }
+
+    /**
+     * Close a stream.
+     *
+     * @throws IOException If the underlying stream throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        closed = true;
+        closeStream();
+    }
+
+    /**
+     * Mark the current position of the input stream
+     *
+     * @param readlimit the maximum limit of bytes that can be read before
+     *                  the mark position becomes invalid.
+     */
+    @Override
+    public synchronized void mark(int readlimit) {
+        mark = next;
+        markLimit = mark + readlimit;
+    }
+
+    /**
+     * Whether this stream supports mark or not.
+     * @return Whether mark is supported or not.
+     */
+    @Override
+    public synchronized boolean markSupported() {
+        return true;
+    }
+
+
+    /**
+     * Read a single byte from the stream
+     * @return the next byte of data, or -1 if the end of the stream is reached
+     * @throws IOException if data cannot be read.
+     */
+    @Override
+    public int read() throws IOException {
+        Preconditions.checkState(!closed, "Cannot read: already closed");
+        positionStream();
+
+        pos += 1;
+        next += 1;
+        bytesCounter.increment();
+
+        return stream.read();
+    }
+
+    /**
+     * Read data into the provided byte array
+     * @param b the buffer into which the data is read.
+     * @return number of bytes read
+     * @throws IOException if data cannot be read
+     */
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Read len bytes into the provided byte array starting at off
+     * @param b the buffer into which the data is read.
+     * @param off the start offset in array b
+     *            at which the data is written.
+     * @param len the maximum number of bytes to read.
+     * @return number of bytes read
+     * @throws IOException if data cannot be read
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        Preconditions.checkState(!closed, "Cannot read: already closed");
+        positionStream();
+
+        int bytesRead = stream.read(b, off, len);
+        pos += bytesRead;
+        next += bytesRead;
+        bytesCounter.add(bytesRead);
+
+        return bytesRead;
+    }
+
+    /**
+     * Read all bytes from this input stream.
+ * @return Array of bytes read + * @throws IOException + */ + @Override + public byte[] readAllBytes() throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + final byte[] bytesRead = stream.readAllBytes(); + + pos += bytesRead.length; + next += bytesRead.length; + bytesCounter.add(bytesRead.length); + + return bytesRead; + } + + /** + * + * @param b the byte array into which the data is read + * @param off the start offset in {@code b} at which the data is written + * @param len the maximum number of bytes to read + * @return number of bytes read + * @throws IOException if underlying stream cannot be read from + */ + @Override + public int readNBytes(byte[] b, int off, int len) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + final int bytesRead = stream.readNBytes(b, off, len); + + pos += bytesRead; + next += bytesRead; + bytesCounter.add(bytesRead); + + return bytesRead; + } + + /** + * @param len the number of bytes to read + * @return array of bytes read + * @throws IOException if stream cannot be read from + */ + @Override + public byte[] readNBytes(int len) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + final byte[] bytesRead = stream.readNBytes(len); + + pos += bytesRead.length; + next += bytesRead.length; + bytesCounter.add(bytesRead.length); + + return bytesRead; + } + + /** + * Reset the stream to the marked position + * @throws IOException if the stream that was marked is no longer valid + */ + @Override + public synchronized void reset() throws IOException { + if (next > markLimit) { + throw new IOException("Cannot reset stream because mark limit exceeded"); + } + + next = mark; + } + + /** + * Skip n number of bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the number of bytes skipped. + */ + @Override + public long skip(long n) { + if (next >= metadata.contentLength()) { + return 0; + } + + long toSkip = Math.min(n, metadata.contentLength() - next); + + next += toSkip; + + return toSkip; + } + + // Override all SeekableInputStream methods + + /** + * Get the offset into the stream + * @return the offset into the stream + */ + @Override + public long getPos() { + return next; + } + + /** + * Seek the specified offset into the input stream. + * @param newPos the new position to seek to + */ + @Override + public void seek(long newPos) { + Preconditions.checkState(!closed, "Cannot read: already closed"); + Preconditions.checkArgument(newPos >= 0, "position is negative: %s", newPos); + + // this allows a seek beyond the end of the stream but the next read will fail + next = newPos; + } + + // Implement all SeekableInputStream methods + + /** + * Read a byte array of data, from position 0 to the end of the array. + *

+ * This method is equivalent to {@code read(bytes, 0, bytes.length)}. + *

+ * This method will block until len bytes are available to copy into the + * array, or will throw {@link EOFException} if the stream ends before the + * array is full. + * + * @param bytes a byte array to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the array, {@code bytes.length} + */ + @Override + public void readFully(byte[] bytes) throws IOException { + readFully(bytes, 0, bytes.length); + } + + /** + * Read {@code len} bytes of data into an array, at position {@code start}. + *

+ * This method will block until len bytes are available to copy into the + * array, or will throw {@link EOFException} if the stream ends before the + * array is full. + * + * @param bytes a byte array to fill with data from the stream + * @param start the starting position in the byte array for data + * @param len the length of bytes to read into the byte array + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer than {@code len} bytes left + */ + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + int bytesRead = readFully(stream, bytes, start, len); + + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + } + + /** + * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + *

+ * This method will copy available bytes into the buffer, reading at most + * {@code buf.remaining()} bytes. The number of bytes actually copied is + * returned by the method, or -1 is returned to signal that the end of the + * underlying stream has been reached. + * + * @param buf a byte buffer to fill with data from the stream + * @return the number of bytes read or -1 if the stream ended + * @throws IOException If the underlying stream throws IOException + */ + @Override + public int read(ByteBuffer buf) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + int bytesRead = 0; + if (buf.hasArray()) { + bytesRead = readHeapBuffer(stream, buf); + } else { + bytesRead = readDirectBuffer(stream, buf, temp); + } + + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + + return bytesRead; + } + + /** + * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}. + *

+ * This method will block until {@code buf.remaining()} bytes are available + * to copy into the buffer, or will throw {@link EOFException} if the stream + * ends before the buffer is full. + * + * @param buf a byte buffer to fill with data from the stream + * @throws IOException If the underlying stream throws IOException + * @throws EOFException If the stream has fewer bytes left than are needed to + * fill the buffer, {@code buf.remaining()} + */ + @Override + public void readFully(ByteBuffer buf) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + int bytesRead = 0; + if (buf.hasArray()) { + bytesRead = readFullyHeapBuffer(stream, buf); + } else { + bytesRead = readFullyDirectBuffer(stream, buf, temp); + } + + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + } + + /** + * Position the stream for reading bytes starting at next offset + * @throws IOException if stream cannot be set correctly + */ + private void positionStream() throws IOException { + + if ((stream != null) && (next == pos)) { + // already at specified position + return; + } + + if ((stream != null) && (next > pos)) { + // seeking forwards + long skip = next - pos; + if (skip <= Math.max(stream.available(), SKIP_SIZE)) { + // already buffered or seek is small enough + LOG.debug("Read-through seek for {} to offset {}", s3ObjectReference, next); + try { + ByteStreams.skipFully(stream, skip); + pos = next; + return; + } catch (IOException ignored) { + // will retry by re-opening the stream + } + } + } + + // close the stream and open at desired position + LOG.debug("Seek with new stream for {} to offset {}", s3ObjectReference, next); + pos = next; + openStream(); + } + + /** + * Open the stream to the S3 object + * @throws IOException if the stream cannot be opened. + */ + private void openStream() throws IOException { + closeStream(); + + if (pos >= metadata.contentLength()) { + stream = InputStream.nullInputStream(); + return; + } + + final GetObjectRequest request = this.getObjectRequestBuilder + .range(String.format("bytes=%s-", pos)) + .build(); + + try { + stream = s3Client.getObject(request, ResponseTransformer.toInputStream()); + } catch (NoSuchKeyException e) { + throw new IOException("Location does not exist: " + s3ObjectReference.toString(), e); + } + } + + /** + * Close the input stream from the S3 object + * @throws IOException if the stream cannot be closed. + */ + private void closeStream() throws IOException { + if (stream != null) { + // if we aren't at the end of the stream, and the stream is abortable, then + // call abort() so we don't read the remaining data with the Apache HTTP client + abortStream(); + try { + stream.close(); + } catch (IOException e) { + // the Apache HTTP client will throw a ConnectionClosedException + // when closing an aborted stream, which is expected + if (!e.getClass().getSimpleName().equals("ConnectionClosedException")) { + throw e; + } + } + stream = null; + } + } + + /** + * Abort the stream to the S3 object. + */ + private void abortStream() { + try { + if (stream instanceof Abortable && stream.read() != -1) { + ((Abortable) stream).abort(); + } + } catch (Exception e) { + LOG.warn("An error occurred while aborting the stream", e); + } + } + + /** + * Read the input stream into the byte buffer with the assumption that the byte buffer is backed by some bytes. 
+     * @param f input stream
+     * @param buf byte buffer wrapper
+     * @return bytes read into the buffer
+     * @throws IOException if bytes cannot be read from input stream into the byte buffer
+     */
+    // Visible for testing
+    static int readHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
+        int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+        if (bytesRead < 0) {
+            // if this resulted in EOF, don't update position
+            return bytesRead;
+        } else {
+            buf.position(buf.position() + bytesRead);
+            return bytesRead;
+        }
+    }
+
+    /**
+     * Helper method to read bytes from an input stream into a byte array
+     * @param f input stream
+     * @param bytes byte array
+     * @param start offset into byte array to start reading to
+     * @param len number of bytes to read into the byte array
+     * @return number of bytes read into buffer
+     * @throws IOException if input stream cannot be read
+     */
+    static int readFully(InputStream f, byte[] bytes, int start, int len) throws IOException {
+        int totalBytesRead = 0;
+        int offset = start;
+        int remaining = len;
+        while (remaining > 0) {
+            int bytesRead = f.read(bytes, offset, remaining);
+            if (bytesRead < 0) {
+                throw new EOFException(
+                        "Reached the end of stream with " + remaining + " bytes left to read");
+            }
+
+            remaining -= bytesRead;
+            offset += bytesRead;
+            totalBytesRead += bytesRead;
+        }
+        return totalBytesRead;
+    }
+
+    /**
+     * Read fully into the bytes buffer assuming that the byte buffer is backed by a byte array
+     * @param f input stream
+     * @param buf byte buffer
+     * @return number of bytes read into buffer
+     * @throws IOException if bytes cannot be read into the byte buffer
+     */
+    // Visible for testing
+    static int readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
+        int bytesRead = readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+        buf.position(buf.limit());
+        return bytesRead;
+    }
+
+    /**
+     * Read into a direct buffer with the assumption that the byte buffer has no backing byte array
+     * @param f input stream
+     * @param buf byte buffer
+     * @param temp byte array to use as a buffer
+     * @return the number of bytes read
+     * @throws IOException if the bytes cannot be read from the input stream
+     */
+    // Visible for testing
+    static int readDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+        // copy all the bytes that return immediately, stopping at the first
+        // read that doesn't return a full buffer.
+        int nextReadLength = Math.min(buf.remaining(), temp.length);
+        int totalBytesRead = 0;
+        int bytesRead;
+
+        while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
+            buf.put(temp);
+            totalBytesRead += bytesRead;
+            nextReadLength = Math.min(buf.remaining(), temp.length);
+        }
+
+        if (bytesRead < 0) {
+            // return -1 if nothing was read
+            return totalBytesRead == 0 ? -1 : totalBytesRead;
+        } else {
+            // copy the last partial buffer
+            buf.put(temp, 0, bytesRead);
+            totalBytesRead += bytesRead;
+            return totalBytesRead;
+        }
+    }
+
+    /**
+     * Read from the input stream into the byte buffer using the provided byte array as a buffer
+     * @param f input stream to read from
+     * @param buf byte buffer to read data into
+     * @param temp The byte array to use as a buffer for reading.
+ * @return number of bytes read into buffer + * @throws IOException if the bytes cannot be read + */ + // Visible for testing + static int readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException { + int totalBytesRead = 0; + int nextReadLength = Math.min(buf.remaining(), temp.length); + int bytesRead = 0; + + while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) { + buf.put(temp, 0, bytesRead); + nextReadLength = Math.min(buf.remaining(), temp.length); + if (bytesRead >= 0) { + totalBytesRead += bytesRead; + } + } + + if (bytesRead < 0 && buf.remaining() > 0) { + throw new EOFException( + "Reached the end of stream with " + buf.remaining() + " bytes left to read"); + } + + return totalBytesRead; + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3ObjectReference.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3ObjectReference.java new file mode 100644 index 0000000000..09cb196626 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3ObjectReference.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.parquet; + +import java.util.Objects; +import java.util.Optional; + +/** + * Reference to an S3 object. + */ +class S3ObjectReference { + private final String bucketName; + private final String key; + private final String owner; + + private S3ObjectReference(final String bucketName, final String key, final String owner) { + this.bucketName = bucketName; + this.key = key; + this.owner = owner; + } + + static Builder bucketAndKey(final String bucketName, final String key) { + Objects.requireNonNull(bucketName, "bucketName must be non null"); + Objects.requireNonNull(key, "key must be non null"); + return new Builder(bucketName, key); + } + + String getBucketName() { + return bucketName; + } + + String getKey() { + return key; + } + + Optional getBucketOwner() { + return Optional.ofNullable(owner); + } + + @Override + public String toString() { + return "[bucketName=" + bucketName + ", key=" + key + "]"; + } + + public String uri() { + return String.format("s3://%s/%s", bucketName, key); + } + + public static final class Builder { + + private final String bucketName; + private final String key; + private String owner; + + private Builder(final String bucketName, final String key) { + this.bucketName = bucketName; + this.key = key; + } + + public Builder owner(final String owner) { + this.owner = owner; + return this; + } + + public S3ObjectReference build() { + return new S3ObjectReference(bucketName, key, owner); + } + } +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputFile.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputFile.java new file mode 100644 index 0000000000..cc5020a3f7 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputFile.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import 
software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; + + +public class S3OutputFile implements OutputFile { + + private S3Client s3Client; + + private String bucketName; + + private String key; + + + public S3OutputFile(final S3Client s3Client, final String bucketName, + final String key) { + this.s3Client = s3Client; + this.bucketName = bucketName; + this.key = key; + } + + @Override + public PositionOutputStream create(long blockSizeHint) throws IOException { + return new S3OutputStream(s3Client, bucketName, key); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + return null; + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } + +} diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java new file mode 100644 index 0000000000..388a1e31cc --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java @@ -0,0 +1,214 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + + +import org.apache.parquet.io.PositionOutputStream; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class S3OutputStream extends PositionOutputStream { + + /** + * Default chunk size is 10MB + */ + protected static final int BUFFER_SIZE = 10000000; + + /** + * The bucket-name on Amazon S3 + */ + private final String bucket; + + /** + * The path (key) name within the bucket + */ + private final String path; + + /** + * The temporary buffer used for storing the chunks + */ + private final byte[] buf; + + private final S3Client s3Client; + /** + * Collection of the etags for the parts that have been uploaded + */ + private final List etags; + /** + * The position in the buffer + */ + private int position; + /** + * The unique id for this upload + */ + private String uploadId; + /** + * indicates whether the stream is still open / valid + */ + private boolean open; + + /** + * Creates a new S3 OutputStream + * + * @param s3Client the AmazonS3 client + * @param bucket name of the bucket + * @param path path within the bucket + */ + public S3OutputStream(S3Client s3Client, String bucket, String path) { + this.s3Client = s3Client; + this.bucket = bucket; + this.path = path; + buf = new byte[BUFFER_SIZE]; + position = 0; + etags = new ArrayList<>(); + open = true; + } + + @Override + public void write(int b) { + assertOpen(); + if (position 
>= buf.length) { + flushBufferAndRewind(); + } + buf[position++] = (byte) b; + } + + + /** + * Write an array to the S3 output stream. + * + * @param b the byte-array to append + */ + @Override + public void write(byte[] b) { + write(b, 0, b.length); + } + + + /** + * Writes an array to the S3 Output Stream + * + * @param byteArray the array to write + * @param o the offset into the array + * @param l the number of bytes to write + */ + @Override + public void write(byte[] byteArray, int o, int l) { + assertOpen(); + int ofs = o; + int len = l; + int size; + while (len > (size = buf.length - position)) { + System.arraycopy(byteArray, ofs, buf, position, size); + position += size; + flushBufferAndRewind(); + ofs += size; + len -= size; + } + System.arraycopy(byteArray, ofs, buf, position, len); + position += len; + } + + /** + * Flushes the buffer by uploading a part to S3. + */ + @Override + public synchronized void flush() { + assertOpen(); + } + + @Override + public void close() { + if (open) { + open = false; + if (uploadId != null) { + if (position > 0) { + uploadPart(); + } + + CompletedPart[] completedParts = new CompletedPart[etags.size()]; + for (int i = 0; i < etags.size(); i++) { + completedParts[i] = CompletedPart.builder() + .eTag(etags.get(i)) + .partNumber(i + 1) + .build(); + } + + CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder() + .parts(completedParts) + .build(); + CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder() + .bucket(bucket) + .key(path) + .uploadId(uploadId) + .multipartUpload(completedMultipartUpload) + .build(); + s3Client.completeMultipartUpload(completeMultipartUploadRequest); + } else { + PutObjectRequest putRequest = PutObjectRequest.builder() + .bucket(bucket) + .key(path) + .contentLength((long) position) + .build(); + + RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position), + position); + s3Client.putObject(putRequest, requestBody); + } + } + } + + private void assertOpen() { + if (!open) { + throw new IllegalStateException("Closed"); + } + } + + protected void flushBufferAndRewind() { + if (uploadId == null) { + CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder() + .bucket(bucket) + .key(path) + .build(); + CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest); + uploadId = multipartUpload.uploadId(); + } + uploadPart(); + position = 0; + } + + protected void uploadPart() { + UploadPartRequest uploadRequest = UploadPartRequest.builder() + .bucket(bucket) + .key(path) + .uploadId(uploadId) + .partNumber(etags.size() + 1) + .contentLength((long) position) + .build(); + RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position), + position); + UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadRequest, requestBody); + etags.add(uploadPartResponse.eTag()); + } + + @Override + public long getPos() throws IOException { + return position; + } +} + diff --git a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodecTest.java b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodecTest.java index 8d0ab87cd7..bbaeb506d3 100644 --- a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodecTest.java +++ 
b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodecTest.java @@ -1,3 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.dataprepper.plugins.codec.parquet; import org.apache.avro.Schema; diff --git a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java new file mode 100644 index 0000000000..abba1c5429 --- /dev/null +++ b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ParquetOutputCodecTest { + private static final String FILE_NAME = "parquet-data"; + private static final String FILE_SUFFIX = ".parquet"; + private static int numberOfRecords; + private ParquetOutputCodecConfig config; + + private static Record getRecord(int index) { + List recordList = generateRecords(numberOfRecords); + final Event event = JacksonLog.builder().withData(recordList.get(index)).build(); + return new Record<>(event); + } + + private static List generateRecords(int numberOfRecords) { + + List recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + + HashMap eventData = new HashMap<>(); + + eventData.put("name", "Person" + rows); + eventData.put("age", rows); + eventData.put("doubleType", Double.valueOf(rows)); + eventData.put("floatType", Float.valueOf(rows)); + eventData.put("longType", 
Long.valueOf(rows)); + eventData.put("bytesType", ("Person"+rows).getBytes()); + recordList.add((eventData)); + + } + return recordList; + } + + private static Schema parseSchema() { + return SchemaBuilder.record("Person") + .fields() + .name("name").type().stringType().noDefault() + .name("age").type().intType().noDefault() + .name("doubleType").type().doubleType().noDefault() + .name("floatType").type().floatType().noDefault() + .name("longType").type().longType().noDefault() + .name("bytesType").type().bytesType().noDefault() + .endRecord(); + + } + + private ParquetOutputCodec createObjectUnderTest() { + config = new ParquetOutputCodecConfig(); + config.setSchema(parseSchema().toString()); + config.setBucket("test"); + config.setRegion("test"); + return new ParquetOutputCodec(config); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void test_happy_case(final int numberOfRecords) throws Exception { + ParquetOutputCodecTest.numberOfRecords = numberOfRecords; + ParquetOutputCodec parquetOutputCodec = createObjectUnderTest(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX); + parquetOutputCodec.start(tempFile); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = (Event) getRecord(index).getData(); + parquetOutputCodec.writeEvent(event, outputStream, null); + } + parquetOutputCodec.closeWriter(outputStream, tempFile); + List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(tempFile.toString().getBytes())); + int index = 0; + for (final HashMap actualMap : actualRecords) { + assertThat(actualMap, notNullValue()); + Map expectedMap = generateRecords(numberOfRecords).get(index); + assertThat(expectedMap, Matchers.equalTo(actualMap)); + index++; + } + tempFile.delete(); + } + @Test + public void test_getExtension() { + ParquetOutputCodec parquetOutputCodec = createObjectUnderTest(); + String extension = parquetOutputCodec.getExtension(); + + assertThat(extension, equalTo("parquet")); + } + @Test + public void whenNoSchemaProvided_thenThrowsException() { + config = new ParquetOutputCodecConfig(); + config.setSchema(null); + config.setFileLocation(null); + config.setSchemaRegistryUrl(null); + ParquetOutputCodec parquetOutputCodec = new ParquetOutputCodec(config); + assertThrows(IOException.class,()-> + parquetOutputCodec.buildSchemaAndKey(null, null)); + } + + @Test + public void test_s3SchemaValidity() throws IOException { + config = new ParquetOutputCodecConfig(); + config.setSchema(parseSchema().toString()); + config.setSchemaBucket("test"); + config.setSchemaRegion("test"); + config.setFileKey("test"); + ParquetOutputCodec parquetOutputCodec = new ParquetOutputCodec(config); + assertThat(parquetOutputCodec.checkS3SchemaValidity(), equalTo(Boolean.TRUE)); + ParquetOutputCodec parquetOutputCodecFalse = createObjectUnderTest(); + assertThrows(IOException.class,()-> + parquetOutputCodecFalse.checkS3SchemaValidity()); + } + + private List> createParquetRecordsList(final InputStream inputStream) throws IOException { + + final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX); + Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + List> actualRecordList = new ArrayList<>(); + try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) { + final ParquetMetadata footer = 
parquetFileReader.getFooter(); + final MessageType schema = createdParquetSchema(footer); + PageReadStore pages; + + while ((pages = parquetFileReader.readNextRowGroup()) != null) { + final long rows = pages.getRowCount(); + final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); + final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); + for (int row = 0; row < rows; row++) { + final Map eventData = new HashMap<>(); + int fieldIndex = 0; + final SimpleGroup simpleGroup = (SimpleGroup) recordReader.read(); + for (Type field : schema.getFields()) { + try { + eventData.put(field.getName(), simpleGroup.getValueToString(fieldIndex, 0)); + } catch (Exception parquetException) { + parquetException.printStackTrace(); + } + fieldIndex++; + } + actualRecordList.add((HashMap) eventData); + } + } + } catch (Exception parquetException) { + parquetException.printStackTrace(); + } finally { + Files.delete(tempFile.toPath()); + } + return actualRecordList; + } + + private MessageType createdParquetSchema(ParquetMetadata parquetMetadata) { + return parquetMetadata.getFileMetaData().getSchema(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index 65a5def5e1..fb58ece594 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -17,10 +17,13 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21' + implementation 'org.apache.avro:avro:1.11.1' + implementation 'org.apache.hadoop:hadoop-common:3.3.6' + implementation 'org.apache.parquet:parquet-avro:1.13.1' implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21' implementation 'org.apache.commons:commons-lang3:3.12.0' testImplementation project(':data-prepper-test-common') - + implementation project(':data-prepper-plugins:parquet-codecs') implementation project(':data-prepper-plugins:parse-json-processor') } diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java index 15d41309f8..a0e1c1f359 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java @@ -7,7 +7,25 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; +import 
org.apache.parquet.schema.Type; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -24,6 +42,8 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; @@ -41,10 +61,18 @@ import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.S3Object; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -52,6 +80,7 @@ import java.util.Set; import java.util.UUID; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.lenient; @@ -62,10 +91,14 @@ class S3SinkServiceIT { private static final String PATH_PREFIX = UUID.randomUUID().toString() + "/%{yyyy}/%{MM}/%{dd}/"; + private static final int numberOfRecords = 2; private S3Client s3Client; private String bucketName; - + private String s3region; + private ParquetOutputCodecConfig parquetOutputCodecConfig; private BufferFactory bufferFactory; + private static final String FILE_NAME = "parquet-data"; + private static final String FILE_SUFFIX = ".parquet"; @Mock private S3SinkConfig s3SinkConfig; @Mock @@ -95,11 +128,10 @@ class S3SinkServiceIT { @BeforeEach public void setUp() { - String s3region = System.getProperty("tests.s3ink.region"); + s3region = System.getProperty("tests.s3ink.region"); s3Client = S3Client.builder().region(Region.of(s3region)).build(); bucketName = System.getProperty("tests.s3sink.bucket"); - bufferFactory = new InMemoryBufferFactory(); when(objectKeyOptions.getNamePattern()).thenReturn("elb-log-%{yyyy-MM-dd'T'hh-mm-ss}"); @@ -223,4 +255,106 @@ private static Map generateJson(Set testTags) { jsonObject.put("Tag", testTags.toArray()); return jsonObject; } -} + private static Record getRecord(int index) { + List recordList = generateRecords(numberOfRecords); + final Event event = JacksonLog.builder().withData(recordList.get(index)).build(); + return new Record<>(event); + } + + private static List generateRecords(int numberOfRecords) { + + List recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + + HashMap eventData = new HashMap<>(); + + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + recordList.add((eventData)); + + } + return recordList; + } + @Test + void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { + configureParquetCodec(); + S3SinkService s3SinkService = createObjectUnderTest(); + Collection> recordsData = getRecordList(); + + 
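+        // With the Parquet codec configured above, this flush should write a single Parquet object to the test bucket; it is read back below and compared record by record against the generated input.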
s3SinkService.output(recordsData); + + List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object().getBytes())); + int index = 0; + for (final HashMap actualMap : actualRecords) { + assertThat(actualMap, notNullValue()); + Map expectedMap = generateRecords(numberOfRecords).get(index); + assertThat(expectedMap, Matchers.equalTo(actualMap)); + index++; + } + } + + private void configureParquetCodec() { + parquetOutputCodecConfig = new ParquetOutputCodecConfig(); + parquetOutputCodecConfig.setSchema(parseSchema().toString()); + parquetOutputCodecConfig.setBucket(bucketName); + parquetOutputCodecConfig.setRegion(s3region); + parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX); + codec = new ParquetOutputCodec(parquetOutputCodecConfig); + when(parquetOutputCodecConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); + + } + private Collection> getRecordList() { + final Collection> recordList = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) + recordList.add(getRecord(i)); + return recordList; + } + private static Schema parseSchema() { + return SchemaBuilder.record("Person") + .fields() + .name("name").type().stringType().noDefault() + .name("age").type().intType().noDefault() + .endRecord(); + } + private List> createParquetRecordsList(final InputStream inputStream) throws IOException { + + final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX); + Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + List> actualRecordList = new ArrayList<>(); + try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) { + final ParquetMetadata footer = parquetFileReader.getFooter(); + final MessageType schema = createdParquetSchema(footer); + PageReadStore pages; + + while ((pages = parquetFileReader.readNextRowGroup()) != null) { + final long rows = pages.getRowCount(); + final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); + final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); + for (int row = 0; row < rows; row++) { + final Map eventData = new HashMap<>(); + int fieldIndex = 0; + final SimpleGroup simpleGroup = (SimpleGroup) recordReader.read(); + for (Type field : schema.getFields()) { + try { + eventData.put(field.getName(), simpleGroup.getValueToString(fieldIndex, 0)); + } catch (Exception parquetException) { + parquetException.printStackTrace(); + } + fieldIndex++; + } + actualRecordList.add((HashMap) eventData); + } + } + } catch (Exception parquetException) { + parquetException.printStackTrace(); + } finally { + Files.delete(tempFile.toPath()); + } + return actualRecordList; + } + + private MessageType createdParquetSchema(ParquetMetadata parquetMetadata) { + return parquetMetadata.getFileMetaData().getSchema(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java index df09ea42e1..34dee39776 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.accumulator; import 
software.amazon.awssdk.services.s3.S3Client; +import java.io.IOException; import java.io.OutputStream; /** @@ -21,9 +22,11 @@ public interface Buffer { int getEventCount(); long getDuration(); + boolean isCodecStarted(); + void setCodecStarted(boolean codecStarted); void flushToS3(S3Client s3Client, String bucket, String key) ; - + void writeEvent(byte[] bytes) throws IOException; OutputStream getOutputStream(); void setEventCount(int eventCount); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java index 4a892233de..f3450f0d19 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java @@ -9,8 +9,8 @@ import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; - import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.TimeUnit; @@ -22,12 +22,14 @@ public class InMemoryBuffer implements Buffer { private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); private int eventCount; private final StopWatch watch; + private boolean isCodecStarted; InMemoryBuffer() { byteArrayOutputStream.reset(); eventCount = 0; watch = new StopWatch(); watch.start(); + isCodecStarted = false; } @Override @@ -40,15 +42,6 @@ public int getEventCount() { return eventCount; } - @Override - public void setEventCount(int eventCount) { - this.eventCount = eventCount; - } - @Override - public OutputStream getOutputStream() { - return byteArrayOutputStream; - } - public long getDuration() { return watch.getTime(TimeUnit.SECONDS); } @@ -68,5 +61,33 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { RequestBody.fromBytes(byteArray)); } + /** + * write byte array to output stream. + * + * @param bytes byte array. + * @throws IOException while writing to output stream fails. 
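+     * Each event is written followed by the platform line separator, and the buffered event count is incremented.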
+ */ + @Override + public void writeEvent(byte[] bytes) throws IOException { + byteArrayOutputStream.write(bytes); + byteArrayOutputStream.write(System.lineSeparator().getBytes()); + eventCount++; + } + @Override + public boolean isCodecStarted() { + return isCodecStarted; + } + @Override + public void setCodecStarted(boolean codecStarted) { + isCodecStarted = codecStarted; + } + @Override + public void setEventCount(int eventCount) { + this.eventCount = eventCount; + } + @Override + public OutputStream getOutputStream() { + return byteArrayOutputStream; + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java index 843be56090..4eca985f83 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java @@ -31,6 +31,7 @@ public class LocalFileBuffer implements Buffer { private int eventCount; private final StopWatch watch; private final File localFile; + private boolean isCodecStarted; LocalFileBuffer(File tempFile) throws FileNotFoundException { localFile = tempFile; @@ -38,6 +39,7 @@ public class LocalFileBuffer implements Buffer { eventCount = 0; watch = new StopWatch(); watch.start(); + isCodecStarted = false; } @Override @@ -75,6 +77,18 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { removeTemporaryFile(); } + /** + * write byte array to output stream. + * @param bytes byte array. + * @throws IOException while writing to output stream fails. + */ + @Override + public void writeEvent(byte[] bytes) throws IOException { + outputStream.write(bytes); + outputStream.write(System.lineSeparator().getBytes()); + eventCount++; + } + /** * Flushing the buffered data into the output stream. */ @@ -99,8 +113,15 @@ protected void removeTemporaryFile() { } } } - + @Override + public boolean isCodecStarted() { + return isCodecStarted; + } + @Override + public void setCodecStarted(boolean codecStarted) { + isCodecStarted = codecStarted; + } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; @@ -110,4 +131,5 @@ public void setEventCount(int eventCount) { public OutputStream getOutputStream() { return outputStream; } + } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java index f70cba4d3a..e9c0f77c9c 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/ObjectKeyTest.java @@ -53,7 +53,7 @@ void test_buildingPathPrefix() { @Test void test_objectFileName() { - + when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null); Assertions.assertNotNull(objectFileName);
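+        // A non-null name indicates that the %{yyyy-MM-dd'T'hh-mm-ss} placeholder in the stubbed pattern could be resolved into a concrete object file name.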