Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown and start mechanics to windmill streams #32774

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;

Expand Down Expand Up @@ -71,6 +72,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
// shutdown.
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response";
private static final String NOT_SHUTDOWN = "not shutdown";
protected final Sleeper sleeper;

private final Logger logger;
Expand Down Expand Up @@ -262,7 +264,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
metrics.restartCount(),
metrics.lastRestartReason(),
metrics.lastRestartTime(),
metrics.lastRestartTime().orElse(null),
metrics.errorCount()));

if (summaryMetrics.isClientClosed()) {
Expand All @@ -275,13 +277,12 @@ public final void appendSummaryHtml(PrintWriter writer) {

writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
+ "shutdown time: %s",
summaryMetrics.streamAge(),
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse(),
requestObserver.isClosed(),
summaryMetrics.shutdownTime().isPresent(),
summaryMetrics.shutdownTime().orElse(null));
summaryMetrics.shutdownTime().map(DateTime::toString).orElse(NOT_SHUTDOWN));
}

/**
Expand All @@ -297,8 +298,10 @@ public final synchronized void halfClose() {
clientClosed = true;
try {
requestObserver.onCompleted();
} catch (StreamClosedException | WindmillStreamShutdownException e) {
logger.warn("Stream was previously closed or shutdown.");
} catch (StreamClosedException e) {
logger.warn("Stream was previously closed.");
} catch (WindmillStreamShutdownException e) {
logger.warn("Stream was previously shutdown.");
}
}

Expand All @@ -317,10 +320,13 @@ public String backendWorkerToken() {
return backendWorkerToken;
}

@SuppressWarnings("GuardedBy")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove suppression

@Override
public final void shutdown() {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.
// Don't lock on "this" before poisoning the request observer since otherwise the observer may
// be blocking in send().
requestObserver.poison();
isShutdown = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should remove this (and the suppress)

shouldn't the poison prevent the blocking beneath the mutex? and then the below lock will be acquired soon?

Setting it to true outside the mutex will break invariants that are easier to think about if it is strictly guarded by. (and it breaks logic below we'll never run shutdownInternal)

if we do need it for something it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix it instead because it is confusing to have two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaneed up was supposed to stay within the sync block

synchronized (this) {
if (!isShutdown) {
isShutdown = true;
Expand All @@ -332,6 +338,18 @@ public final void shutdown() {

protected abstract void shutdownInternal();

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTeardownStream() {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
}

private class ResponseObserver implements StreamObserver<ResponseT> {

@Override
Expand All @@ -351,7 +369,13 @@ public void onError(Throwable t) {
return;
}

recordStreamStatus(Status.fromThrowable(t));
Status errorStatus = Status.fromThrowable(t);
recordStreamStatus(errorStatus);

// If the stream was stopped due to a resource exhausted error then we are throttled.
if (errorStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
startThrottleTimer();
}

try {
long sleep = backoff.nextBackOffMillis();
Expand Down Expand Up @@ -411,25 +435,6 @@ private void recordStreamStatus(Status status) {
.responseDebugString(nowMillis)
.orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
}

// If the stream was stopped due to a resource exhausted error then we are throttled.
if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
startThrottleTimer();
}
}
}

/** Returns true if the stream was torn down and should not be restarted internally. */
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class ResettableThrowingStreamObserver<T> {
* StreamObserver.
*/
@GuardedBy("this")
private boolean isCurrentStreamClosed = false;
private boolean isCurrentStreamClosed = true;

ResettableThrowingStreamObserver(
Supplier<TerminatingStreamObserver<T>> streamObserverFactory, Logger logger) {
Expand All @@ -72,12 +72,10 @@ private synchronized StreamObserver<T> delegate()

if (isCurrentStreamClosed) {
throw new StreamClosedException(
"Current stream is closed, requires reset for future stream operations.");
"Current stream is closed, requires reset() for future stream operations.");
}

return Preconditions.checkNotNull(
delegateStreamObserver,
"requestObserver cannot be null. Missing a call to startStream() to initialize.");
return Preconditions.checkNotNull(delegateStreamObserver, "requestObserver cannot be null.");
}

/** Creates a new delegate to use for future {@link StreamObserver} methods. */
Expand Down Expand Up @@ -131,9 +129,10 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
}
}

public void onError(Throwable throwable)
public synchronized void onError(Throwable throwable)
throws StreamClosedException, WindmillStreamShutdownException {
delegate().onError(throwable);
isCurrentStreamClosed = true;
}

public synchronized void onCompleted()
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class StreamDebugMetrics {
private String lastRestartReason = "";

@GuardedBy("this")
private DateTime lastRestartTime = null;
private @Nullable DateTime lastRestartTime = null;

@GuardedBy("this")
private long lastResponseTimeMs = 0;
Expand All @@ -57,7 +57,7 @@ final class StreamDebugMetrics {
private long startTimeMs = 0;

@GuardedBy("this")
private DateTime shutdownTime = null;
private @Nullable DateTime shutdownTime = null;

@GuardedBy("this")
private boolean clientClosed = false;
Expand Down Expand Up @@ -194,16 +194,19 @@ private static Snapshot create(
@AutoValue
abstract static class RestartMetrics {
private static RestartMetrics create(
int restartCount, String restartReason, DateTime lastRestartTime, int errorCount) {
int restartCount,
String restartReason,
@Nullable DateTime lastRestartTime,
int errorCount) {
return new AutoValue_StreamDebugMetrics_RestartMetrics(
restartCount, restartReason, lastRestartTime, errorCount);
restartCount, restartReason, Optional.ofNullable(lastRestartTime), errorCount);
}

abstract int restartCount();

abstract String lastRestartReason();

abstract DateTime lastRestartTime();
abstract Optional<DateTime> lastRestartTime();

abstract int errorCount();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker.windmill.client;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
StreamingGetWorkRequest request =
StreamingGetWorkRequest.newBuilder()
.setRequest(
requestHeader.toBuilder()
requestHeader
.toBuilder()
.setMaxItems(initialGetWorkBudget.items())
.setMaxBytes(initialGetWorkBudget.bytes())
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ final class GrpcGetDataStream
private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
StreamingGetDataRequest.newBuilder().build();

/**
* @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this}
*/
/** @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this} */
private final Deque<QueuedBatch> batches;
scwhittle marked this conversation as resolved.
Show resolved Hide resolved

private final Map<Long, AppendableInputStream> pending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
Thread.dumpStack();
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
}

waitSeconds = waitSeconds * 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ protected boolean hasPendingRequests() {
@Override
protected void startThrottleTimer() {}

public void testSend(Integer i) throws ResettableThrowingStreamObserver.StreamClosedException, WindmillStreamShutdownException {
public void testSend(Integer i)
throws ResettableThrowingStreamObserver.StreamClosedException,
WindmillStreamShutdownException {
trySend(i);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void testOnCompleted_afterPoisonedThrows() {

@Test
public void testReset_usesNewDelegate()
throws WindmillStreamShutdownException, ResettableThrowingStreamObserver.StreamClosedException {
throws WindmillStreamShutdownException,
ResettableThrowingStreamObserver.StreamClosedException {
List<StreamObserver<Integer>> delegates = new ArrayList<>();
ResettableThrowingStreamObserver<Integer> observer =
newStreamObserver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ public void testSummaryMetrics_withRestarts() {
assertThat(restartMetrics.lastRestartReason()).isEqualTo(restartReason);
assertThat(restartMetrics.restartCount()).isEqualTo(1);
assertThat(restartMetrics.errorCount()).isEqualTo(1);
assertThat(restartMetrics.lastRestartTime()).isLessThan(DateTime.now());
assertThat(restartMetrics.lastRestartTime().toInstant()).isGreaterThan(Instant.EPOCH);
assertTrue(restartMetrics.lastRestartTime().isPresent());
assertThat(restartMetrics.lastRestartTime().get()).isLessThan(DateTime.now());
assertThat(restartMetrics.lastRestartTime().get().toInstant()).isGreaterThan(Instant.EPOCH);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -472,7 +473,8 @@ private void flushResponse() {
responseObserver.onNext(responseBuilder.build());
} catch (Exception e) {
// Stream is already closed.
System.out.println("trieu: " + e);
LOG.warn("trieu: ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm debug logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this waas for debugging

LOG.warn(Arrays.toString(e.getStackTrace()));
}
responseBuilder.clear();
}
Expand Down Expand Up @@ -512,7 +514,9 @@ private void flushResponse() {
done.countDown();
});
}
done.await();
while (done.await(5, TimeUnit.SECONDS)) {
LOG.info("trieu: {}", done.getCount());
}
stream.halfClose();
assertTrue(stream.awaitTermination(60, TimeUnit.SECONDS));
executor.shutdown();
Expand Down
Loading