Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown and start mechanics to windmill streams #32774

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
Expand All @@ -27,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.windmill.client.ResettableThrowingStreamObserver.StreamClosedException;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
import org.apache.beam.sdk.util.BackOff;
Expand All @@ -48,7 +50,7 @@
* and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
* such as sending headers or retrying requests.
*
* <p>{@link #send(RequestT)} and {@link #startStream()} should not be called from {@link
* <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called from {@link
* #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and internal data
Expand Down Expand Up @@ -80,10 +82,12 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final String backendWorkerToken;
private final ResettableThrowingStreamObserver<RequestT> requestObserver;
private final StreamDebugMetrics debugMetrics;
protected volatile boolean clientClosed;

@GuardedBy("this")
private boolean isShutdown;
protected boolean clientClosed;

@GuardedBy("this")
protected boolean isShutdown;

@GuardedBy("this")
private boolean started;
Expand Down Expand Up @@ -133,8 +137,7 @@ private static String createThreadName(String streamType, String backendWorkerTo
protected abstract void onResponse(ResponseT response);

/** Called when a new underlying stream to the server has been opened. */
protected abstract void onNewStream()
throws StreamClosedException, WindmillStreamShutdownException;
protected abstract void onNewStream() throws WindmillStreamShutdownException;

/** Returns whether there are any pending requests that should be retried on a stream break. */
protected abstract boolean hasPendingRequests();
Expand All @@ -146,16 +149,19 @@ protected abstract void onNewStream()
*/
protected abstract void startThrottleTimer();

/** Reflects that {@link #shutdown()} was explicitly called. */
protected synchronized boolean hasReceivedShutdownSignal() {
return isShutdown;
}

/** Send a request to the server. */
protected final synchronized void send(RequestT request)
throws StreamClosedException, WindmillStreamShutdownException {
/** Try to send a request to the server. Returns true if the request was successfully sent. */
@CanIgnoreReturnValue
protected final synchronized boolean trySend(RequestT request)
throws WindmillStreamShutdownException {
debugMetrics.recordSend();
requestObserver.onNext(request);
try {
requestObserver.onNext(request);
return true;
} catch (StreamClosedException e) {
// Stream was broken, requests may be retried when stream is reopened.
}

return false;
}

@Override
Expand Down Expand Up @@ -189,6 +195,7 @@ private void startStream() {
return;
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (WindmillStreamShutdownException e) {
// shutdown() is responsible for cleaning up pending requests.
logger.debug("Stream was shutdown while creating new stream.", e);
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
logger.error("Failed to create new stream, retrying: ", e);
Expand All @@ -201,6 +208,8 @@ private void startStream() {
logger.info(
"Interrupted during {} creation backoff. The stream will not be created.",
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
getClass());
// Shutdown the stream to clean up any dangling resources and pending requests.
shutdown();
break;
} catch (IOException ioe) {
// Keep trying to create the stream.
Expand All @@ -225,7 +234,7 @@ protected final void executeSafely(Runnable runnable) {
}
}

public final void maybeSendHealthCheck(Instant lastSendThreshold) {
public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
Expand All @@ -235,8 +244,7 @@ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
}
}

protected abstract void sendHealthCheck()
throws WindmillStreamShutdownException, StreamClosedException;
protected abstract void sendHealthCheck() throws WindmillStreamShutdownException;

/**
* @implNote Care is taken that synchronization on this is unnecessary for all status page
Expand All @@ -257,7 +265,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
metrics.lastRestartTime(),
metrics.errorCount()));

if (clientClosed) {
if (summaryMetrics.isClientClosed()) {
writer.write(", client closed");
}

Expand All @@ -272,7 +280,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse(),
requestObserver.isClosed(),
hasReceivedShutdownSignal(),
summaryMetrics.shutdownTime().isPresent(),
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
summaryMetrics.shutdownTime().orElse(null));
}

Expand All @@ -285,6 +293,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
@Override
public final synchronized void halfClose() {
// Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
debugMetrics.recordHalfClose();
clientClosed = true;
try {
requestObserver.onCompleted();
Expand Down Expand Up @@ -316,7 +325,6 @@ public final void shutdown() {
if (!isShutdown) {
isShutdown = true;
debugMetrics.recordShutdown();

shutdownInternal();
}
}
Expand Down Expand Up @@ -412,15 +420,17 @@ private void recordStreamStatus(Status status) {
}

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTeardownStream() {
if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just make a synchronized method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous

if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't just "this" work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed by moving it out of the inner class
GuardedBy check was failing since reference to this was ambiguous

finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,14 @@ public synchronized void onCompleted()
synchronized boolean isClosed() {
return isCurrentStreamClosed;
}

/**
* Indicates that the current stream was closed and the {@link StreamObserver} has finished via
* {@link StreamObserver#onCompleted()}. The stream may perform
*/
static final class StreamClosedException extends Exception {
private StreamClosedException(String s) {
super(s);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ final class StreamDebugMetrics {
@GuardedBy("this")
private DateTime shutdownTime = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark nullable and above

I think you still need to fix the over exemption of windmill classes from checker.
#30183

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@GuardedBy("this")
private boolean clientClosed = false;

private StreamDebugMetrics(Supplier<Instant> clock) {
this.clock = clock;
}
Expand Down Expand Up @@ -122,6 +125,10 @@ synchronized void recordShutdown() {
shutdownTime = clock.get().toDateTime();
}

synchronized void recordHalfClose() {
clientClosed = true;
}

synchronized Optional<String> responseDebugString(long nowMillis) {
return lastResponseTimeMs == 0
? Optional.empty()
Expand All @@ -145,7 +152,8 @@ synchronized Snapshot getSummaryMetrics() {
debugDuration(nowMs, lastResponseTimeMs),
getRestartMetrics(),
sleepUntil - nowMs(),
shutdownTime);
shutdownTime,
clientClosed);
}

@AutoValue
Expand All @@ -156,14 +164,16 @@ private static Snapshot create(
long timeSinceLastResponse,
Optional<RestartMetrics> restartMetrics,
long sleepLeft,
@Nullable DateTime shutdownTime) {
@Nullable DateTime shutdownTime,
boolean isClientClosed) {
return new AutoValue_StreamDebugMetrics_Snapshot(
streamAge,
timeSinceLastSend,
timeSinceLastResponse,
restartMetrics,
sleepLeft,
Optional.ofNullable(shutdownTime));
Optional.ofNullable(shutdownTime),
isClientClosed);
}

abstract long streamAge();
Expand All @@ -177,6 +187,8 @@ private static Snapshot create(
abstract long sleepLeft();

abstract Optional<DateTime> shutdownTime();

abstract boolean isClientClosed();
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker.windmill.client;

/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */
/**
* Thrown when operations are requested on a {@link WindmillStream} has been shutdown. Future
* operations on the stream are not allowed and will throw an {@link
* WindmillStreamShutdownException}.
*/
public final class WindmillStreamShutdownException extends Exception {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
public WindmillStreamShutdownException(String message) {
super(message);
Expand Down
Loading
Loading