add shutdown and start mechanics to windmill streams #32774
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
9591a9a to ce57880
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
@@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting(
return fanOutStreamingEngineWorkProvider;
}

@SuppressWarnings("ReturnValueIgnored")
@SuppressWarnings("FutureReturnValueIgnored")

assign the future to a variable named unusedFuture and remove the suppression?

done
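
A minimal sketch of the suggestion above, assuming a single-threaded publisher executor. The class and method names (MetadataPublisherSketch, publish, shutdownAndCount) are hypothetical and not taken from the PR. Assigning the result of submit() to a local named unusedFuture makes the dropped future explicit, so the method-level suppression is no longer needed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the harness; names are illustrative only. */
final class MetadataPublisherSketch {
  private final ExecutorService publisher = Executors.newSingleThreadExecutor();
  private final AtomicInteger published = new AtomicInteger();

  void publish(int endpoints) {
    // The "unused" prefix documents that the result is dropped on purpose,
    // so no @SuppressWarnings("FutureReturnValueIgnored") is required here.
    Future<?> unusedFuture =
        publisher.submit(
            () -> {
              published.addAndGet(endpoints);
            });
  }

  /** Drains the executor and returns the total published so far. */
  int shutdownAndCount() {
    publisher.shutdown();
    try {
      if (!publisher.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("publisher did not terminate");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return published.get();
  }
}
```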
// Run this on a separate thread than the grpc stream thread.
newWorkerMetadataPublisher.submit(
() -> newWindmillEndpoints.add(endpoints))));
this.getWorkerMetadataStream = null;

does anything prevent us from creating the stream here and starting it in start?

Don't want to start and do too much (make an RPC call) in the constructor.
We also need the metadata stub (dispatcher stub), which we don't have until we call start in StreamingDataflowWorker. Can fix this by making start() asynchronous here, but I still think that might be doing too much in the constructor.
Will have to suppress a warning (reference to 'this' in constructor), but I can create the stream here and just start the stream in start() instead of assigning.
GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
getWorkBudget.set(adjustment);
if (started.get()) {
getWorkStream.adjustBudget(adjustment);

Suggested change: replace getWorkStream.adjustBudget(adjustment); with getWorkStream.setBudget(newBudget);

rebased onto #32775
@@ -187,13 +206,14 @@ protected void startThrottleTimer() {
commitWorkThrottleTimer.start();
}

private void flushInternal(Map<Long, PendingRequest> requests) {
private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException {

it doesn't look like any of the method calls inside flushInternal are throwing InterruptedException. Can we remove the throws from here?

removed
private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
this.requestObserverSupplier = requestObserverSupplier;
this.delegateRequestObserver = null;

can we initialize delegateRequestObserver with requestObserverSupplier.get() here and remove the null checks in delegate()?

we need some initial state to not allow sends() or other stream operations w/o a call to startStream/start.
we can use null, or a dummy observer that will throw if any of the methods are called before startStream/start is called?
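
A rough sketch of the "dummy observer" option, under the assumption that a throwing sentinel replaces null as the initial state. SketchObserver is a hypothetical one-method stand-in for io.grpc.stub.StreamObserver so the example has no dependencies, and NotStartedSentinelSketch is not the PR's ResettableRequestObserver.

```java
import java.util.function.Supplier;

/** Minimal stand-in for io.grpc.stub.StreamObserver so the sketch has no dependencies. */
interface SketchObserver<T> {
  void onNext(T value);
}

/** Hypothetical resettable wrapper that starts with a throwing sentinel instead of null. */
final class NotStartedSentinelSketch<T> implements SketchObserver<T> {
  private final Supplier<SketchObserver<T>> supplier;
  private SketchObserver<T> delegate; // guarded by this; never null

  NotStartedSentinelSketch(Supplier<SketchObserver<T>> supplier) {
    this.supplier = supplier;
    // Sentinel: any use before reset() fails loudly, so delegate() needs no null check.
    this.delegate =
        value -> {
          throw new IllegalStateException("Stream not started. Missing a call to start().");
        };
  }

  /** Swaps in a real observer; corresponds to startStream/start in the discussion. */
  synchronized void reset() {
    delegate = supplier.get();
  }

  @Override
  public synchronized void onNext(T value) {
    delegate.onNext(value);
  }
}
```

The trade-off against null is that a misuse surfaces as an IllegalStateException with a clear message rather than a NullPointerException, at the cost of one extra object per wrapper.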
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
"requestObserver cannot be null. Missing a call to startStream() to initialize.");
"requestObserver cannot be null. Missing a call to start() to initialize stream.");

requestObserver is now created in constructor, we don't need the null check here.

bump

done
executor.execute(runnable);
} catch (RejectedExecutionException e) {
logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
} catch (IllegalStateException e) {

executor.execute won't propagate the IllegalStateException from closed stream to here.

we don't ever do anything with the IllegalStateException anyway except log it, so I think this is ok
if (isShutdown.compareAndSet(false, true)) {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
shutdownInternal();
shutdownTime.set(DateTime.now());

can we move shutdownTime.set(DateTime.now()); to line 354 before calling requestObserver().onError?

done
// Retry issuing the request since the response stream was cancelled.
continue;
} catch (AppendableInputStream.InvalidInputStreamStateException
| VerifyException

when is the VerifyException thrown?

throughout the class when we insert/remove into the pending map we call verify(map operation), so that might get thrown here

Since VerifyException is not expected when things are working as expected, why not throw it upstream as it is? Converting it to WindmillStreamShutdownException will mask the real failure and the worker will be operating in an unknown state.

If it is thrown and we are shutdown, the verify exception is invalid because any ongoing responses/requests are invalid. We clear the pending + requests fields, so some of the map operations are looking for things that are no longer there.
An alternative is, instead of just verify, we add a check for isShutdown on top of the boolean condition being checked and either return or throw if isShutdown. The issue here is we start checking for isShutdown everywhere vs. letting the exception just get thrown and handled here.
If the stream is not shutdown, the VerifyException will still be thrown. I can add a test for this?

If we see VerifyException, that means there is a logic bug and we need to correct the code to fix it.
One case I see is, on shutdown we do batches.clear(); this could happen in the middle of the batch sending. There is verify(batch == batches.peekFirst()); inside send that will throw after shutdown.
from https://github.com/google/guava/wiki/ConditionalFailuresExplained#kinds-of-conditional-failures
A verification check is used when you lack high confidence that an API you consume will meet its (real or implied) specification. It's easiest to understand this type of check as "like an assertion in almost every way, but we'll never want to disable them."
Can we make sure Verify exceptions are not thrown unless there is a bug?

added checks to isShutdown
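
One way to read "added checks to isShutdown", sketched under assumptions: the shutdown check runs under the same lock and before the invariant check, so a queue cleared by shutdown() is reported as a shutdown rather than as a verification failure. BatchQueueSketch and ShutdownSketchException are hypothetical names, and a plain IllegalStateException stands in for Guava's verify so the example needs no dependencies.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical checked exception standing in for WindmillStreamShutdownException. */
final class ShutdownSketchException extends Exception {
  ShutdownSketchException(String message) {
    super(message);
  }
}

/** Hypothetical batch queue; not the actual GrpcGetDataStream code. */
final class BatchQueueSketch {
  private final Deque<Object> batches = new ArrayDeque<>(); // guarded by this
  private boolean isShutdown; // guarded by this

  synchronized void enqueue(Object batch) {
    batches.addLast(batch);
  }

  synchronized void shutdown() {
    isShutdown = true;
    batches.clear();
  }

  synchronized void finishSend(Object batch) throws ShutdownSketchException {
    // Check shutdown first: after shutdown() the queue is cleared, so the invariant
    // below no longer applies and must not be reported as a bug.
    if (isShutdown) {
      throw new ShutdownSketchException("Stream was shut down while sending a batch.");
    }
    // Stand-in for verify(batch == batches.peekFirst()): failing here means a logic bug.
    if (batch != batches.peekFirst()) {
      throw new IllegalStateException("Sent batch is not at the head of the queue.");
    }
    batches.pollFirst();
  }

  synchronized int size() {
    return batches.size();
  }
}
```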
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
}
}

// If we have exited the loop here, the stream has been shutdown. Cancel the response stream.
request.getResponseStream().cancel();

do we need this? only the parseFn.parse above was reading the responseStream and nothing should be blocked on the response stream at this point?

not needed, just wanted to cancel the stream in case there was anything dangling

If it is not needed, let's remove it. To catch anything dangling, we can add a check and throw

removed
f05b41e to b66c086
b66c086 to 881399e
881399e to 8618174
back to you @scwhittle @arunpandianp thanks!
@Nullable QueuedBatch prevBatch = null;
synchronized (shutdownLock) {
if (isShutdown()) {
handleShutdown(request);

Suggested change: replace handleShutdown(request); with throw shutdownException(request);
throwing here would be more readable than relying on handleShutdown to throw.

done
batch.await();
batch.notifySent();
} catch (Exception e) {
LOG.error("Error occurred sending batch.", e);

This might log on every shutdown. We could remove this and rely on logging unexpected exceptions upstream

done

back to you @arunpandianp @scwhittle thanks
// Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);

protected final AtomicBoolean clientClosed;

add a comment:
Indicates that the logical stream has been half-closed and is waiting for clean server shutdown.

done
/**
* Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link
* #halfClose()}.

Would be good to clarify how this differs from clientClosed. This is more about the specific physical stream.
Maybe add "Separate from clientClosed as this is specific to the requestObserver and is initially false on retry."

done
private final AtomicReference<String> lastError;
private final AtomicReference<DateTime> lastErrorTime;
private final AtomicReference<String> lastRestartReason;
private final AtomicReference<DateTime> lastRestartTime;

this is getting to be a lot of atomics. Most are just atomic for the status page.
I think it could be less confusing to have some MonitoringInfo object which has synchronized methods like noteRestart(...), sleepUntil(sleeper, time), noteSend, noteResponse to make it clearer what is just monitoring stuff and what is part of the state machine.
For state machine things like isShutdown and clientClosed, it would be nice if we could move away from atomics to variables marked with GuardedBy. In some cases that might lead to duplication with the monitoring state, but I think it could still help with analyzing the locking correctness or races within this class. For example, for shutdown, I think we could have a boolean guarded by the shutdown mutex, and a method on MonitoringState like noteShutdown() which would internally have its own boolean and the time the shutdown occurred.

done
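
A small sketch of the monitoring-object idea, assuming plain fields behind synchronized methods. StreamMonitoringSketch is a hypothetical name, only noteRestart and noteShutdown come from the comment above, and java.time.Instant is used in place of the Joda DateTime seen in the diff so the example stays dependency-free. Timestamps are passed in by the caller to keep the sketch deterministic.

```java
import java.time.Instant;

/** Hypothetical monitoring holder, kept separate from the stream's state machine. */
final class StreamMonitoringSketch {
  // All fields are guarded by this; none of them drive stream behavior.
  private int restartCount;
  private String lastRestartReason = "";
  private Instant lastRestartTime; // null until the first restart
  private Instant shutdownTime; // null until shutdown is noted

  synchronized void noteRestart(String reason, Instant now) {
    restartCount++;
    lastRestartReason = reason;
    lastRestartTime = now;
  }

  synchronized void noteShutdown(Instant now) {
    // Keep the first shutdown time if this is called more than once.
    if (shutdownTime == null) {
      shutdownTime = now;
    }
  }

  synchronized int restartCount() {
    return restartCount;
  }

  synchronized String lastRestartReason() {
    return lastRestartReason;
  }

  synchronized boolean isShutdownNoted() {
    return shutdownTime != null;
  }

  /** One-line status-page summary built from a consistent snapshot. */
  synchronized String summary() {
    return "restarts=" + restartCount
        + ", lastRestartReason=" + lastRestartReason
        + ", lastRestartTime=" + lastRestartTime
        + ", shutdownTime=" + shutdownTime;
  }
}
```

Because every accessor takes the same lock, the summary cannot observe a restart count from one restart paired with the reason from another, which separate atomics do not guarantee.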
if (started) {
// requestObserver is not set until the first startStream() is called. If the stream was
// never started there is nothing to clean up internally.
requestObserver.onError(

this seems racy since start sets started to true before startStream calls requestObserver.reset() initially, or it could happen that startStream resets the observer after we call onError here and the new one doesn't have an error called.
how about adding a RequestObserver.poison which permanently puts the request observer in a poisoned state, calling onError on the current one or any future ones? you can remove the started check and the request observer handles the synchronization within itself.

done
also moved ResettableRequestObserver to its own file, and added tests
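
A sketch of one possible reading of the poison idea, not the PR's actual ResettableRequestObserver. PoisonSketchObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, and the exception types and messages are assumptions. For brevity the sketch calls the delegate while holding the lock, which real code may want to avoid if onNext can block.

```java
import java.util.function.Supplier;

/** Minimal stand-in for io.grpc.stub.StreamObserver so the sketch has no dependencies. */
interface PoisonSketchObserver<T> {
  void onNext(T value);

  void onError(Throwable t);
}

/** Hypothetical observer wrapper that owns its own synchronization. */
final class PoisonableObserverSketch<T> {
  private final Supplier<PoisonSketchObserver<T>> supplier;
  private PoisonSketchObserver<T> delegate; // guarded by this; null until the first reset()
  private boolean poisoned; // guarded by this

  PoisonableObserverSketch(Supplier<PoisonSketchObserver<T>> supplier) {
    this.supplier = supplier;
  }

  /** Installs a fresh delegate; a poisoned wrapper fails the new delegate right away. */
  synchronized void reset() {
    delegate = supplier.get();
    if (poisoned) {
      delegate.onError(new IllegalStateException("Stream was shut down."));
    }
  }

  /** Permanently poisons the wrapper and fails the current delegate, if there is one. */
  synchronized void poison() {
    if (poisoned) {
      return;
    }
    poisoned = true;
    if (delegate != null) {
      delegate.onError(new IllegalStateException("Stream was shut down."));
    }
  }

  synchronized void onNext(T value) {
    if (poisoned) {
      throw new IllegalStateException("Observer is poisoned.");
    }
    if (delegate == null) {
      throw new IllegalStateException("Missing a call to reset().");
    }
    delegate.onNext(value);
  }
}
```

Since poison() and reset() share one lock, it does not matter which runs first: either the current delegate receives the error, or the next one does as soon as it is created, which removes the need for a separate started check.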
213e804 to 1b819bc
back to you thanks @scwhittle
1b819bc to 77969d0
d49d905 to 5a68e0f
private @Nullable StreamObserver<T> delegateStreamObserver;

/**
* Indicates that the request observer should no longer be used. Attempts to perform operations on

move comment next to poison method?

done
/** Records stream metrics for debugging. */
@ThreadSafe
final class StreamDebugMetrics {
private final AtomicInteger restartCount = new AtomicInteger();

can these just be synchronized ints as well instead of atomic? We're not doing anything expensive under lock so it doesn't seem like it needs separate consideration.

done
lastRestartTime = DateTime.now();
}

synchronized long startTimeMs() {

getStartTimeMs?
ditto for lastSendTimeMs

done
/**
* Used to guard {@link #start()} and {@link #shutdown()} behavior.
*
* @implNote Do not hold when performing IO. If also locking on {@code this} in the same context,

if you acquire this before (this) lock, then this mutex could be blocked by I/O because we perform IO beneath this lock.
If this is supposed to be lightweight it seems like it should be acquired after (this) to avoid that.

done
@GuardedBy("this")
private boolean streamClosed;

private volatile boolean isShutdown;

can this be guarded by shutdown lock? the accessor method can synchronize

done
private boolean streamClosed;

private volatile boolean isShutdown;
private volatile boolean started;

can this be guarded by one of the locks instead of volatile?

done
685d386 to 3fb76ac
back to you @scwhittle thank you!
throw new IllegalStateException("Send called on a client closed stream.");
}

requestObserver().onNext(request);
try {
verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");

I would remove this since it seems likely expensive.
Instead you could verify with a test:
- setup requestObserver that blocks until notified
- one thread calls send and starts blocking
- main test thread calls shutdown() and verifies the method returns
- main test thread unblocks the requestObserver

done
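
The four steps above can be sketched roughly as follows. BlockingSendSketch is a hypothetical stand-in for the stream, with latches playing the role of the blocking requestObserver. As a deviation from the comment, shutdown() runs on a pool thread with a timeout so that a regression shows up as a failed check instead of a hung test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stream whose send() blocks outside the shutdown lock. */
final class BlockingSendSketch {
  private final Object shutdownLock = new Object();
  private boolean isShutdown; // guarded by shutdownLock
  final CountDownLatch sendEntered = new CountDownLatch(1);
  final CountDownLatch releaseSend = new CountDownLatch(1);

  void send() throws InterruptedException {
    synchronized (shutdownLock) {
      if (isShutdown) {
        throw new IllegalStateException("Send called on a shutdown stream.");
      }
    }
    // Stand-in for a requestObserver.onNext() that blocks until notified. No lock is held.
    sendEntered.countDown();
    releaseSend.await();
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
    }
  }

  /** Returns true if shutdown() returned while a send was still blocked. */
  static boolean shutdownReturnsWhileSendIsBlocked() {
    BlockingSendSketch stream = new BlockingSendSketch();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // One thread calls send and starts blocking.
      Future<Void> sender =
          pool.submit(
              () -> {
                stream.send();
                return null;
              });
      if (!stream.sendEntered.await(5, TimeUnit.SECONDS)) {
        return false;
      }
      // shutdown() must return even though send() is still blocked.
      Future<?> shutdown = pool.submit(stream::shutdown);
      shutdown.get(5, TimeUnit.SECONDS);
      // Unblock the observer and let the sender finish.
      stream.releaseSend.countDown();
      sender.get(5, TimeUnit.SECONDS);
      return true;
    } catch (Exception e) {
      return false;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

If send() held shutdownLock while blocked, the shutdown future would time out and the method would return false, which is the regression the removed verify call was guarding against.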
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();

RuntimeException finalException = null;
CommitCompletionException failures = new CommitCompletionException();

nit: how about a builder for this since we won't want to add exceptions in other places.
then the final line of the method can be builder.throwIfNonEmpty(); and internally it builds the exception and throws if needed?

done

I was thinking the builder would not be an exception itself, and then the exception would just be a simple class without mutating methods.

done
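
A minimal sketch of the shape described in the follow-up comment: the builder is a plain class that collects failures, and the exception it throws is immutable. CommitFailuresSketch is a hypothetical name, and only throwIfNonEmpty comes from the review. Attaching the collected failures as suppressed exceptions is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable exception; it has no mutating methods of its own. */
final class CommitFailuresSketch extends RuntimeException {
  private final List<Throwable> causes;

  private CommitFailuresSketch(List<Throwable> causes) {
    super("Exception while processing commit responses: " + causes.size() + " failure(s)");
    this.causes = Collections.unmodifiableList(new ArrayList<>(causes));
    // Assumption: attach each failure as suppressed so stack traces are not lost.
    causes.forEach(this::addSuppressed);
  }

  List<Throwable> causes() {
    return causes;
  }

  static Builder builder() {
    return new Builder();
  }

  /** Collects failures; it is not itself an exception. */
  static final class Builder {
    private final List<Throwable> causes = new ArrayList<>();

    void add(Throwable failure) {
      causes.add(failure);
    }

    /** Builds and throws the exception only if at least one failure was added. */
    void throwIfNonEmpty() {
      if (!causes.isEmpty()) {
        throw new CommitFailuresSketch(causes);
      }
    }
  }
}
```

With this shape, a response handler would create one builder per call, add each failure as it is seen, and end with a single throwIfNonEmpty() call.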
queuedBytes = 0;
queue.clear();
try {
if (!hasReceivedShutdownSignal()) {

can we remove? it is handled in stream methods already and doing it here leaves a gap between shutdown check and flush anyway.

done
send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
if (clientClosed.get()) {
if (clientClosed && !hasReceivedShutdownSignal()) {

remove? above send will fail if shutdown
though I think we can also guarantee at higher level that onNewStream isn't called if shutdown

done
return true;
}
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {

just make a synchronized method?

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);

why doesn't just "this" work?

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous
@@ -59,6 +59,9 @@ final class StreamDebugMetrics {
@GuardedBy("this")
private DateTime shutdownTime = null;

mark nullable and above
I think you still need to fix the over exemption of windmill classes from checker. #30183

done
// If the stream was stopped due to a resource exhausted error then we are throttled.
if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {

nit. I would maybe keep this in the onError case, as the other stuff is more just logs/debug page and this is more functional.

done
while (true) {
private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn)
throws WindmillStreamShutdownException {
while (!isShutdownLocked()) {

rm? below needs to handle it anyway

done
pending.clear();
batches.forEach(
batch -> {
batch.markFinalized();

batch usage should be synchronized on this
should shutdownInternal be marked synchronized? It is currently in a synchronized shutdown block anyway

done
}

private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
private synchronized void handleShutdown(QueuedRequest request, Throwable cause)

nit: name throwIfShutdown?

done
request.addToStreamingGetDataRequest(builder); | ||
} | ||
return builder.build(); | ||
private synchronized void verify(boolean condition, String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be removed? It seems you already have the shutdown check first in the cases I see (and if not, I think it would be clearer to make it part of the check at the call site than to hide it in a method whose name doesn't suggest it examines shutdown)
done
Verify.verify(condition || isShutdown, message); | ||
} | ||
|
||
private synchronized boolean isShutdownLocked() { |
rm if you remove above
done
some of previous comments were not resolved, please look through them too. Thanks!
requestObserver.poison(); | ||
isShutdown = true; |
we should remove this (and the suppress)
shouldn't the poison prevent the blocking beneath the mutex, so that the lock below will be acquired soon?
Setting it to true outside the mutex breaks invariants that are easier to reason about if it is strictly guarded by the mutex (and it breaks the logic below: we'll never run shutdownInternal).
if we do need it for something, it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix that instead, because it is confusing to have two.
cleaned up; this was supposed to stay within the synchronized block
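For readers following the thread, the invariant being asked for (the flag is only read and written under the lock, and the cleanup runs exactly once) can be sketched roughly as below. This is a minimal illustration, not the PR's code: the class name `GuardedShutdownSketch` and the `cleanupRuns` counter are hypothetical, and `shutdownInternal` here is only a stand-in for the real cleanup.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: the shutdown flag is only touched while holding the object's monitor. */
final class GuardedShutdownSketch {
  private boolean isShutdown = false; // guarded by "this"
  private final AtomicInteger cleanupRuns = new AtomicInteger();

  /** Returns true only for the call that actually performed the shutdown. */
  synchronized boolean shutdown() {
    if (isShutdown) {
      return false; // later calls are no-ops
    }
    isShutdown = true;
    shutdownInternal();
    return true;
  }

  synchronized boolean isShutdown() {
    return isShutdown;
  }

  // Stand-in for clearing pending requests and failing queued batches.
  private void shutdownInternal() {
    cleanupRuns.incrementAndGet();
  }

  int cleanupRuns() {
    return cleanupRuns.get();
  }
}
```

Because the flag is never set outside the monitor, the first caller to see `isShutdown == false` is also the one that runs the cleanup.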
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException { | |||
"Output channel stalled for {}s, outbound thread {}.", | |||
totalSecondsWaited, | |||
Thread.currentThread().getName()); | |||
Thread.dumpStack(); |
if you want to submit, log this instead. We've used things like StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10); elsewhere to get a string to log
removed; this was for debugging
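If the stack trace were kept, the suggestion to log it rather than dump it could look roughly like the sketch below. It uses only the JDK and is a stand-in for the helper mentioned in the comment, not that helper's implementation; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper: turns a stack trace into a bounded, loggable string. */
final class StackTraceFormatSketch {
  /** Joins at most maxFrames frames, one per line, so the result can be passed to a logger. */
  static String firstFrames(StackTraceElement[] frames, int maxFrames) {
    return Arrays.stream(frames)
        .limit(maxFrames)
        .map(StackTraceElement::toString)
        .collect(Collectors.joining("\n"));
  }
}
```

A caller would pass `Thread.currentThread().getStackTrace()` and hand the returned string to the existing logger call.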
@@ -472,7 +473,8 @@ private void flushResponse() { | |||
responseObserver.onNext(responseBuilder.build()); | |||
} catch (Exception e) { | |||
// Stream is already closed. | |||
System.out.println("trieu: " + e); | |||
LOG.warn("trieu: ", e); |
rm debug logs
removed; this was for debugging
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
int nextPhase = | ||
isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS); | ||
// If nextPhase is a value less than 0, the phaser has been terminated. | ||
if (nextPhase < 0) { |
also check isClosed to ensure that we don't call onNext on something that we called onCompleted or onError on.
We need it here since the locking in the ResettableStreamObserver isn't sufficient, as it sends without the lock held. You might be able to remove it from that class if we're just relying on this one to enforce that the underlying stream observer is used correctly (i.e. locked, just one terminal method, no other methods after it)
added checks under lock to onError and onCompleted; all onNext calls are already guarded by lock
synchronized (lock) { | ||
isClosed = true; |
i.e. add an if (!isClosed) check to all of these
still need to add some more tests too, thanks!
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce | |||
|
|||
try { | |||
delegate.onError(e); | |||
} catch (RuntimeException ignored) { | |||
} catch (IllegalStateException ignored) { |
add the ignored to the exception too? just in case we catch something unexpected and it helps debugging?
done
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) { | |||
|
|||
for (int i = 0; i < chunk.getRequestIdCount(); ++i) { | |||
AppendableInputStream responseStream = pending.get(chunk.getRequestId(i)); | |||
verify(responseStream != null, "No pending response stream"); | |||
synchronized (this) { |
avoid synchronizing if not necessary; you also need to handle the null case, or you will just get an exception on the following line.
if (responseStream == null) {
  synchronized (this) { verify(isShutdown); }
  continue;
}
done
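The null handling suggested above can be sketched in a self-contained form as follows. This is only an illustration under assumptions: `PendingLookupSketch`, the `List<String>` stand-in for the response stream, and the `IllegalStateException` used in place of `verify` are all hypothetical and not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a missing pending entry is tolerated only after shutdown. */
final class PendingLookupSketch {
  private final Map<Long, List<String>> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // guarded by "this"

  void register(long requestId) {
    pending.put(requestId, new ArrayList<>());
  }

  synchronized void shutdown() {
    isShutdown = true;
    pending.clear();
  }

  /** Delivers payload to each known request id and returns how many ids were delivered. */
  int onResponse(long[] requestIds, String payload) {
    int delivered = 0;
    for (long id : requestIds) {
      List<String> responseStream = pending.get(id);
      if (responseStream == null) {
        // Only take the lock on the rare path where the entry is missing.
        synchronized (this) {
          if (!isShutdown) {
            throw new IllegalStateException("No pending response stream");
          }
        }
        continue; // shutdown already cleared the entry, so skip this id
      }
      responseStream.add(payload);
      delivered++;
    }
    return delivered;
  }
}
```

The common path (entry present) never synchronizes; the lock is taken only to decide whether a missing entry is expected.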
outboundObserver.onCompleted(); | ||
} | ||
} | ||
|
||
private void markClosedOrThrow() { |
just make method synchronized (though with suggestion below, probably easier to just duplicate in onCompleted/onError and use the same if block).
done
outboundObserver.onCompleted(); | ||
} | ||
} | ||
|
||
private void markClosedOrThrow() { | ||
synchronized (lock) { | ||
Preconditions.checkState(!isClosed); |
this is going to throw if we have the sequence:
T1: onNext()
T2: terminate()
T1: onError()
It seems like that could happen if we're terminating from threads other than the one driving the observer. We could have a separate bool tracking userClosed, and base this exception on that, since that indicates the class is being used wrong. Having a terminate before/during an onCompleted/onError doesn't necessarily seem like misuse, and I think we should avoid throwing an exception.
done
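One possible shape for the two-flag idea discussed above is sketched below. It is an assumption-laden illustration rather than the PR's implementation: `TerminalStateSketch`, `isUserClosed`, and the recorded `delegateCalls` list (a stand-in for the wrapped observer) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: terminate() racing with onError()/onCompleted() is not treated as misuse. */
final class TerminalStateSketch {
  private final Object lock = new Object();
  private final List<String> delegateCalls = new ArrayList<>(); // stand-in for the wrapped observer
  private boolean isClosed = false; // guarded by lock: delegate already received a terminal call
  private boolean isUserClosed = false; // guarded by lock: caller already invoked onCompleted/onError

  void terminate() {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegateCalls.add("onError(terminated)");
      }
    }
  }

  void onCompleted() {
    closeByUser("onCompleted");
  }

  void onError(Throwable t) {
    closeByUser("onError(" + t.getMessage() + ")");
  }

  private void closeByUser(String call) {
    synchronized (lock) {
      // Only a second user-driven close counts as misuse of the class.
      if (isUserClosed) {
        throw new IllegalStateException("Observer was already closed by the caller.");
      }
      isUserClosed = true;
      if (!isClosed) {
        isClosed = true;
        delegateCalls.add(call);
      }
    }
  }

  List<String> delegateCalls() {
    synchronized (lock) {
      return new ArrayList<>(delegateCalls);
    }
  }
}
```

With this split, the T1/T2 sequence from the comment results in exactly one terminal call on the delegate and no exception.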
@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {} | |||
() -> | |||
assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1))); | |||
testStream.shutdown(); | |||
|
|||
// Sleep a bit to give sendExecutor time to execute the send(). | |||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
sleep for less, 100ms maybe? Tests take long enough already
done
@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn | |||
assertThrows( | |||
WindmillStreamShutdownException.class, | |||
queuedBatch::waitForSendOrFailNotification)); | |||
|
|||
// Wait a few seconds for the above future to get scheduled and run. | |||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
ditto how about 100ms
done
}); | ||
|
||
// Sleep a bit to allow future to run. | ||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
ditto less
done
if (currentPhase < 0) return; | ||
if (currentPhase < 0) { | ||
throw new StreamObserverCancelledException("StreamObserver was terminated."); | ||
} | ||
messagesSinceReady = 0; |
assert(!isClosed);
since we terminate before closing seems like it is guaranteed but clearer to reader to assert I think
done
if (currentPhase < 0) return; | ||
if (currentPhase < 0) { | ||
throw new StreamObserverCancelledException("StreamObserver was terminated."); | ||
} |
assert(!isClosed);
since we terminate before closing seems like it is guaranteed but clearer to reader to assert I think
done
if (currentPhase < 0) return; | ||
if (currentPhase < 0) { | ||
throw new StreamObserverCancelledException("StreamObserver was terminated."); | ||
} |
assert(!isClosed);
since we terminate before grabbing synchronized to close it is guaranteed but clearer to reader to assert I think
done
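The terminated-phaser check that recurs in the hunks above relies on `java.util.concurrent.Phaser` returning a negative phase once `forceTermination()` has been called. A minimal standalone sketch of that behavior follows; the class name is hypothetical, and `IllegalStateException` stands in for the `StreamObserverCancelledException` used in the diff.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of waiting on a Phaser that another thread may terminate. */
final class PhaserTerminationSketch {
  private final Phaser isReadyNotifier = new Phaser(1); // one registered party

  void markReady() {
    isReadyNotifier.arrive(); // advances the phase because the single party arrived
  }

  void terminate() {
    isReadyNotifier.forceTermination();
  }

  int currentPhase() {
    return isReadyNotifier.getPhase();
  }

  /** Returns true if the phase advanced, false on timeout; throws if the phaser was terminated. */
  boolean awaitReady(int awaitPhase, long waitMillis) {
    try {
      int nextPhase =
          isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitMillis, TimeUnit.MILLISECONDS);
      // A negative phase means the phaser has been terminated.
      if (nextPhase < 0) {
        throw new IllegalStateException("StreamObserver was terminated.");
      }
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting.", e);
    }
  }
}
```

Throwing on a negative phase, instead of returning silently, lets the caller distinguish "ready" from "terminated", which is the change the hunks make.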
64e61d9
to
42836f4
Compare
42836f4
to
f75df0f
Compare
Windmill streams are currently started on stream creation and closed via halfClose(). Implementations were previously created and returned in a "started" state, usually after the stream had already sent the initial headers to open the connection to the backend servers. Starting streams this way prevents us from starting a stream "lazily", and closing via halfClose() lets other blocking stream operations prevent a stream from closing (at times stalling for 10-20 minutes).
This is especially important in direct path mode, where the user worker manages the fan-out to the backend.
In terms of implementation, WindmillStream.start(), like WindmillStream.shutdown(), executes its behavior only once during the lifetime of the WindmillStream object. Subsequent calls to start() and shutdown() do nothing.
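As a rough illustration of the once-only semantics described above (not the actual WindmillStream code), a lazily started stream could be sketched like this. The class name and counters are hypothetical, and ignoring start() after shutdown() is an assumption made for the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: start() and shutdown() each take effect at most once. */
final class StartOnceSketch {
  private boolean started = false; // guarded by "this"
  private boolean isShutdown = false; // guarded by "this"
  private final AtomicInteger opens = new AtomicInteger();
  private final AtomicInteger closes = new AtomicInteger();

  // Nothing is opened in the constructor, so the stream can be started lazily.

  synchronized void start() {
    // Assumption: starting after shutdown is ignored rather than rejected.
    if (started || isShutdown) {
      return;
    }
    started = true;
    opens.incrementAndGet(); // stand-in for sending the initial headers
  }

  synchronized void shutdown() {
    if (isShutdown) {
      return;
    }
    isShutdown = true;
    closes.incrementAndGet(); // stand-in for closing the underlying stream
  }

  int opens() {
    return opens.get();
  }

  int closes() {
    return closes.get();
  }
}
```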
R: @arunpandianp @scwhittle