-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add shutdown and start mechanics to windmill streams #32774
base: master
Are you sure you want to change the base?
Changes from 1 commit
7d5802b
a2d5c84
d53310c
f30455b
219c880
6b79fad
2426a6b
9679382
6e5ba0c
99f0078
e92786e
6f052de
fb8573a
93c5b31
77969d0
5a68e0f
3fb76ac
3fafefa
ab34387
4a9a863
95a19f4
f75df0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status; | ||
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.Instant; | ||
import org.slf4j.Logger; | ||
|
||
|
@@ -71,6 +72,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win | |
// shutdown. | ||
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK); | ||
private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response"; | ||
private static final String NOT_SHUTDOWN = "not shutdown"; | ||
protected final Sleeper sleeper; | ||
|
||
private final Logger logger; | ||
|
@@ -262,7 +264,7 @@ public final void appendSummaryHtml(PrintWriter writer) { | |
", %d restarts, last restart reason [ %s ] at [%s], %d errors", | ||
metrics.restartCount(), | ||
metrics.lastRestartReason(), | ||
metrics.lastRestartTime(), | ||
metrics.lastRestartTime().orElse(null), | ||
metrics.errorCount())); | ||
|
||
if (summaryMetrics.isClientClosed()) { | ||
|
@@ -275,13 +277,12 @@ public final void appendSummaryHtml(PrintWriter writer) { | |
|
||
writer.format( | ||
", current stream is %dms old, last send %dms, last response %dms, closed: %s, " | ||
+ "isShutdown: %s, shutdown time: %s", | ||
+ "shutdown time: %s", | ||
summaryMetrics.streamAge(), | ||
summaryMetrics.timeSinceLastSend(), | ||
summaryMetrics.timeSinceLastResponse(), | ||
requestObserver.isClosed(), | ||
summaryMetrics.shutdownTime().isPresent(), | ||
summaryMetrics.shutdownTime().orElse(null)); | ||
summaryMetrics.shutdownTime().map(DateTime::toString).orElse(NOT_SHUTDOWN)); | ||
} | ||
|
||
/** | ||
|
@@ -297,8 +298,10 @@ public final synchronized void halfClose() { | |
clientClosed = true; | ||
try { | ||
requestObserver.onCompleted(); | ||
} catch (StreamClosedException | WindmillStreamShutdownException e) { | ||
logger.warn("Stream was previously closed or shutdown."); | ||
} catch (StreamClosedException e) { | ||
logger.warn("Stream was previously closed."); | ||
} catch (WindmillStreamShutdownException e) { | ||
logger.warn("Stream was previously shutdown."); | ||
} | ||
} | ||
|
||
|
@@ -317,10 +320,13 @@ public String backendWorkerToken() { | |
return backendWorkerToken; | ||
} | ||
|
||
@SuppressWarnings("GuardedBy") | ||
@Override | ||
public final void shutdown() { | ||
scwhittle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown. | ||
// Don't lock on "this" before poisoning the request observer since otherwise the observer may | ||
// be blocking in send(). | ||
requestObserver.poison(); | ||
isShutdown = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should remove this (and the suppress) shouldn't the poison prevent the blocking beneath the mutex? and then the below lock will be acquired soon? Setting it to true outside the mutex will break invariants that are easier to think about if it is strictly guarded by. (and it breaks logic below we'll never run shutdownInternal) if we do need it for something it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix it instead because it is confusing to have two. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cleaneed up was supposed to stay within the sync block |
||
synchronized (this) { | ||
if (!isShutdown) { | ||
isShutdown = true; | ||
|
@@ -332,6 +338,18 @@ public final void shutdown() { | |
|
||
protected abstract void shutdownInternal(); | ||
|
||
/** Returns true if the stream was torn down and should not be restarted internally. */ | ||
private synchronized boolean maybeTeardownStream() { | ||
if (isShutdown || (clientClosed && !hasPendingRequests())) { | ||
streamRegistry.remove(AbstractWindmillStream.this); | ||
finishLatch.countDown(); | ||
executor.shutdownNow(); | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
private class ResponseObserver implements StreamObserver<ResponseT> { | ||
|
||
@Override | ||
|
@@ -351,7 +369,13 @@ public void onError(Throwable t) { | |
return; | ||
} | ||
|
||
recordStreamStatus(Status.fromThrowable(t)); | ||
Status errorStatus = Status.fromThrowable(t); | ||
recordStreamStatus(errorStatus); | ||
|
||
// If the stream was stopped due to a resource exhausted error then we are throttled. | ||
if (errorStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
startThrottleTimer(); | ||
} | ||
|
||
try { | ||
long sleep = backoff.nextBackOffMillis(); | ||
|
@@ -411,25 +435,6 @@ private void recordStreamStatus(Status status) { | |
.responseDebugString(nowMillis) | ||
.orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING)); | ||
} | ||
|
||
// If the stream was stopped due to a resource exhausted error then we are throttled. | ||
if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
startThrottleTimer(); | ||
} | ||
} | ||
} | ||
|
||
/** Returns true if the stream was torn down and should not be restarted internally. */ | ||
private boolean maybeTeardownStream() { | ||
synchronized (AbstractWindmillStream.this) { | ||
if (isShutdown || (clientClosed && !hasPendingRequests())) { | ||
streamRegistry.remove(AbstractWindmillStream.this); | ||
finishLatch.countDown(); | ||
executor.shutdownNow(); | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import java.io.InputStream; | ||
import java.io.SequenceInputStream; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
|
@@ -472,7 +473,8 @@ private void flushResponse() { | |
responseObserver.onNext(responseBuilder.build()); | ||
} catch (Exception e) { | ||
// Stream is already closed. | ||
System.out.println("trieu: " + e); | ||
LOG.warn("trieu: ", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rm debug logs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed this waas for debugging |
||
LOG.warn(Arrays.toString(e.getStackTrace())); | ||
} | ||
responseBuilder.clear(); | ||
} | ||
|
@@ -512,7 +514,9 @@ private void flushResponse() { | |
done.countDown(); | ||
}); | ||
} | ||
done.await(); | ||
while (done.await(5, TimeUnit.SECONDS)) { | ||
LOG.info("trieu: {}", done.getCount()); | ||
} | ||
stream.halfClose(); | ||
assertTrue(stream.awaitTermination(60, TimeUnit.SECONDS)); | ||
executor.shutdown(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove suppression