Skip to content

Commit

Permalink
Merge pull request DSpace#9477 from 4Science/lazy_s3input
Browse files Browse the repository at this point in the history
Improve S3 Bitstream Storage to lazily download objects from S3
  • Loading branch information
tdonohue authored May 16, 2024
2 parents 50aaec1 + 76f04f1 commit a703bea
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static java.lang.String.valueOf;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -102,6 +103,11 @@ public class S3BitStoreService extends BaseBitStoreService {
private String awsRegionName;
private boolean useRelativePath;

/**
* The maximum size of individual chunk to download from S3 when a file is accessed. Default 5Mb
*/
private long bufferSize = 5 * 1024 * 1024;

/**
* container for all the assets
*/
Expand Down Expand Up @@ -258,20 +264,7 @@ public InputStream get(Bitstream bitstream) throws IOException {
if (isRegisteredBitstream(key)) {
key = key.substring(REGISTERED_FLAG.length());
}
try {
File tempFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp");
tempFile.deleteOnExit();

GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, key);

Download download = tm.download(getObjectRequest, tempFile);
download.waitForCompletion();

return new DeleteOnCloseFileInputStream(tempFile);
} catch (AmazonClientException | InterruptedException e) {
log.error("get(" + key + ")", e);
throw new IOException(e);
}
return new S3LazyInputStream(key, bufferSize, bitstream.getSizeBytes());
}

/**
Expand Down Expand Up @@ -622,4 +615,84 @@ public boolean isRegisteredBitstream(String internalId) {
return internalId.startsWith(REGISTERED_FLAG);
}

/**
 * Sets the maximum size (in bytes) of each chunk downloaded from S3 when a
 * bitstream is read lazily. Defaults to 5 MiB if never set.
 *
 * @param bufferSize maximum chunk size in bytes
 */
public void setBufferSize(long bufferSize) {
this.bufferSize = bufferSize;
}

/**
 * An InputStream that lazily streams an S3 object in fixed-size chunks, each
 * buffered in a temporary file. When the current chunk has been fully read it
 * is closed (deleting its temp file) and the next chunk is downloaded on
 * demand. The last chunk is released either when the final byte is read or
 * when this stream is closed, whichever happens first.
 */
public class S3LazyInputStream extends InputStream {
    // stream over the temp file backing the chunk currently being consumed;
    // null only when the object is empty and no chunk was ever downloaded
    private InputStream currentChunkStream;
    // key of the object within the enclosing service's bucket
    private String objectKey;
    // absolute offset (exclusive) of the last byte covered by the current chunk
    private long endOfChunk = -1;
    // maximum number of bytes fetched per chunk
    private long chunkMaxSize;
    // absolute read position within the S3 object
    private long currPos = 0;
    // total size of the S3 object in bytes
    private long fileSize;

    /**
     * Creates the stream and eagerly downloads the first chunk (unless the
     * object is empty).
     *
     * @param objectKey    key of the object in the configured bucket
     * @param chunkMaxSize maximum chunk size in bytes
     * @param fileSize     total size of the object in bytes
     * @throws IOException if the first chunk cannot be downloaded
     */
    public S3LazyInputStream(String objectKey, long chunkMaxSize, long fileSize) throws IOException {
        this.objectKey = objectKey;
        this.chunkMaxSize = chunkMaxSize;
        this.endOfChunk = 0;
        this.fileSize = fileSize;
        // An empty object has no bytes to fetch; requesting the byte range
        // (0, -1) from S3 would be invalid, so skip the initial download.
        if (fileSize > 0) {
            downloadChunk();
        }
    }

    @Override
    public int read() throws IOException {
        // End of stream: every byte consumed, or the object was empty and no
        // chunk was ever downloaded.
        if (currPos >= fileSize) {
            if (currentChunkStream != null) {
                currentChunkStream.close();
            }
            return -1;
        }
        // is the current chunk completely read and more data available?
        if (currPos == endOfChunk) {
            currentChunkStream.close();
            downloadChunk();
        }

        int byteRead = currPos < endOfChunk ? currentChunkStream.read() : -1;
        // do we get any data or are we at the end of the file?
        if (byteRead != -1) {
            currPos++;
        } else {
            currentChunkStream.close();
        }
        return byteRead;
    }

    /**
     * Downloads the next chunk from S3 into a fresh temp file, starting at the
     * current read position; {@code endOfChunk} is advanced by the number of
     * bytes actually transferred.
     *
     * @throws IOException if the download fails or is interrupted
     */
    private void downloadChunk() throws IOException, FileNotFoundException {
        // Request only the byte range of this chunk (both bounds inclusive)
        long startByte = currPos;
        long endByte = Long.min(startByte + chunkMaxSize - 1, fileSize - 1);
        GetObjectRequest getRequest = new GetObjectRequest(bucketName, objectKey)
            .withRange(startByte, endByte);

        File currentChunkFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp");
        currentChunkFile.deleteOnExit();
        try {
            Download download = tm.download(getRequest, currentChunkFile);
            download.waitForCompletion();
            // temp file is removed automatically when this chunk is closed
            currentChunkStream = new DeleteOnCloseFileInputStream(currentChunkFile);
            endOfChunk = endOfChunk + download.getProgress().getBytesTransferred();
        } catch (AmazonClientException | InterruptedException e) {
            // restore the interrupt flag so callers can observe the interruption
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // discard the partial file and surface the failure as an IOException
            currentChunkFile.delete();
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        if (currentChunkStream != null) {
            currentChunkStream.close();
        }
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void setup() throws Exception {
s3BitStoreService = new S3BitStoreService(amazonS3Client);
s3BitStoreService.setEnabled(BooleanUtils.toBoolean(
configurationService.getProperty("assetstore.s3.enabled")));
s3BitStoreService.setBufferSize(22);
context.turnOffAuthorisationSystem();

parentCommunity = CommunityBuilder.createCommunity(context)
Expand Down Expand Up @@ -129,12 +130,25 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException
assertThat(amazonS3Client.listBuckets(), contains(bucketNamed(bucketName)));

context.turnOffAuthorisationSystem();
String content = "Test bitstream content";
String content = "Test bitstream content";
String contentOverOneSpan = "This content span two chunks";
String contentExactlyTwoSpans = "Test bitstream contentTest bitstream content";
String contentOverOneTwoSpans = "Test bitstream contentThis content span three chunks";
Bitstream bitstream = createBitstream(content);
Bitstream bitstreamOverOneSpan = createBitstream(contentOverOneSpan);
Bitstream bitstreamExactlyTwoSpans = createBitstream(contentExactlyTwoSpans);
Bitstream bitstreamOverOneTwoSpans = createBitstream(contentOverOneTwoSpans);
context.restoreAuthSystemState();

s3BitStoreService.put(bitstream, toInputStream(content));
checkGetPut(bucketName, content, bitstream);
checkGetPut(bucketName, contentOverOneSpan, bitstreamOverOneSpan);
checkGetPut(bucketName, contentExactlyTwoSpans, bitstreamExactlyTwoSpans);
checkGetPut(bucketName, contentOverOneTwoSpans, bitstreamOverOneTwoSpans);

}

private void checkGetPut(String bucketName, String content, Bitstream bitstream) throws IOException {
s3BitStoreService.put(bitstream, toInputStream(content));
String expectedChecksum = Utils.toHex(generateChecksum(content));

assertThat(bitstream.getSizeBytes(), is((long) content.length()));
Expand All @@ -147,7 +161,6 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException
String key = s3BitStoreService.getFullKey(bitstream.getInternalId());
ObjectMetadata objectMetadata = amazonS3Client.getObjectMetadata(bucketName, key);
assertThat(objectMetadata.getContentMD5(), is(expectedChecksum));

}

@Test
Expand Down

0 comments on commit a703bea

Please sign in to comment.