Skip to content

Commit

Permalink
Moved Accumulator package to common for #874.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed Jun 23, 2023
1 parent fe1c9d3 commit 71bfb78
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.hc.client5.http.classic.HttpClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;

/**
* A buffer can hold data before flushing it to Http Endpoint.
* A buffer can hold data before flushing it to S3.
*/
public interface Buffer {

Expand All @@ -23,7 +23,6 @@ public interface Buffer {

long getDuration();

void sendDataToHttpEndpoint(HttpClient client) ;

void flushToS3(S3Client s3Client, String bucket, String key) ;
void writeEvent(byte[] bytes) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

public interface BufferFactory {
Buffer getBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

import com.fasterxml.jackson.annotation.JsonCreator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.hc.client5.http.classic.HttpClient;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* A buffer can hold in memory data and flushing it to Http Endpoint.
* A buffer can hold in-memory data and flush it to S3.
*/
public class InMemoryBuffer implements Buffer {

Expand Down Expand Up @@ -42,14 +44,19 @@ public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
}


/**
* Upload accumulated data to http endpoint.
* @param client HttpClient object.
* Upload accumulated data to s3 bucket.
*
* @param s3Client s3 client object.
* @param bucket bucket name.
* @param key s3 object key path.
*/
@Override
public void sendDataToHttpEndpoint(final HttpClient client) {
//TODO: implement
public void flushToS3(S3Client s3Client, String bucket, String key) {
final byte[] byteArray = byteArrayOutputStream.toByteArray();
s3Client.putObject(
PutObjectRequest.builder().bucket(bucket).key(key).build(),
RequestBody.fromBytes(byteArray));
}

/**
Expand All @@ -59,7 +66,9 @@ public void sendDataToHttpEndpoint(final HttpClient client) {
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(final byte[] bytes) throws IOException {
//TODO: implement
public void writeEvent(byte[] bytes) throws IOException {
byteArrayOutputStream.write(bytes);
byteArrayOutputStream.write(System.lineSeparator().getBytes());
eventCount++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

public class InMemoryBufferFactory implements BufferFactory {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Buffer} implementation that accumulates event data in a local
 * temporary file and flushes the file's contents to Amazon S3.
 */
public class LocalFileBuffer implements Buffer {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class);
    private final OutputStream outputStream;
    private int eventCount;
    private final StopWatch watch;
    private final File localFile;

    /**
     * Creates a buffer backed by the given temporary file.
     *
     * @param tempFile file that will receive buffered event bytes.
     * @throws FileNotFoundException if the file cannot be opened for writing.
     */
    LocalFileBuffer(File tempFile) throws FileNotFoundException {
        localFile = tempFile;
        outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
        eventCount = 0;
        watch = new StopWatch();
        watch.start();
    }

    /**
     * Returns the current size of the backing file in bytes.
     * Flushes the buffered stream first so the reported length reflects all
     * written events; a flush failure is logged and the (possibly stale)
     * length is returned anyway.
     */
    @Override
    public long getSize() {
        try {
            outputStream.flush();
        } catch (IOException e) {
            LOG.error("An exception occurred while flushing data to buffered output stream :", e);
        }
        return localFile.length();
    }

    /** @return number of events written via {@link #writeEvent(byte[])}. */
    @Override
    public int getEventCount() {
        return eventCount;
    }

    /** @return seconds elapsed since this buffer was created. */
    @Override
    public long getDuration(){
        return watch.getTime(TimeUnit.SECONDS);
    }

    /**
     * Upload accumulated data to amazon s3.
     * The stream is flushed and closed before upload, and the temporary file
     * is removed afterwards, so the buffer is single-use.
     * @param s3Client s3 client object.
     * @param bucket bucket name.
     * @param key s3 object key path.
     */
    @Override
    public void flushToS3(S3Client s3Client, String bucket, String key) {
        flushAndCloseStream();
        s3Client.putObject(
                PutObjectRequest.builder().bucket(bucket).key(key).build(),
                RequestBody.fromFile(localFile));
        removeTemporaryFile();
    }

    /**
     * write byte array to output stream, followed by a platform line separator.
     * @param bytes byte array.
     * @throws IOException while writing to output stream fails.
     */
    @Override
    public void writeEvent(byte[] bytes) throws IOException {
        outputStream.write(bytes);
        // Encode the separator with an explicit charset rather than the JVM
        // default; the separator is ASCII, so UTF-8 yields identical bytes on
        // all ASCII-compatible platforms while keeping the output deterministic.
        outputStream.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
        eventCount++;
    }

    /**
     * Flushing the buffered data into the output stream and closing it.
     * Failures are logged, not propagated, so an upload is still attempted.
     */
    protected void flushAndCloseStream(){
        try {
            outputStream.flush();
            outputStream.close();
        } catch (IOException e) {
            LOG.error("An exception occurred while flushing data to buffered output stream :", e);
        }
    }

    /**
     * Remove the local temp file after flushing data to s3.
     * Deletion failure is logged only — it never fails the flush.
     */
    protected void removeTemporaryFile() {
        if (localFile != null) {
            try {
                // File.toPath() is the direct conversion; avoids the
                // Paths.get(localFile.toString()) round trip.
                Files.deleteIfExists(localFile.toPath());
            } catch (IOException e) {
                LOG.error("Unable to delete Local file {}", localFile, e);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,29 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;
package org.opensearch.dataprepper.plugins.accumulator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/**
 * {@link BufferFactory} implementation that backs each {@link Buffer} with a
 * temporary file on local disk.
 */
public class LocalFileBufferFactory implements BufferFactory {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileBufferFactory.class);
    public static final String PREFIX = "local";
    public static final String SUFFIX = ".log";

    /**
     * Creates a new file-backed buffer.
     *
     * @return a {@link LocalFileBuffer} over a freshly created temp file, or
     *         {@code null} when the temp file could not be created (the
     *         failure is logged).
     */
    @Override
    public Buffer getBuffer() {
        try {
            final File bufferFile = File.createTempFile(PREFIX, SUFFIX);
            return new LocalFileBuffer(bufferFile);
        } catch (IOException e) {
            // NOTE(review): callers receive null here and must null-check —
            // consider propagating an unchecked exception instead; confirm
            // with call sites before changing the contract.
            LOG.error("Unable to create temp file ", e);
            return null;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.accumulator.Buffer;

/**
 * Utility for deciding whether a buffer has crossed any configured
 * flush threshold: event count, accumulated bytes, or collection duration.
 */
public class ThresholdCheck {

    private ThresholdCheck() {
    }

    /**
     * Check threshold exceeds.
     * Evaluation order (count, then duration, then size) is preserved and
     * short-circuits, since {@code getSize()} may flush underlying streams
     * in some {@link Buffer} implementations.
     *
     * @param currentBuffer current buffer.
     * @param maxEvents maximum event provided by user as threshold; a value
     *                  of zero or less disables the event-count check.
     * @param maxBytes maximum bytes provided by user as threshold.
     * @param maxCollectionDuration maximum event collection duration provided by user as threshold.
     * @return boolean value whether the threshold are met.
     */
    public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) {
        if (maxEvents <= 0) {
            // Event-count threshold disabled: only duration and size apply.
            return currentBuffer.getDuration() > maxCollectionDuration ||
                    currentBuffer.getSize() > maxBytes.getBytes();
        }
        // Would adding one more event exceed the count limit, or has a
        // time/size limit already been crossed?
        return currentBuffer.getEventCount() + 1 > maxEvents ||
                currentBuffer.getDuration() > maxCollectionDuration ||
                currentBuffer.getSize() > maxBytes.getBytes();
    }
}
1 change: 1 addition & 0 deletions data-prepper-plugins/http-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* AWS factory class required to create AWS Http Endpoint client for this plugin.
*/
public final class AwsServiceClientFactory {
private AwsServiceClientFactory() {}
public final class ClientFactory {
private ClientFactory() {}

}
Loading

0 comments on commit 71bfb78

Please sign in to comment.