
Commit

Version 1.7.0
refactor: Code cleanup and check AWS region if no endpoint is provided.
AstrorEnales committed Jun 25, 2018
1 parent fc38a0d commit d10ea8d
Showing 7 changed files with 82 additions and 105 deletions.
15 changes: 9 additions & 6 deletions src/main/java/de/unibi/cebitec/aws/s3/transfer/BiBiS3.java
@@ -225,22 +225,25 @@ public static void main(String[] args) {
dest = positionalArgs[1];
}

+String endpoint = null;
+// Override endpoint with CLI parameter if present.
+if (cl.hasOption("endpoint")) {
+endpoint = cl.getOptionValue("endpoint");
+}
+
String region = null;
// Override region with CLI parameter if present.
if (cl.hasOption("region")) {
region = cl.getOptionValue("region");
+if (endpoint == null && region != null && !regions.containsKey(region)) {
+throw new IllegalArgumentException("The region could not be found for AWS!");
+}
}
if (region == null) {
region = DEFAULT_REGION;
}
log.info("== Access key: {} Bucket region: {}", credentials == null ? "none" : credentials.getAWSAccessKeyId(), region);

-String endpoint = null;
-// Override endpoint with CLI parameter if present.
-if (cl.hasOption("endpoint")) {
-endpoint = cl.getOptionValue("endpoint");
-}
-
AmazonS3ClientBuilder builder = AmazonS3Client.builder();
builder = endpoint == null ?
builder.withRegion(region) :
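Note on the new region check: the `regions` map consulted by `regions.containsKey(region)` is defined outside this hunk, and the endpoint branch of the client builder is cut off below. A minimal sketch, assuming the AWS SDK v1 `Regions` enum, of how such a lookup and the endpoint/region branch could be wired up; the map construction and the `buildClient` wrapper are illustrative assumptions, not the project's code:

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class RegionCheckSketch {
    // Assumes region has already been defaulted (as the hunk above does with DEFAULT_REGION).
    public static AmazonS3 buildClient(String endpoint, String region) {
        // Hypothetical stand-in for the `regions` lookup: every region name the SDK knows.
        Map<String, Regions> regions = new HashMap<>();
        for (Regions r : Regions.values()) {
            regions.put(r.getName(), r);
        }
        // Only validate the region when no custom endpoint overrides it.
        if (endpoint == null && region != null && !regions.containsKey(region)) {
            throw new IllegalArgumentException("The region could not be found for AWS!");
        }
        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
        // A custom endpoint plus signing region is the usual way to target S3-compatible services,
        // which matches the commit message: the region is only checked when no endpoint is given.
        return endpoint == null
                ? builder.withRegion(region).build()
                : builder.withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration(endpoint, region)).build();
    }
}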
40 changes: 21 additions & 19 deletions src/main/java/de/unibi/cebitec/aws/s3/transfer/ctrl/Uploader.java
@@ -43,7 +43,7 @@ public class Uploader {
private final boolean reducedRedundancy;

public Uploader(AmazonS3 s3, InputFileList<Path> inputFiles, String bucketName,
-OutputFileList uploadTargetKeys, int numberOfThreads, long chunkSize,
+OutputFileList<Path, String> uploadTargetKeys, int numberOfThreads, long chunkSize,
ObjectMetadata metadata, boolean reducedRedundancy) {
this.inputFiles = inputFiles;
this.outputFiles = uploadTargetKeys;
@@ -68,21 +68,21 @@ public void upload() throws Exception {
}
}

log.debug("file list size: {}", this.files.size());
log.debug("file list size: {}", files.size());

//fill chunk list
-for (UploadFile f : this.files) {
+for (UploadFile f : files) {
if (f instanceof SingleUploadFile) {
-this.chunks.add((IUploadChunk) f);
+chunks.add((IUploadChunk) f);
} else if (f instanceof MultipartUploadFile) {
while (((MultipartUploadFile) f).hasMoreParts()) {
-this.chunks.add(((MultipartUploadFile) f).next());
+chunks.add(((MultipartUploadFile) f).next());
}
}
}

-Measurements.setOverallChunks(this.chunks.size());
-log.info("== Uploading {} of data split into {} chunks...", Measurements.getOverallBytesFormatted(), this.chunks.size());
+Measurements.setOverallChunks(chunks.size());
+log.info("== Uploading {} of data split into {} chunks...", Measurements.getOverallBytesFormatted(), chunks.size());

Measurements.start();

@@ -96,10 +96,10 @@ public void run() {
timer.schedule(measurementsUpdates, 3000, 15000);

//upload all chunks/single files
-ExecutorService threading = Executors.newFixedThreadPool(this.numberOfThreads);
+ExecutorService threading = Executors.newFixedThreadPool(numberOfThreads);
List<Future<?>> futures = new ArrayList<>();
-for (IUploadChunk chunk : this.chunks) {
-futures.add(threading.submit(new TransferUploadThread(this.s3, this.bucketName, chunk, 6)));
+for (IUploadChunk chunk : chunks) {
+futures.add(threading.submit(new TransferUploadThread(s3, bucketName, chunk, 6)));
}

//wait for threads to finish
@@ -113,9 +113,9 @@ public void run() {
threading.shutdown();

//complete multipart uploads
-for (UploadFile f : this.files) {
+for (UploadFile f : files) {
if (f instanceof MultipartUploadFile) {
-((MultipartUploadFile) f).complete(this.s3, this.bucketName);
+((MultipartUploadFile) f).complete(s3, bucketName);
}
}

@@ -124,17 +124,19 @@
}

private void addMultipartFile(Path file, String key, ObjectMetadata metadata, boolean reducedRedundancy) {
-MultipartUploadFile mFile = new MultipartUploadFile(file, key, this.chunkSize);
-this.files.add(mFile);
-InitiateMultipartUploadRequest mReq = new InitiateMultipartUploadRequest(this.bucketName, mFile.getKey(), metadata);
+MultipartUploadFile mFile = new MultipartUploadFile(file, key, chunkSize);
+files.add(mFile);
+InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, mFile.getKey(), metadata);
if (reducedRedundancy) {
-mReq.setStorageClass(StorageClass.ReducedRedundancy);
+request.setStorageClass(StorageClass.ReducedRedundancy);
}
-InitiateMultipartUploadResult mRes = this.s3.initiateMultipartUpload(mReq);
-mFile.setUploadId(mRes.getUploadId());
+InitiateMultipartUploadResult result = s3.initiateMultipartUpload(request);
+mFile.setUploadId(result.getUploadId());
+log.debug("Add multipart file {} with upload id {}", key, result.getUploadId());
}

private void addSingleFile(Path file, String key, ObjectMetadata metadata, boolean reducedRedundancy) {
-this.files.add(new SingleUploadFile(file, key, metadata, reducedRedundancy));
+files.add(new SingleUploadFile(file, key, metadata, reducedRedundancy));
+log.debug("Add single file {}", key);
}
}
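The "//wait for threads to finish" block is collapsed in this diff. Below is a self-contained sketch of the same submit-and-wait pattern, with a println standing in for TransferUploadThread.call(); everything in it is illustrative, not the project's verbatim code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitAndWaitSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService threading = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int chunkNumber = i;
            // Stand-in for new TransferUploadThread(s3, bucketName, chunk, 6).
            futures.add(threading.submit((Callable<Void>) () -> {
                System.out.println("Uploading chunk " + chunkNumber);
                return null;
            }));
        }
        // "Wait for threads to finish": get() blocks and rethrows a failed task's exception.
        for (Future<?> future : futures) {
            future.get();
        }
        threading.shutdown();
    }
}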
Measurements.java
@@ -2,17 +2,13 @@

import java.text.DecimalFormat;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Measurements {
+public final class Measurements {
private static long overallBytes;
private static long overallChunks;
private static long finishedChunks;
private static long start;
private static long end;
private static boolean started = false;
-private static final Logger log = LoggerFactory.getLogger(Measurements.class);

private Measurements() {
}
@@ -45,19 +41,19 @@ public static void stop() {
private static String formatResult(long bytes, String suffix) {
DecimalFormat f = new DecimalFormat("#0.00");
if (bytes > 1e9) {
-return new StringBuilder().append(f.format(bytes / 1e9)).append("GB").append(suffix).toString();
+return f.format(bytes / 1e9) + "GB" + suffix;
}
if (bytes > 1e6) {
-return new StringBuilder().append(f.format(bytes / 1e6)).append("MB").append(suffix).toString();
+return f.format(bytes / 1e6) + "MB" + suffix;
}
if (bytes > 1e3) {
-return new StringBuilder().append(f.format(bytes / 1e3)).append("KB").append(suffix).toString();
+return f.format(bytes / 1e3) + "KB" + suffix;
}
-return new StringBuilder().append(bytes).append("B").append(suffix).toString();
+return String.valueOf(bytes) + "B" + suffix;
}

public static String getChunksFinishedCount() {
-return new StringBuilder().append(finishedChunks).append(" / ").append(overallChunks).toString();
+return finishedChunks + " / " + overallChunks;
}

public static String getEndResult() {
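For reference, the refactored formatResult() produces the same strings as the old StringBuilder version. A small runnable mirror follows; the main method and sample values are added for illustration, and the decimal separator depends on the default locale:

import java.text.DecimalFormat;

public class FormatResultSketch {
    // Mirrors the refactored Measurements.formatResult() above.
    static String formatResult(long bytes, String suffix) {
        DecimalFormat f = new DecimalFormat("#0.00");
        if (bytes > 1e9) {
            return f.format(bytes / 1e9) + "GB" + suffix;
        }
        if (bytes > 1e6) {
            return f.format(bytes / 1e6) + "MB" + suffix;
        }
        if (bytes > 1e3) {
            return f.format(bytes / 1e3) + "KB" + suffix;
        }
        return String.valueOf(bytes) + "B" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(formatResult(1_500_000L, "/s")); // "1.50MB/s" in an English locale
        System.out.println(formatResult(512L, ""));         // "512B"
    }
}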
MultipartUploadFile.java
@@ -7,11 +7,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,25 +20,15 @@ public class MultipartUploadFile extends UploadFile {

public MultipartUploadFile(Path file, String key, final long initialPartSize) {
super(file, key);
-this.remainingParts = new PriorityQueue<>(11, (p1, p2) -> {
-if (p1.getPartNumber() == p2.getPartNumber()) {
-return 0;
-}
-return p1.getPartNumber() < p2.getPartNumber() ? -1 : 1;
-});
-this.registeredParts = new ArrayList<>();
+remainingParts = new PriorityQueue<>(11, Comparator.comparingInt(UploadPart::getPartNumber));
+registeredParts = new ArrayList<>();

try {
long fileSize = Files.size(this.file);
-long partSize;
long pos = 0;
for (int i = 1; pos < fileSize; i++) {
-partSize = Math.min(initialPartSize, (fileSize - pos));
-UploadPart p = new UploadPart(this);
-p.setPartNumber(i);
-p.setPartSize(partSize);
-p.setFileOffset(pos);
-this.remainingParts.add(p);
+long partSize = Math.min(initialPartSize, (fileSize - pos));
+remainingParts.add(new UploadPart(this, i, partSize, pos));
pos += partSize;
}
} catch (IOException e) {
@@ -52,26 +38,25 @@ public MultipartUploadFile(Path file, String key, final long initialPartSize) {

public void complete(AmazonS3 s3, String bucketName) {
List<PartETag> tags = new ArrayList<>();
-for (UploadPart p : this.registeredParts) {
+for (UploadPart p : registeredParts) {
tags.add(p.getTag());
}
-CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
-bucketName, this.key, this.uploadId, tags);
+CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key, uploadId, tags);
s3.completeMultipartUpload(compRequest);
log.debug("Completed multipart upload of file: {}", this.key);
log.debug("Completed multipart upload of file: {}", key);
}

public boolean hasMoreParts() {
-return !this.remainingParts.isEmpty();
+return !remainingParts.isEmpty();
}

private void addPart(UploadPart part) {
-this.remainingParts.offer(part);
+remainingParts.offer(part);
}

public UploadPart next() {
-UploadPart currentPart = this.remainingParts.remove();
-this.registeredParts.add(currentPart);
+UploadPart currentPart = remainingParts.remove();
+registeredParts.add(currentPart);
return currentPart;
}

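The replaced hand-written comparator and Comparator.comparingInt(UploadPart::getPartNumber) order parts identically, by ascending part number. A tiny standalone demonstration with a stand-in Part class:

import java.util.Comparator;
import java.util.PriorityQueue;

public class PartOrderingSketch {
    static final class Part {
        private final int partNumber;
        Part(int partNumber) { this.partNumber = partNumber; }
        int getPartNumber() { return partNumber; }
    }

    public static void main(String[] args) {
        // Same ascending order the removed lambda produced, with less code.
        PriorityQueue<Part> parts =
                new PriorityQueue<>(11, Comparator.comparingInt(Part::getPartNumber));
        parts.add(new Part(3));
        parts.add(new Part(1));
        parts.add(new Part(2));
        while (!parts.isEmpty()) {
            System.out.println(parts.remove().getPartNumber()); // prints 1, 2, 3
        }
    }
}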
SingleUploadFile.java
@@ -28,16 +28,16 @@ public SingleUploadFile(Path file, String key, ObjectMetadata metadata, boolean
@Override
public void upload(AmazonS3 s3, String bucketName) throws IOException {
try {
-PutObjectRequest req = new PutObjectRequest(bucketName, this.key, Files.newInputStream(this.file), this.metadata);
-if (this.reducedRedundancy) {
-req.setStorageClass(StorageClass.ReducedRedundancy);
+PutObjectRequest request = new PutObjectRequest(bucketName, key, Files.newInputStream(file), metadata);
+if (reducedRedundancy) {
+request.setStorageClass(StorageClass.ReducedRedundancy);
}
log.debug("Starting upload of single file: {}", this.key);
s3.putObject(req);
log.debug("Starting upload of single file: {}", key);
s3.putObject(request);
Measurements.countChunkAsFinished();
log.debug("Upload done: Single file: {}", this.key);
log.debug("Upload done: Single file: {}", key);
} catch (IOException | AmazonClientException e) {
log.debug("Failed to upload single file: {} - Reason: {}", this.key, e.toString());
log.debug("Failed to upload single file: {} - Reason: {}", key, e.toString());
throw e;
}
}
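Aside on uploading from a stream: if the ObjectMetadata handed to this PutObjectRequest does not already carry a content length, SDK v1 buffers the whole stream in memory and logs a warning. A hedged sketch of a variant that sets the length and closes the stream; this is illustrative only and not part of the commit:

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.StorageClass;

public class SingleUploadSketch {
    public static void upload(AmazonS3 s3, String bucketName, String key, Path file,
                              boolean reducedRedundancy) throws IOException {
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(Files.size(file)); // lets the SDK stream instead of buffering
        try (InputStream in = Files.newInputStream(file)) {
            PutObjectRequest request = new PutObjectRequest(bucketName, key, in, metadata);
            if (reducedRedundancy) {
                request.setStorageClass(StorageClass.ReducedRedundancy);
            }
            s3.putObject(request);
        }
    }
}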
TransferUploadThread.java
@@ -1,7 +1,6 @@
package de.unibi.cebitec.aws.s3.transfer.model.up;

import com.amazonaws.services.s3.AmazonS3;
-import de.unibi.cebitec.aws.s3.transfer.model.up.IUploadChunk;

import java.util.concurrent.Callable;

@@ -10,10 +9,10 @@

public class TransferUploadThread implements Callable<Void> {
public static final Logger log = LoggerFactory.getLogger(TransferUploadThread.class);
-private AmazonS3 s3;
-private String bucketName;
-private IUploadChunk chunk;
-private int retryCount;
+private final AmazonS3 s3;
+private final String bucketName;
+private final IUploadChunk chunk;
+private final int retryCount;

public TransferUploadThread(AmazonS3 s3, String bucketName, IUploadChunk chunk, int retryCount) {
this.s3 = s3;
@@ -24,14 +23,14 @@ public TransferUploadThread(AmazonS3 s3, String bucketName, IUploadChunk chunk,

@Override
public Void call() throws Exception {
-for (int i = 0; i < this.retryCount; i++) {
+for (int i = 0; i < retryCount; i++) {
try {
-this.chunk.upload(this.s3, this.bucketName);
+chunk.upload(s3, bucketName);
break;
} catch (Exception e) {
log.warn("Chunk upload failed! Retrying.... ({})", e.toString());
log.warn("Chunk upload failed! Retrying... ({})", e.toString());
}
-if (i == this.retryCount - 1) {
+if (i == retryCount - 1) {
log.error("Chunk upload failed after {} retries. Exiting...", i);
System.exit(1);
}
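The retry loop above breaks out on the first successful upload and calls System.exit(1) after the last failed attempt. A standalone sketch of the same pattern that rethrows instead of exiting the JVM; the attemptUpload() placeholder and the exception type are assumptions for illustration:

import java.util.concurrent.Callable;

public class RetrySketch implements Callable<Void> {
    // Same retry budget the Uploader passes to TransferUploadThread.
    private final int retryCount = 6;

    @Override
    public Void call() throws Exception {
        for (int i = 0; i < retryCount; i++) {
            try {
                attemptUpload(); // stand-in for chunk.upload(s3, bucketName)
                break;
            } catch (Exception e) {
                System.err.println("Chunk upload failed! Retrying... (" + e + ")");
            }
            if (i == retryCount - 1) {
                // The real class calls System.exit(1) here; rethrowing keeps the JVM alive.
                throw new IllegalStateException("Chunk upload failed after " + retryCount + " attempts");
            }
        }
        return null;
    }

    private void attemptUpload() throws Exception {
        // Hypothetical placeholder for the actual S3 call.
    }
}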
UploadPart.java
@@ -10,32 +10,36 @@

public class UploadPart implements IUploadChunk {
public static final Logger log = LoggerFactory.getLogger(UploadPart.class);
-private MultipartUploadFile multipartUploadFile;
-private int partNumber;
-private long fileOffset;
-private long partSize;
+private final MultipartUploadFile multipartUploadFile;
+private final int partNumber;
+private final long fileOffset;
+private final long partSize;
private PartETag tag;

-public UploadPart(MultipartUploadFile multipartUploadFile) {
+public UploadPart(MultipartUploadFile multipartUploadFile, int partNumber, long partSize, long fileOffset) {
this.multipartUploadFile = multipartUploadFile;
+this.partNumber = partNumber;
+this.partSize = partSize;
+this.fileOffset = fileOffset;
}

@Override
public void upload(AmazonS3 s3, String bucketName) {
UploadPartRequest uploadRequest = new UploadPartRequest()
-.withBucketName(bucketName).withKey(this.multipartUploadFile.getKey())
-.withUploadId(this.multipartUploadFile.getUploadId())
-.withPartNumber(this.partNumber)
-.withFileOffset(this.fileOffset)
-.withFile(this.multipartUploadFile.file.toFile())
-.withPartSize(this.partSize);
+.withBucketName(bucketName)
+.withKey(multipartUploadFile.getKey())
+.withUploadId(multipartUploadFile.getUploadId())
+.withPartNumber(partNumber)
+.withFileOffset(fileOffset)
+.withFile(multipartUploadFile.file.toFile())
+.withPartSize(partSize);
try {
log.debug("Starting upload of part {} of file: {}", this.partNumber, this.multipartUploadFile.getKey());
this.tag = s3.uploadPart(uploadRequest).getPartETag();
log.debug("Starting upload of part {} of file: {}", partNumber, multipartUploadFile.getKey());
tag = s3.uploadPart(uploadRequest).getPartETag();
Measurements.countChunkAsFinished();
log.debug("Upload done: Part {} of file: {}", this.partNumber, this.multipartUploadFile.getKey());
log.debug("Upload done: Part {} of file: {}", partNumber, multipartUploadFile.getKey());
} catch (AmazonClientException e) {
log.debug("Failed to upload part {} of file: {} - Reason: {}", this.partNumber, this.multipartUploadFile.getKey(), e.toString());
log.debug("Failed to upload part {} of file: {} - Reason: {}", partNumber, multipartUploadFile.getKey(), e.toString());
throw e;
}
}
@@ -44,26 +48,14 @@ public int getPartNumber() {
return partNumber;
}

-public void setPartNumber(int partNumber) {
-this.partNumber = partNumber;
-}
-
public long getFileOffset() {
return fileOffset;
}

-public void setFileOffset(long fileOffset) {
-this.fileOffset = fileOffset;
-}
-
public long getPartSize() {
return partSize;
}

-public void setPartSize(long partSize) {
-this.partSize = partSize;
-}
-
public PartETag getTag() {
return tag;
}
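Taken together, Uploader.addMultipartFile(), UploadPart.upload() and MultipartUploadFile.complete() walk through the standard SDK v1 multipart lifecycle: initiate, upload parts while collecting PartETags, then complete. Below is a condensed, single-method sketch of that lifecycle; the bucket, key, file name and part size are made-up values and the sketch is not the project's code:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class MultipartLifecycleSketch {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion("eu-central-1").build();
        File file = new File("example.bin");   // hypothetical input file
        String bucket = "example-bucket";      // hypothetical bucket
        String key = "example-key";
        long partSize = 5L * 1024 * 1024;      // 5 MB, the S3 minimum for all but the last part

        // 1. Initiate - corresponds to Uploader.addMultipartFile().
        String uploadId = s3.initiateMultipartUpload(
                new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

        // 2. Upload the parts - corresponds to UploadPart.upload(); collect one PartETag each.
        List<PartETag> tags = new ArrayList<>();
        long pos = 0;
        for (int partNumber = 1; pos < file.length(); partNumber++) {
            long size = Math.min(partSize, file.length() - pos);
            UploadPartRequest request = new UploadPartRequest()
                    .withBucketName(bucket)
                    .withKey(key)
                    .withUploadId(uploadId)
                    .withPartNumber(partNumber)
                    .withFileOffset(pos)
                    .withFile(file)
                    .withPartSize(size);
            tags.add(s3.uploadPart(request).getPartETag());
            pos += size;
        }

        // 3. Complete - corresponds to MultipartUploadFile.complete().
        s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, uploadId, tags));
    }
}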
