From d10ea8d6f95bd4631cb2c3f59a5e6e0dfc9e312a Mon Sep 17 00:00:00 2001 From: Marcel Friedrichs Date: Mon, 25 Jun 2018 12:46:15 +0200 Subject: [PATCH] Version 1.7.0 refactor: Code cleanup and check AWS region if no endpoint is provided. --- .../unibi/cebitec/aws/s3/transfer/BiBiS3.java | 15 +++--- .../aws/s3/transfer/ctrl/Uploader.java | 40 ++++++++-------- .../aws/s3/transfer/model/Measurements.java | 16 +++---- .../model/up/MultipartUploadFile.java | 39 +++++----------- .../transfer/model/up/SingleUploadFile.java | 14 +++--- .../model/up/TransferUploadThread.java | 17 ++++--- .../aws/s3/transfer/model/up/UploadPart.java | 46 ++++++++----------- 7 files changed, 82 insertions(+), 105 deletions(-) diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/BiBiS3.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/BiBiS3.java index dc047e0..3eb0295 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/BiBiS3.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/BiBiS3.java @@ -225,22 +225,25 @@ public static void main(String[] args) { dest = positionalArgs[1]; } + String endpoint = null; + // Override endpoint with CLI parameter if present. + if (cl.hasOption("endpoint")) { + endpoint = cl.getOptionValue("endpoint"); + } + String region = null; // Override region with CLI parameter if present. if (cl.hasOption("region")) { region = cl.getOptionValue("region"); + if (endpoint == null && region != null && !regions.containsKey(region)) { + throw new IllegalArgumentException("The region could not be found for AWS!"); + } } if (region == null) { region = DEFAULT_REGION; } log.info("== Access key: {} Bucket region: {}", credentials == null ? "none" : credentials.getAWSAccessKeyId(), region); - String endpoint = null; - // Override endpoint with CLI parameter if present. - if (cl.hasOption("endpoint")) { - endpoint = cl.getOptionValue("endpoint"); - } - AmazonS3ClientBuilder builder = AmazonS3Client.builder(); builder = endpoint == null ? builder.withRegion(region) : diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/ctrl/Uploader.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/ctrl/Uploader.java index a3f2f45..3ba26c6 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/ctrl/Uploader.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/ctrl/Uploader.java @@ -43,7 +43,7 @@ public class Uploader { private final boolean reducedRedundancy; public Uploader(AmazonS3 s3, InputFileList inputFiles, String bucketName, - OutputFileList uploadTargetKeys, int numberOfThreads, long chunkSize, + OutputFileList uploadTargetKeys, int numberOfThreads, long chunkSize, ObjectMetadata metadata, boolean reducedRedundancy) { this.inputFiles = inputFiles; this.outputFiles = uploadTargetKeys; @@ -68,21 +68,21 @@ public void upload() throws Exception { } } - log.debug("file list size: {}", this.files.size()); + log.debug("file list size: {}", files.size()); //fill chunk list - for (UploadFile f : this.files) { + for (UploadFile f : files) { if (f instanceof SingleUploadFile) { - this.chunks.add((IUploadChunk) f); + chunks.add((IUploadChunk) f); } else if (f instanceof MultipartUploadFile) { while (((MultipartUploadFile) f).hasMoreParts()) { - this.chunks.add(((MultipartUploadFile) f).next()); + chunks.add(((MultipartUploadFile) f).next()); } } } - Measurements.setOverallChunks(this.chunks.size()); - log.info("== Uploading {} of data split into {} chunks...", Measurements.getOverallBytesFormatted(), this.chunks.size()); + Measurements.setOverallChunks(chunks.size()); + log.info("== Uploading {} of data split into {} chunks...", Measurements.getOverallBytesFormatted(), chunks.size()); Measurements.start(); @@ -96,10 +96,10 @@ public void run() { timer.schedule(measurementsUpdates, 3000, 15000); //upload all chunks/single files - ExecutorService threading = Executors.newFixedThreadPool(this.numberOfThreads); + ExecutorService threading = Executors.newFixedThreadPool(numberOfThreads); List> futures = new ArrayList<>(); - for (IUploadChunk chunk : this.chunks) { - futures.add(threading.submit(new TransferUploadThread(this.s3, this.bucketName, chunk, 6))); + for (IUploadChunk chunk : chunks) { + futures.add(threading.submit(new TransferUploadThread(s3, bucketName, chunk, 6))); } //wait for threads to finish @@ -113,9 +113,9 @@ public void run() { threading.shutdown(); //complete multipart uploads - for (UploadFile f : this.files) { + for (UploadFile f : files) { if (f instanceof MultipartUploadFile) { - ((MultipartUploadFile) f).complete(this.s3, this.bucketName); + ((MultipartUploadFile) f).complete(s3, bucketName); } } @@ -124,17 +124,19 @@ public void run() { } private void addMultipartFile(Path file, String key, ObjectMetadata metadata, boolean reducedRedundancy) { - MultipartUploadFile mFile = new MultipartUploadFile(file, key, this.chunkSize); - this.files.add(mFile); - InitiateMultipartUploadRequest mReq = new InitiateMultipartUploadRequest(this.bucketName, mFile.getKey(), metadata); + MultipartUploadFile mFile = new MultipartUploadFile(file, key, chunkSize); + files.add(mFile); + InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, mFile.getKey(), metadata); if (reducedRedundancy) { - mReq.setStorageClass(StorageClass.ReducedRedundancy); + request.setStorageClass(StorageClass.ReducedRedundancy); } - InitiateMultipartUploadResult mRes = this.s3.initiateMultipartUpload(mReq); - mFile.setUploadId(mRes.getUploadId()); + InitiateMultipartUploadResult result = s3.initiateMultipartUpload(request); + mFile.setUploadId(result.getUploadId()); + log.debug("Add multipart file {} with upload id {}", key, result.getUploadId()); } private void addSingleFile(Path file, String key, ObjectMetadata metadata, boolean reducedRedundancy) { - this.files.add(new SingleUploadFile(file, key, metadata, reducedRedundancy)); + files.add(new SingleUploadFile(file, key, metadata, reducedRedundancy)); + log.debug("Add single file {}", key); } } diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/Measurements.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/Measurements.java index f367faf..d511ca9 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/Measurements.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/Measurements.java @@ -2,17 +2,13 @@ import java.text.DecimalFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Measurements { +public final class Measurements { private static long overallBytes; private static long overallChunks; private static long finishedChunks; private static long start; private static long end; private static boolean started = false; - private static final Logger log = LoggerFactory.getLogger(Measurements.class); private Measurements() { } @@ -45,19 +41,19 @@ public static void stop() { private static String formatResult(long bytes, String suffix) { DecimalFormat f = new DecimalFormat("#0.00"); if (bytes > 1e9) { - return new StringBuilder().append(f.format(bytes / 1e9)).append("GB").append(suffix).toString(); + return f.format(bytes / 1e9) + "GB" + suffix; } if (bytes > 1e6) { - return new StringBuilder().append(f.format(bytes / 1e6)).append("MB").append(suffix).toString(); + return f.format(bytes / 1e6) + "MB" + suffix; } if (bytes > 1e3) { - return new StringBuilder().append(f.format(bytes / 1e3)).append("KB").append(suffix).toString(); + return f.format(bytes / 1e3) + "KB" + suffix; } - return new StringBuilder().append(bytes).append("B").append(suffix).toString(); + return String.valueOf(bytes) + "B" + suffix; } public static String getChunksFinishedCount() { - return new StringBuilder().append(finishedChunks).append(" / ").append(overallChunks).toString(); + return finishedChunks + " / " + overallChunks; } public static String getEndResult() { diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/MultipartUploadFile.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/MultipartUploadFile.java index a36aa52..35c93bf 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/MultipartUploadFile.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/MultipartUploadFile.java @@ -7,11 +7,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,25 +20,15 @@ public class MultipartUploadFile extends UploadFile { public MultipartUploadFile(Path file, String key, final long initialPartSize) { super(file, key); - this.remainingParts = new PriorityQueue<>(11, (p1, p2) -> { - if (p1.getPartNumber() == p2.getPartNumber()) { - return 0; - } - return p1.getPartNumber() < p2.getPartNumber() ? -1 : 1; - }); - this.registeredParts = new ArrayList<>(); + remainingParts = new PriorityQueue<>(11, Comparator.comparingInt(UploadPart::getPartNumber)); + registeredParts = new ArrayList<>(); try { long fileSize = Files.size(this.file); - long partSize; long pos = 0; for (int i = 1; pos < fileSize; i++) { - partSize = Math.min(initialPartSize, (fileSize - pos)); - UploadPart p = new UploadPart(this); - p.setPartNumber(i); - p.setPartSize(partSize); - p.setFileOffset(pos); - this.remainingParts.add(p); + long partSize = Math.min(initialPartSize, (fileSize - pos)); + remainingParts.add(new UploadPart(this, i, partSize, pos)); pos += partSize; } } catch (IOException e) { @@ -52,26 +38,25 @@ public MultipartUploadFile(Path file, String key, final long initialPartSize) { public void complete(AmazonS3 s3, String bucketName) { List tags = new ArrayList<>(); - for (UploadPart p : this.registeredParts) { + for (UploadPart p : registeredParts) { tags.add(p.getTag()); } - CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest( - bucketName, this.key, this.uploadId, tags); + CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, key, uploadId, tags); s3.completeMultipartUpload(compRequest); - log.debug("Completed multipart upload of file: {}", this.key); + log.debug("Completed multipart upload of file: {}", key); } public boolean hasMoreParts() { - return !this.remainingParts.isEmpty(); + return !remainingParts.isEmpty(); } private void addPart(UploadPart part) { - this.remainingParts.offer(part); + remainingParts.offer(part); } public UploadPart next() { - UploadPart currentPart = this.remainingParts.remove(); - this.registeredParts.add(currentPart); + UploadPart currentPart = remainingParts.remove(); + registeredParts.add(currentPart); return currentPart; } diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/SingleUploadFile.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/SingleUploadFile.java index 710e3ab..464fca0 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/SingleUploadFile.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/SingleUploadFile.java @@ -28,16 +28,16 @@ public SingleUploadFile(Path file, String key, ObjectMetadata metadata, boolean @Override public void upload(AmazonS3 s3, String bucketName) throws IOException { try { - PutObjectRequest req = new PutObjectRequest(bucketName, this.key, Files.newInputStream(this.file), this.metadata); - if (this.reducedRedundancy) { - req.setStorageClass(StorageClass.ReducedRedundancy); + PutObjectRequest request = new PutObjectRequest(bucketName, key, Files.newInputStream(file), metadata); + if (reducedRedundancy) { + request.setStorageClass(StorageClass.ReducedRedundancy); } - log.debug("Starting upload of single file: {}", this.key); - s3.putObject(req); + log.debug("Starting upload of single file: {}", key); + s3.putObject(request); Measurements.countChunkAsFinished(); - log.debug("Upload done: Single file: {}", this.key); + log.debug("Upload done: Single file: {}", key); } catch (IOException | AmazonClientException e) { - log.debug("Failed to upload single file: {} - Reason: {}", this.key, e.toString()); + log.debug("Failed to upload single file: {} - Reason: {}", key, e.toString()); throw e; } } diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/TransferUploadThread.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/TransferUploadThread.java index 05bc3c9..b9d7007 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/TransferUploadThread.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/TransferUploadThread.java @@ -1,7 +1,6 @@ package de.unibi.cebitec.aws.s3.transfer.model.up; import com.amazonaws.services.s3.AmazonS3; -import de.unibi.cebitec.aws.s3.transfer.model.up.IUploadChunk; import java.util.concurrent.Callable; @@ -10,10 +9,10 @@ public class TransferUploadThread implements Callable { public static final Logger log = LoggerFactory.getLogger(TransferUploadThread.class); - private AmazonS3 s3; - private String bucketName; - private IUploadChunk chunk; - private int retryCount; + private final AmazonS3 s3; + private final String bucketName; + private final IUploadChunk chunk; + private final int retryCount; public TransferUploadThread(AmazonS3 s3, String bucketName, IUploadChunk chunk, int retryCount) { this.s3 = s3; @@ -24,14 +23,14 @@ public TransferUploadThread(AmazonS3 s3, String bucketName, IUploadChunk chunk, @Override public Void call() throws Exception { - for (int i = 0; i < this.retryCount; i++) { + for (int i = 0; i < retryCount; i++) { try { - this.chunk.upload(this.s3, this.bucketName); + chunk.upload(s3, bucketName); break; } catch (Exception e) { - log.warn("Chunk upload failed! Retrying.... ({})", e.toString()); + log.warn("Chunk upload failed! Retrying... ({})", e.toString()); } - if (i == this.retryCount - 1) { + if (i == retryCount - 1) { log.error("Chunk upload failed after {} retries. Exiting...", i); System.exit(1); } diff --git a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/UploadPart.java b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/UploadPart.java index 6f6aedc..0e69cb0 100644 --- a/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/UploadPart.java +++ b/src/main/java/de/unibi/cebitec/aws/s3/transfer/model/up/UploadPart.java @@ -10,32 +10,36 @@ public class UploadPart implements IUploadChunk { public static final Logger log = LoggerFactory.getLogger(UploadPart.class); - private MultipartUploadFile multipartUploadFile; - private int partNumber; - private long fileOffset; - private long partSize; + private final MultipartUploadFile multipartUploadFile; + private final int partNumber; + private final long fileOffset; + private final long partSize; private PartETag tag; - public UploadPart(MultipartUploadFile multipartUploadFile) { + public UploadPart(MultipartUploadFile multipartUploadFile, int partNumber, long partSize, long fileOffset) { this.multipartUploadFile = multipartUploadFile; + this.partNumber = partNumber; + this.partSize = partSize; + this.fileOffset = fileOffset; } @Override public void upload(AmazonS3 s3, String bucketName) { UploadPartRequest uploadRequest = new UploadPartRequest() - .withBucketName(bucketName).withKey(this.multipartUploadFile.getKey()) - .withUploadId(this.multipartUploadFile.getUploadId()) - .withPartNumber(this.partNumber) - .withFileOffset(this.fileOffset) - .withFile(this.multipartUploadFile.file.toFile()) - .withPartSize(this.partSize); + .withBucketName(bucketName) + .withKey(multipartUploadFile.getKey()) + .withUploadId(multipartUploadFile.getUploadId()) + .withPartNumber(partNumber) + .withFileOffset(fileOffset) + .withFile(multipartUploadFile.file.toFile()) + .withPartSize(partSize); try { - log.debug("Starting upload of part {} of file: {}", this.partNumber, this.multipartUploadFile.getKey()); - this.tag = s3.uploadPart(uploadRequest).getPartETag(); + log.debug("Starting upload of part {} of file: {}", partNumber, multipartUploadFile.getKey()); + tag = s3.uploadPart(uploadRequest).getPartETag(); Measurements.countChunkAsFinished(); - log.debug("Upload done: Part {} of file: {}", this.partNumber, this.multipartUploadFile.getKey()); + log.debug("Upload done: Part {} of file: {}", partNumber, multipartUploadFile.getKey()); } catch (AmazonClientException e) { - log.debug("Failed to upload part {} of file: {} - Reason: {}", this.partNumber, this.multipartUploadFile.getKey(), e.toString()); + log.debug("Failed to upload part {} of file: {} - Reason: {}", partNumber, multipartUploadFile.getKey(), e.toString()); throw e; } } @@ -44,26 +48,14 @@ public int getPartNumber() { return partNumber; } - public void setPartNumber(int partNumber) { - this.partNumber = partNumber; - } - public long getFileOffset() { return fileOffset; } - public void setFileOffset(long fileOffset) { - this.fileOffset = fileOffset; - } - public long getPartSize() { return partSize; } - public void setPartSize(long partSize) { - this.partSize = partSize; - } - public PartETag getTag() { return tag; }