add shutdown and start mechanics to windmill streams #32774

Open
wants to merge 22 commits into base: master

Conversation

Contributor

@m-trieu m-trieu commented Oct 14, 2024

Windmill streams are currently started on creation and closed via halfClose(). Implementations were previously created and returned in a "started" state, usually after the stream had already sent the initial headers to open the connection to the backend servers.

Starting streams this way prevents us from starting them lazily. Closing them via halfClose() lets other blocking stream operations keep the stream from closing (stalling for up to 10-20 minutes at times).

  • Add start() flexibility to the WindmillStream API by allowing external callers to start the stream themselves.
  • Add shutdown() capability to allow the stream to receive a shutdown signal, that is idempotent and does not block (or is blocked by) other blocking stream operations.

This is especially important in direct path mode where the user worker manages the fan out to the backend.

In terms of implementation, WindmillStream.start() behaves like WindmillStream.shutdown(): its logic executes only once during the lifetime of the WindmillStream object. Subsequent calls to start() and shutdown() do nothing.
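
A minimal sketch of the once-only start()/shutdown() behavior described above. OnceOnlyStream and its counters are hypothetical stand-ins, not the WindmillStream API, and the atomic flags are just one way to get idempotence (later review rounds in this PR move toward lock-guarded booleans instead).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a stream; not the actual WindmillStream API.
final class OnceOnlyStream {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  // Counters exist only so the sketch can show how often the logic ran.
  final AtomicInteger startCount = new AtomicInteger();
  final AtomicInteger shutdownCount = new AtomicInteger();

  /** Runs the start logic on the first call; later calls are no-ops. */
  void start() {
    if (started.compareAndSet(false, true)) {
      startCount.incrementAndGet(); // Real code would open the connection here.
    }
  }

  /** Runs the shutdown logic on the first call; later calls are no-ops. */
  void shutdown() {
    if (shutdown.compareAndSet(false, true)) {
      shutdownCount.incrementAndGet(); // Real code would error out the request observer here.
    }
  }
}
```

Because compareAndSet never blocks, neither method can be held up by another thread that is stuck in a blocking stream operation.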

R: @arunpandianp @scwhittle

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@m-trieu m-trieu force-pushed the mt-start-stream branch 2 times, most recently from 9591a9a to ce57880 on October 14, 2024 23:35
Contributor Author

m-trieu commented Oct 15, 2024

assign set of reviewers

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting(
return fanOutStreamingEngineWorkProvider;
}

- @SuppressWarnings("ReturnValueIgnored")
+ @SuppressWarnings("FutureReturnValueIgnored")
Contributor

assign the future to a variable named unusedFuture and remove the suppression?
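
A small sketch of what this suggestion could look like. UnusedFutureExample and publish are hypothetical names; as far as I understand the Error Prone check, assigning the result to a variable means the Future is no longer silently dropped, and the "unused" prefix signals that this is intentional.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; not code from the PR.
final class UnusedFutureExample {
  /** Submits one task, waits for the executor to drain, and returns how many tasks ran. */
  static int publish(ExecutorService publisher) throws InterruptedException {
    AtomicInteger published = new AtomicInteger();
    // The result is deliberately dropped; the variable name documents that,
    // so no @SuppressWarnings("FutureReturnValueIgnored") is needed.
    Future<?> unusedFuture =
        publisher.submit(
            () -> {
              published.incrementAndGet();
            });
    publisher.shutdown(); // Already submitted tasks still run.
    publisher.awaitTermination(5, TimeUnit.SECONDS);
    return published.get();
  }
}
```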

Contributor Author

done

// Run this on a separate thread than the grpc stream thread.
newWorkerMetadataPublisher.submit(
() -> newWindmillEndpoints.add(endpoints))));
this.getWorkerMetadataStream = null;
Contributor

does anything prevent us from creating the stream here and starting it in start?

Contributor Author

Don't want to start and do too much (make an RPC call) in the constructor

We also need the metadata stub (dispatcher stub), which we don't have until we call start in StreamingDataflowWorker. Can fix this by making start() be asynchronous here but I still think that might be doing too much in the constructor.

Will have to suppress a warning (reference to 'this' in constructor), but I can create the stream here and just start the stream in start() instead of assigning.

GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
getWorkBudget.set(adjustment);
if (started.get()) {
getWorkStream.adjustBudget(adjustment);
Contributor

Suggested change
- getWorkStream.adjustBudget(adjustment);
+ getWorkStream.setBudget(newBudget);

Contributor Author

rebased onto #32775

@@ -187,13 +206,14 @@ protected void startThrottleTimer() {
commitWorkThrottleTimer.start();
}

- private void flushInternal(Map<Long, PendingRequest> requests) {
+ private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException {
Contributor

it doesn't look like any of the method calls inside flushInternal are throwing InterruptedException. Can we remove the throws from here?

Contributor Author

removed


private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
this.requestObserverSupplier = requestObserverSupplier;
this.delegateRequestObserver = null;
Contributor

can we initialize delegateRequestObserver with requestObserverSupplier.get() here and remove the null checks in delegate()?

Contributor Author

we need some initial state to not allow sends() or other stream operations w/o a call to startStream/start

we can use null or a dummy observer that will throw if any of the methods are called before startStream/start is called?

@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+ "requestObserver cannot be null. Missing a call to start() to initialize stream.");
Contributor

requestObserver is now created in constructor, we don't need the null check here.

Contributor

bump

Contributor Author

done

executor.execute(runnable);
} catch (RejectedExecutionException e) {
logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
} catch (IllegalStateException e) {
Contributor

executor.execute won't propagate the IllegalStateException from closed stream to here.
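
To illustrate the point, here is a self-contained sketch, assuming a thread-pool executor (a direct or caller-runs executor would behave differently). ExecutePropagationDemo is a hypothetical name. The exception thrown inside the task surfaces on the worker thread's uncaught-exception handler, not in the catch block around execute().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not code from the PR.
final class ExecutePropagationDemo {
  /** Returns {caughtByCaller, handledOnWorkerThread}. */
  static boolean[] run() throws InterruptedException {
    AtomicReference<Throwable> seenOnWorker = new AtomicReference<>();
    CountDownLatch handled = new CountDownLatch(1);
    ExecutorService executor =
        Executors.newSingleThreadExecutor(
            r -> {
              Thread t = new Thread(r);
              // Record the failure instead of letting it print to stderr.
              t.setUncaughtExceptionHandler(
                  (thread, e) -> {
                    seenOnWorker.set(e);
                    handled.countDown();
                  });
              return t;
            });

    boolean caughtByCaller = false;
    try {
      executor.execute(
          () -> {
            throw new IllegalStateException("Stream is closed.");
          });
    } catch (IllegalStateException e) {
      caughtByCaller = true; // Not reached: the task runs on the worker thread.
    }

    // Wait for the handler explicitly; executor termination alone does not guarantee it ran.
    boolean handledOnWorker = handled.await(5, TimeUnit.SECONDS);
    executor.shutdown();
    return new boolean[] {
      caughtByCaller, handledOnWorker && seenOnWorker.get() instanceof IllegalStateException
    };
  }
}
```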

Contributor Author

we don't ever do anything with the IllegalStateException anyways except LOG so I think this is ok

if (isShutdown.compareAndSet(false, true)) {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
shutdownInternal();
shutdownTime.set(DateTime.now());
Contributor

can we move shutdownTime.set(DateTime.now()); to line 354 before calling requestObserver().onError?

Contributor Author

done

// Retry issuing the request since the response stream was cancelled.
continue;
} catch (AppendableInputStream.InvalidInputStreamStateException
| VerifyException
Contributor

when is the VerifyException thrown?

Contributor Author

throughout the class when we insert/remove into the pending map we call verify(map operation)
so that might get thrown here

Contributor

Since VerifyException is not expected when things are working as expected, why not throw it upstream as it is? Converting it to WindmillStreamShutdownException will mask the real failure and the worker will be operating in an unknown state.

Contributor Author

If it is thrown and we are shutdown, the verify exception is invalid because any ongoing responses/requests are invalid. We clear the pending + requests fields so some of the map operations are looking for things that are no longer there.

An alternative is, instead of just calling verify, to add a check for isShutdown on top of the boolean condition being checked and either return or throw if isShutdown. The issue here is that we start checking for isShutdown everywhere vs. letting the exception just get thrown and handled here.

If the stream is not shutdown, the VerifyException will still be thrown. I can add a test for this?

Contributor

If we see VerifyException, that means there is a logic bug and we need to correct the code to fix it.

One case I see is, on shutdown we do batches.clear(), this could happen in the middle of the batch sending. There is verify(batch == batches.peekFirst()); inside send that will throw after shutdown.

from https://github.com/google/guava/wiki/ConditionalFailuresExplained#kinds-of-conditional-failures

A verification check is used when you lack high confidence that an API you consume will meet its (real or implied) specification. It's easiest to understand this type of check as "like an assertion in almost every way, but we'll never want to disable them."

Can we make sure Verify exceptions are not thrown unless there is a bug?

Contributor Author

added checks to isShutdown

throw new RuntimeException(e);
} finally {
pending.remove(request.id());
}
}

// If we have exited the loop here, the stream has been shutdown. Cancel the response stream.
request.getResponseStream().cancel();
Contributor

do we need this? only the parseFn.parse above was reading the responseStream and nothing should be blocked on the response stream at this point?

Contributor Author

not needed, just wanted to cancel the stream in case there was anything dangling

Contributor

If it is not needed, let's remove it. To catch anything dangling, we can add a check and throw.

Contributor Author

removed

Contributor Author

m-trieu commented Oct 25, 2024

back to you @scwhittle @arunpandianp thanks!

@Nullable QueuedBatch prevBatch = null;
synchronized (shutdownLock) {
if (isShutdown()) {
handleShutdown(request);
Contributor

Suggested change
- handleShutdown(request);
+ throw shutdownException(request);

throwing here would be more readable than relying on handleShutdown to throw.

Contributor Author

done

batch.await();
batch.notifySent();
} catch (Exception e) {
LOG.error("Error occurred sending batch.", e);
Contributor

This might log on every shutdown. We could remove this and rely on logging unexpected exceptions upstream

Contributor Author

done

Contributor Author

m-trieu commented Oct 25, 2024

back to you @arunpandianp @scwhittle thanks

// Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);

protected final AtomicBoolean clientClosed;
Contributor

add a comment:

Indicates that the logical stream has been half-closed and is waiting for clean server shutdown.

Contributor Author

done


/**
* Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link
* #halfClose()}.
Contributor

It would be good to clarify how this differs from clientClosed.
This is more about the specific physical stream.

Maybe add "Separate from clientClosed as this is specific to the requestObserver and is initially false on retry."

Contributor Author

done

private final AtomicReference<String> lastError;
private final AtomicReference<DateTime> lastErrorTime;
private final AtomicReference<String> lastRestartReason;
private final AtomicReference<DateTime> lastRestartTime;
Contributor

this is getting to be a lot of atomics. Most are just atomic for status page.

I think it could be less confusing to have some MonitoringInfo object which has synchronized methods like noteRestart(...), sleepUntil(sleeper, time), noteSend, noteResponse to make it clearer what is just monitoring stuff and what is part of the state machine.

For state machine things like isShutdown and clientClosed, it would be nice if we could move away from atomics to variables marked with GuardedBy. In some cases that might lead to duplication with the monitoring state, but I think it could still help with analyzing the locking correctness or races within this class. For example, for shutdown, I think we could have a boolean guarded by the shutdown mutex, and a method on MonitoringState like noteShutdown() which would internally have its own boolean and the time the shutdown occurred.
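
A rough sketch of the kind of monitoring object suggested here. StreamMonitoringInfo and its method names are hypothetical and not necessarily what landed in the PR; java.time.Instant is swapped in for Joda DateTime so the example has no dependencies, and the GuardedBy annotation is written as a comment for the same reason.

```java
import java.time.Instant;

// Hypothetical sketch of a monitoring-only holder; names are illustrative.
final class StreamMonitoringInfo {
  // Every field is guarded by "this" (what @GuardedBy("this") would document).
  private int restartCount;
  private String lastRestartReason = "";
  private Instant lastRestartTime; // null until the first restart
  private boolean shutdown;
  private Instant shutdownTime; // null until shutdown is noted

  synchronized void noteRestart(String reason) {
    restartCount++;
    lastRestartReason = reason;
    lastRestartTime = Instant.now();
  }

  /** Records the first shutdown only; later calls keep the original timestamp. */
  synchronized void noteShutdown() {
    if (!shutdown) {
      shutdown = true;
      shutdownTime = Instant.now();
    }
  }

  synchronized int restartCount() {
    return restartCount;
  }

  /** One-line summary suitable for a status page. */
  synchronized String statusLine() {
    return "restarts=" + restartCount
        + ", lastRestartReason=" + lastRestartReason
        + ", lastRestartTime=" + lastRestartTime
        + ", shutdownTime=" + shutdownTime;
  }
}
```

Keeping these fields behind their own lock separates status-page bookkeeping from the state machine, which can then use plain lock-guarded booleans.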

Contributor Author

done

if (started) {
// requestObserver is not set until the first startStream() is called. If the stream was
// never started there is nothing to clean up internally.
requestObserver.onError(
Contributor

this seems racy since start sets started to true before startStream calls requestObserver.reset() initially, or it could happen that startStream resets the observer after we call onError here and the new one doesn't have an error called.

How about adding a RequestObserver.poison() which permanently puts the request observer in a poisoned state, calling onError on the current delegate and on any created later? You can then remove the started check, and the request observer handles the synchronization within itself.
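
One possible shape for the poison idea, as a dependency-free sketch. SimpleObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, and PoisonableObserver is not the ResettableRequestObserver from the PR.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver, to keep the sketch self-contained. */
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);
}

// Hypothetical sketch; not the ResettableRequestObserver from the PR.
final class PoisonableObserver<T> {
  private final Supplier<SimpleObserver<T>> supplier;
  private SimpleObserver<T> delegate; // guarded by this; null until reset() is called
  private boolean poisoned; // guarded by this

  PoisonableObserver(Supplier<SimpleObserver<T>> supplier) {
    this.supplier = supplier;
  }

  /** Swaps in a fresh delegate; if already poisoned, the new delegate is errored right away. */
  synchronized void reset() {
    delegate = supplier.get();
    if (poisoned) {
      delegate.onError(new IllegalStateException("Observer was poisoned."));
    }
  }

  /** Permanently poisons this observer and errors the current delegate, if any. */
  synchronized void poison() {
    if (poisoned) {
      return;
    }
    poisoned = true;
    if (delegate != null) {
      delegate.onError(new IllegalStateException("Observer was poisoned."));
    }
  }

  void onNext(T value) {
    SimpleObserver<T> current;
    synchronized (this) {
      if (poisoned) {
        throw new IllegalStateException("Observer was poisoned.");
      }
      if (delegate == null) {
        throw new IllegalStateException("reset() has not been called.");
      }
      current = delegate;
    }
    // Sent outside the lock so poison() is never blocked behind a slow send.
    current.onNext(value);
  }
}
```

Because the send happens outside the lock, a send can still race with the terminal onError; the underlying observer would need its own guard against that.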

Contributor Author

done
also moved ResettableRequestObserver to its own file, and added tests

Contributor Author

m-trieu commented Oct 29, 2024

back to you thanks @scwhittle

private @Nullable StreamObserver<T> delegateStreamObserver;

/**
* Indicates that the request observer should no longer be used. Attempts to perform operations on
Contributor

move comment next to poison method?

Contributor Author

done

/** Records stream metrics for debugging. */
@ThreadSafe
final class StreamDebugMetrics {
private final AtomicInteger restartCount = new AtomicInteger();
Contributor

can these just be synchronized ints as well instead of atomic? We're not doing anything expensive under the lock, so it doesn't seem like it needs separate consideration.

Contributor Author

done

lastRestartTime = DateTime.now();
}

synchronized long startTimeMs() {
Contributor

getStartTimeMs?

ditto for lastSendTimeMs

Contributor Author

done

/**
* Used to guard {@link #start()} and {@link #shutdown()} behavior.
*
* @implNote Do not hold when performing IO. If also locking on {@code this} in the same context,
Contributor

If you acquire this mutex before the (this) lock, then this mutex could be blocked by I/O, because we perform I/O beneath the (this) lock.

If this is supposed to be lightweight it seems like it should be acquired after (this) to avoid that.

Contributor Author

done

@GuardedBy("this")
private boolean streamClosed;

private volatile boolean isShutdown;
Contributor

can this be guarded by shutdown lock? the accessor method can synchronize

Contributor Author

done

private boolean streamClosed;

private volatile boolean isShutdown;
private volatile boolean started;
Contributor

can this be guarded by one of the locks instead of volatile?

Contributor Author

done

Contributor Author

m-trieu commented Nov 1, 2024

back to you @scwhittle thank you!

throw new IllegalStateException("Send called on a client closed stream.");
}

requestObserver().onNext(request);
try {
verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
Contributor

I would remove this since it seems likely expensive.
Instead you could verify with a test:

  • setup requestObserver that blocks until notified
  • one thread calls send and starts blocking
  • main test thread calls shutdown() and verifies the method returns
  • main test thread unblocks the requestObserver
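
The test recipe above could be sketched in plain Java roughly as follows. FakeStream is a hypothetical stand-in for the real stream, and shutdown() is called from a helper thread with a bounded join (a small deviation from the steps above) so that a regression fails instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical check; FakeStream only models the locking, not the real stream.
final class ShutdownNotBlockedBySendCheck {
  static final class FakeStream {
    private final Object shutdownLock = new Object();
    final CountDownLatch sendEntered = new CountDownLatch(1);
    final CountDownLatch unblockSend = new CountDownLatch(1);
    private boolean isShutdown; // guarded by shutdownLock

    /** Holds the (this) lock while blocked, like a send stuck in requestObserver.onNext(). */
    synchronized void send() throws InterruptedException {
      sendEntered.countDown();
      unblockSend.await();
    }

    /** Only takes shutdownLock, so it must not wait for the blocked send. */
    void shutdown() {
      synchronized (shutdownLock) {
        isShutdown = true;
      }
    }

    boolean isShutdown() {
      synchronized (shutdownLock) {
        return isShutdown;
      }
    }
  }

  static boolean shutdownReturnsWhileSendBlocked() throws InterruptedException {
    FakeStream stream = new FakeStream();
    Thread sender =
        new Thread(
            () -> {
              try {
                stream.send();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    sender.start();
    boolean sendBlocked = stream.sendEntered.await(5, TimeUnit.SECONDS);

    Thread shutdownCaller = new Thread(stream::shutdown);
    shutdownCaller.start();
    shutdownCaller.join(5000);
    boolean returned = sendBlocked && !shutdownCaller.isAlive() && stream.isShutdown();

    stream.unblockSend.countDown(); // Unblock the sender so the check cleans up.
    sender.join(5000);
    return returned;
  }
}
```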

Contributor Author

done

@@ -156,29 +163,44 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();

RuntimeException finalException = null;
CommitCompletionException failures = new CommitCompletionException();
Contributor

nit: how about a builder for this since we won't want to add exceptions in other places.

then the final line of method can be

builder.throwIfNonEmpty();
and internally it builds the exception and throws if needed?

Contributor Author

done

Contributor

I was thinking the builder would not be an exception itself, and then the exception would just be a simple class without mutating methods.
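
A sketch of that split, with a builder that is not itself an exception and an exception class that exposes no mutating methods of its own. CommitFailuresException is a hypothetical name; the real CommitCompletionException may look different.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical exception that only carries the collected failures. */
final class CommitFailuresException extends RuntimeException {
  private CommitFailuresException(List<Throwable> failures) {
    super("Exception while processing commit responses: " + failures.size() + " failure(s).");
    // Attach each failure so stack traces are preserved for debugging.
    failures.forEach(this::addSuppressed);
  }

  static Builder builder() {
    return new Builder();
  }

  /** Collects failures while responses are processed; builds and throws only at the end. */
  static final class Builder {
    private final List<Throwable> failures = new ArrayList<>();

    Builder add(Throwable failure) {
      failures.add(failure);
      return this;
    }

    /** Throws a CommitFailuresException if any failure was added; otherwise does nothing. */
    void throwIfNonEmpty() {
      if (!failures.isEmpty()) {
        throw new CommitFailuresException(new ArrayList<>(failures));
      }
    }
  }
}
```

With this shape, the last line of a method like onResponse can simply be builder.throwIfNonEmpty().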

Contributor Author

done

queuedBytes = 0;
queue.clear();
try {
if (!hasReceivedShutdownSignal()) {
Contributor

can we remove? it is handled in stream methods already and doing it here leaves a gap between shutdown check and flush anyway.

Contributor Author

done

send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
- if (clientClosed.get()) {
+ if (clientClosed && !hasReceivedShutdownSignal()) {
Contributor

remove? above send will fail if shutdown
though I think we can also guarantee at higher level that onNewStream isn't called if shutdown

Contributor Author

done

return true;
}
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
Contributor

just make a synchronized method?

Contributor Author

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous

private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
Contributor

why doesn't just "this" work?

Contributor Author

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous

@@ -59,6 +59,9 @@ final class StreamDebugMetrics {
@GuardedBy("this")
private DateTime shutdownTime = null;
Contributor

mark this nullable, and the fields above

I think you still need to fix the over-exemption of windmill classes from the checker.
#30183

Contributor Author

done

// If the stream was stopped due to a resource exhausted error then we are throttled.
- if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
Contributor

nit. I would maybe keep this in the onError case, as the other stuff is more just logs/debug page and this is more functional.

Contributor Author

done

while (true) {
private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn)
throws WindmillStreamShutdownException {
while (!isShutdownLocked()) {
Contributor

rm? below needs to handle it anyway

Contributor Author

done

pending.clear();
batches.forEach(
batch -> {
batch.markFinalized();
Contributor

batch usage should be synchronized on this

should shutdownInternal be marked synchronized? It is currently in the synchronized shutdown block anyway

Contributor Author

done

}

private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
private synchronized void handleShutdown(QueuedRequest request, Throwable cause)
Contributor

nit: name throwIfShutdown?

Contributor Author

done

request.addToStreamingGetDataRequest(builder);
}
return builder.build();
private synchronized void verify(boolean condition, String message) {
Contributor

can this be removed? it seems you have shutdown check first in cases I see (and if not I think it would be clearer to have it as part of the check where it is than hidden in method that doesn't sound like it examines shutdown)

Contributor Author

done

Verify.verify(condition || isShutdown, message);
}

private synchronized boolean isShutdownLocked() {
Contributor

rm if you remove above

Contributor Author

done

Contributor

@scwhittle scwhittle left a comment

some of previous comments were not resolved, please look through them too. Thanks!

requestObserver.poison();
isShutdown = true;
Contributor

we should remove this (and the suppress)

shouldn't the poison prevent the blocking beneath the mutex? and then the below lock will be acquired soon?

Setting it to true outside the mutex will break invariants that are easier to think about if it is strictly guarded by the mutex (and it breaks the logic below: we'll never run shutdownInternal).

if we do need it for something it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix it instead because it is confusing to have two.

Contributor Author

cleaned up; it was supposed to stay within the sync block

@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
Thread.dumpStack();
Contributor

If you want to submit this, log it instead. We've used things like StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10); elsewhere to get a string to log.
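
For illustration, a helper along those lines might look like the sketch below. StackTraceStrings.firstFrames is a hypothetical stand-in; I have not checked the exact behavior of the StringUtils.arrayToNewlines helper mentioned above, so treat this as an approximation of the idea (turn the first few frames into a loggable string) rather than its implementation.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper; not the StringUtils class referenced in the review.
final class StackTraceStrings {
  /** Joins at most maxFrames stack frames into a string, one frame per line. */
  static String firstFrames(StackTraceElement[] frames, int maxFrames) {
    return Arrays.stream(frames)
        .limit(maxFrames)
        .map(StackTraceElement::toString)
        .collect(Collectors.joining("\n"));
  }
}

// Possible usage with an SLF4J-style logger (assumed, not shown here):
//   LOG.warn("Output channel stalled, stack:\n{}",
//       StackTraceStrings.firstFrames(Thread.currentThread().getStackTrace(), 10));
```

Unlike Thread.dumpStack(), which writes to stderr, this keeps the trace in the regular log stream and bounds its length.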

Contributor Author

removed; this was for debugging

@@ -472,7 +473,8 @@ private void flushResponse() {
responseObserver.onNext(responseBuilder.build());
} catch (Exception e) {
// Stream is already closed.
System.out.println("trieu: " + e);
LOG.warn("trieu: ", e);
Contributor

rm debug logs

Contributor Author

removed; this was for debugging

int nextPhase =
isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
// If nextPhase is a value less than 0, the phaser has been terminated.
if (nextPhase < 0) {
Contributor

also check isClosed to ensure that we don't call onNext on something that we called onFinish or onError on.

We need it here since the locking in the ResettableStreamObserver isn't sufficient as it sends without lock held. You might be able to remove it from that class if we're just relying on this one to provide enforcement that the underlying stream observer is used correctly (ie locked, just one terminal method, no other methods after it)
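
A rough sketch of the check being asked for, assuming a `Phaser` signals readiness and a lock guards `isClosed`. The class `PhaserGuardSketch`, its methods, and the `delivered` counter are hypothetical; the counter stands in for the call to the underlying observer's `onNext`.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhaserGuardSketch {
  private final Object lock = new Object();
  // One registered party: each arrive() advances the phase.
  private final Phaser isReadyNotifier = new Phaser(1);
  private boolean isClosed = false; // guarded by lock
  private int delivered = 0; // guarded by lock

  int currentPhase() {
    return isReadyNotifier.getPhase();
  }

  void markReady() {
    isReadyNotifier.arrive();
  }

  void terminate() {
    isReadyNotifier.forceTermination();
  }

  void close() {
    synchronized (lock) {
      isClosed = true;
    }
  }

  /** Returns true only if the value was handed on; false on timeout, termination, or close. */
  boolean trySend(int awaitPhase, long waitMillis) {
    int nextPhase;
    try {
      nextPhase =
          isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return false; // still not ready; a real caller would loop and log
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    synchronized (lock) {
      // A negative phase means the phaser was terminated. isClosed means a
      // terminal method was already called, so nothing more may be sent.
      if (nextPhase < 0 || isClosed) {
        return false;
      }
      delivered++;
      return true;
    }
  }
}
```

Checking both conditions while holding the same lock that the terminal methods take is what keeps a send from slipping in after a terminal call.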

Contributor Author

added checks under lock to onError and onCompleted; all onNext calls are already guarded by the lock

synchronized (lock) {
isClosed = true;
Contributor

i.e. add an if (!isClosed) check to all of these

@m-trieu
Contributor Author

m-trieu commented Nov 13, 2024

still need to add some more tests to DirectStreamObserverTest
addressed the other comments

thanks!
@scwhittle

@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce

try {
delegate.onError(e);
} catch (RuntimeException ignored) {
} catch (IllegalStateException ignored) {
Contributor

add the ignored to the exception too? just in case we catch something unexpected and it helps debugging?

Contributor Author

done

@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {

for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
AppendableInputStream responseStream = pending.get(chunk.getRequestId(i));
verify(responseStream != null, "No pending response stream");
synchronized (this) {
Contributor

avoid synchronizing if not necessary and you need to handle the null case or you will just get an exception in following line.

if (responseStream == null) {
  synchronized (this) { verify(isShutdown); }
  continue;
}

Contributor Author

done

outboundObserver.onCompleted();
}
}

private void markClosedOrThrow() {
Contributor

just make the method synchronized (though with the suggestion below, it's probably easier to just duplicate in onCompleted/onError and use the same if block).

Contributor Author

done

outboundObserver.onCompleted();
}
}

private void markClosedOrThrow() {
synchronized (lock) {
Preconditions.checkState(!isClosed);
Contributor

this is going to throw if we have the sequence

T1: onNext()
T2: terminate()
T1: onError()

It seems like that could happen if we're terminating from threads other than the one driving the observer. We could have a separate bool tracking whether the user closed it (userClosed), and base this exception on that, since that is using the class wrong. Having a terminate before/during an onCompleted/onError doesn't necessarily seem like misuse, and I think we should avoid throwing an exception.
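
One way the suggestion could look, as a sketch under stated assumptions: `TerminatingObserverSketch`, `isUserClosed`, and `terminalCalls` are hypothetical names, and the counter stands in for the single terminal call made on the underlying observer.

```java
final class TerminatingObserverSketch {
  private final Object lock = new Object();
  private boolean isClosed = false; // a terminal call already reached the delegate
  private boolean isUserClosed = false; // the caller invoked onError/onCompleted
  private int terminalCalls = 0; // stand-in for delegate.onError/onCompleted

  /** May be called from any thread, before or during the caller's own close. */
  void terminate() {
    synchronized (lock) {
      closeDelegateOnce();
    }
  }

  void onError(Throwable t) {
    synchronized (lock) {
      // Only a second close by the caller counts as misuse.
      if (isUserClosed) {
        throw new IllegalStateException("onError/onCompleted called more than once");
      }
      isUserClosed = true;
      // If terminate() got here first, this is a no-op rather than an exception.
      closeDelegateOnce();
    }
  }

  int terminalCalls() {
    synchronized (lock) {
      return terminalCalls;
    }
  }

  // Must be called with lock held.
  private void closeDelegateOnce() {
    if (!isClosed) {
      isClosed = true;
      terminalCalls++;
    }
  }
}
```

With the T1/T2 sequence above, `terminate()` followed by `onError()` makes exactly one terminal call and throws nothing; only a repeated `onError()` from the caller throws.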

Contributor Author

done

@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {}
() ->
assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
testStream.shutdown();

// Sleep a bit to give sendExecutor time to execute the send().
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Contributor

sleep for less, 100ms maybe? Tests take long enough already

Contributor Author

done

@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn
assertThrows(
WindmillStreamShutdownException.class,
queuedBatch::waitForSendOrFailNotification));

// Wait a few seconds for the above future to get scheduled and run.
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Contributor

ditto how about 100ms

Contributor Author

done

});

// Sleep a bit to allow future to run.
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Contributor

ditto less

Contributor Author

done

if (currentPhase < 0) return;
if (currentPhase < 0) {
throw new StreamObserverCancelledException("StreamObserver was terminated.");
}
messagesSinceReady = 0;
Contributor

assert(!isClosed);
since we terminate before closing seems like it is guaranteed but clearer to reader to assert I think

Contributor Author

done

if (currentPhase < 0) return;
if (currentPhase < 0) {
throw new StreamObserverCancelledException("StreamObserver was terminated.");
}
Contributor

assert(!isClosed);
since we terminate before closing seems like it is guaranteed but clearer to reader to assert I think

Contributor Author

done

if (currentPhase < 0) return;
if (currentPhase < 0) {
throw new StreamObserverCancelledException("StreamObserver was terminated.");
}
Contributor

assert(!isClosed);
since we terminate before grabbing synchronized to close it is guaranteed but clearer to reader to assert I think

Contributor Author

done

@m-trieu m-trieu force-pushed the mt-start-stream branch 2 times, most recently from 64e61d9 to 42836f4 Compare November 13, 2024 22:00