-
Notifications
You must be signed in to change notification settings - Fork 850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Progress Listener Invocation methods to Asynchronous and Synchronous code paths #5044
base: feature/anirudkr-progress-listener
Are you sure you want to change the base?
Add Progress Listener Invocation methods to Asynchronous and Synchronous code paths #5044
Conversation
...e/src/main/java/software/amazon/awssdk/core/internal/metrics/BytesSentTrackingPublisher.java
Outdated
Show resolved
Hide resolved
...ftware/amazon/awssdk/core/internal/http/pipeline/stages/PreExecutionUpdateProgressStage.java
Outdated
Show resolved
Hide resolved
...ftware/amazon/awssdk/core/internal/http/pipeline/stages/PreExecutionUpdateProgressStage.java
Outdated
Show resolved
Hide resolved
...ftware/amazon/awssdk/core/internal/http/pipeline/stages/PreExecutionUpdateProgressStage.java
Outdated
Show resolved
Hide resolved
.../awssdk/core/internal/http/pipeline/stages/AsyncExecutionFailureExceptionReportingStage.java
Outdated
Show resolved
Hide resolved
...ava/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java
Outdated
Show resolved
Hide resolved
...ava/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java
Outdated
Show resolved
Hide resolved
.../java/software/amazon/awssdk/core/internal/http/pipeline/stages/ProgressUpdateStageTest.java
Outdated
Show resolved
Hide resolved
.../java/software/amazon/awssdk/core/internal/http/pipeline/stages/ProgressUpdateStageTest.java
Outdated
Show resolved
Hide resolved
...c/test/java/software/amazon/awssdk/core/internal/metrics/BytesReadTrackingPublisherTest.java
Outdated
Show resolved
Hide resolved
…ad and Download operations in a wya in which BytesReadTrackingPublisher can be reused for upload and download
...ava/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java
Outdated
Show resolved
Hide resolved
...src/main/java/software/amazon/awssdk/core/internal/util/UploadProgressUpdaterInvocation.java
Outdated
Show resolved
Hide resolved
...src/main/java/software/amazon/awssdk/core/internal/metrics/BytesReadTrackingInputStream.java
Outdated
Show resolved
Hide resolved
...sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ProgressUpdaterInvoker.java
Outdated
Show resolved
Hide resolved
...ain/java/software/amazon/awssdk/core/internal/progress/listener/LoggingProgressListener.java
Outdated
Show resolved
Hide resolved
*/ | ||
@SdkInternalApi | ||
public final class BytesReadTrackingPublisher implements Publisher<ByteBuffer> { | ||
public final class BytesReadTrackingPublisher implements SdkHttpContentPublisher { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason we changed this class to implement SdkHttpContentPublisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is to enable the ByteReadTrackingPublisher to track the AsyncBody publisher of type SdkHttpContentPublisher :
Line 138 in f6f2c25
requestProvider = ProgressListenerUtils.wrapRequestProviderWithByteTrackingIfProgressListenerAttached(requestProvider, |
...c/main/java/software/amazon/awssdk/core/internal/util/DownloadProgressUpdaterInvocation.java
Outdated
Show resolved
Hide resolved
...c/main/java/software/amazon/awssdk/core/internal/util/DownloadProgressUpdaterInvocation.java
Outdated
Show resolved
Hide resolved
...dk-core/src/main/java/software/amazon/awssdk/core/internal/http/RequestExecutionContext.java
Outdated
Show resolved
Hide resolved
ContentStreamProvider.fromInputStream(new BytesReadTrackingInputStream( | ||
AbortableInputStream.create(contentStreamProvider.newStream()), | ||
new AtomicLong(0L), | ||
new UploadProgressUpdaterInvocation(progressUpdater))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit hard to read, let's create variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the existing implementation passes RESPONSE_BYTES_READ
to BytesReadTrackingInputStream
and now we are passing 0L, is this expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RESPONSE_BYTES_READ is used to account for "The running count of bytes in the response body that have been read by the client." Here, we are using it for an upload context but the same BytesReadTrackingInputStream is made use of. Hence, the 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will break existing code because we will later read context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ)
to get the counter
if (ProgressListenerUtils.progressListenerAttached(context.originalRequest())) { | ||
Long requestContentLength = | ||
(context.requestProvider() != null && context.requestProvider().contentLength().isPresent()) ? | ||
context.requestProvider().contentLength().get() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems requestProvider() returns AsyncRequestBody which is for async code path, what about sync?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is only to track the bytes published in the Async request code path; I still have not found a way to track the sync req. Wil put that out in the next pr
|
||
public SdkExchangeProgress requestBodyProgress() { | ||
return requestBodyProgress; | ||
@SdkPublicApi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be internal API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
@@ -68,6 +72,10 @@ public int read(byte[] b) throws IOException { | |||
private void updateBytesRead(long read) { | |||
if (read > 0) { | |||
bytesRead.addAndGet(read); | |||
|
|||
if (progressUpdaterInvoker != null) { | |||
progressUpdaterInvoker.incrementBytesTransferred(bytesRead.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be incrementBytesTransferred(read)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
* ProgressUpdater exposes methods that invokes listener methods to update and store request progress state | ||
*/ | ||
@SdkInternalApi | ||
public class DeafultProgressUpdater implements ProgressUpdater { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo, DefaultProgressUpdater
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
public SdkExchangeProgress requestBodyProgress() { | ||
return null; | ||
} | ||
|
||
public SdkExchangeProgress responseBodyProgress() { | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this
import software.amazon.awssdk.core.internal.progress.listener.ProgressUpdater; | ||
|
||
@SdkInternalApi | ||
public class ResponseProgressUpdaterInvocation implements ProgressUpdaterInvoker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ResponseProgressUpdaterInvoker
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, good suggestion
@@ -189,8 +190,7 @@ default void responseBytesReceived(Context.ResponseBytesReceived context) { | |||
/** | |||
* For Expect: 100-continue embedded requests, the service returning anything other than 100 continue | |||
* indicates a request failure. This method captures the error in the payload | |||
* After this, either executionFailure or requestHeaderSent will always be invoked depending on | |||
* whether the error type is retryable or not | |||
* After this it will either be an executionFailure or a request retry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfinished sentence
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i meant it as a finishing sentence, modified it to be clearer.
public class BytesSentTrackingPublisherTest { | ||
|
||
@Test | ||
public void test_updatesBytesSent() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test_
prefix is really not needed. Let's follow our unit tests naming pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
ContentStreamProvider.fromInputStream(new BytesReadTrackingInputStream( | ||
AbortableInputStream.create(contentStreamProvider.newStream()), | ||
new AtomicLong(0L), | ||
new UploadProgressUpdaterInvocation(progressUpdater))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will break existing code because we will later read context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ)
to get the counter
|
||
public static SdkHttpContentPublisher wrapRequestProviderWithByteTrackingIfProgressListenerAttached( | ||
SdkHttpContentPublisher requestProvider, ProgressUpdater progressUpdater) { | ||
return new BytesReadTrackingPublisher(requestProvider, new AtomicLong(0L), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, new AtomicLong(0L),
will break existing code path because the SDK relies on SdkInternalExecutionAttribute.RESPONSE_BYTES_READ
to contain the counter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm.. Im not modifying the context attribute SdkInternalExecutionAttribute.RESPONSE_BYTES_READ here. While wrapping the request provider(while sending the request) with BytesReadTracking publisher im setting bytes read to 0, as we have not yet made the request at that point. This should not modify what we set on the response
Quality Gate failedFailed conditions See analysis details on SonarCloud Catch issues before they fail your Quality Gate with our IDE extension SonarLint |
Asynchronous request Path to include Progress Listener invocations to track transactions
Motivation and Context
This is a parity feature gap between 1.x and 2.x
Modifications
Added callbacks :
in asynchronous code path.
Testing
Unit Tests
Screenshots (if appropriate)
Types of changes
Checklist
mvn install
succeedsscripts/new-change
script and following the instructions. Commit the new file created by the script in.changes/next-release
with your changes.License