Skip to content

Commit

Permalink
don't block on rendering status pages
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Oct 29, 2024
1 parent 77969d0 commit 5a68e0f
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,38 @@ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
* information. Blocking sends are made beneath this stream object's lock which could block
* status page rendering.
*/
@SuppressWarnings("GuardedBy")
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
debugMetrics.printRestartsHtml(writer);
StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics();
summaryMetrics
.restartMetrics()
.ifPresent(
metrics ->
writer.format(
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
metrics.restartCount(),
metrics.lastRestartReason(),
metrics.lastRestartTime(),
metrics.errorCount()));

if (clientClosed) {
writer.write(", client closed");
}
long nowMs = Instant.now().getMillis();
long sleepLeft = debugMetrics.sleepLeft();
if (sleepLeft > 0) {
writer.format(", %dms backoff remaining", sleepLeft);

if (summaryMetrics.sleepLeft() > 0) {
writer.format(", %dms backoff remaining", summaryMetrics.sleepLeft());
}
debugMetrics.printSummaryHtml(writer, nowMs);

writer.format(
", closed: %s, " + "isShutdown: %s, shutdown time: %s",
streamClosed, isShutdown, debugMetrics.shutdownTime());
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
summaryMetrics.streamAge(),
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse(),
streamClosed,
isShutdown,
summaryMetrics.shutdownTime().orElse(null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client;

import java.io.PrintWriter;
import com.google.auto.value.AutoValue;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -89,10 +91,6 @@ synchronized void recordSleep(long sleepMs) {
sleepUntil = nowMs() + sleepMs;
}

synchronized long sleepLeft() {
return sleepUntil - nowMs();
}

/**
 * Bumps {@code restartCount} by one and returns the value reported by its {@code
 * incrementAndGet()} call.
 *
 * <p>This method is not declared {@code synchronized}; it relies solely on {@code
 * restartCount.incrementAndGet()} (presumably an {@code AtomicInteger} - confirm against the field
 * declaration).
 */
int incrementAndGetRestarts() {
return restartCount.incrementAndGet();
}
Expand All @@ -111,25 +109,74 @@ synchronized String responseDebugString(long nowMillis) {
: "received response " + (nowMillis - lastResponseTimeMs) + "ms ago";
}

void printRestartsHtml(PrintWriter writer) {
private Optional<RestartMetrics> getRestartMetrics() {
if (restartCount.get() > 0) {
synchronized (this) {
writer.format(
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get());
return Optional.of(
RestartMetrics.create(
restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get()));
}
}
}

synchronized DateTime shutdownTime() {
return shutdownTime;
return Optional.empty();
}

synchronized void printSummaryHtml(PrintWriter writer, long nowMs) {
writer.format(
", current stream is %dms old, last send %dms, last response %dms",
synchronized Snapshot getSummaryMetrics() {
long nowMs = Instant.now().getMillis();
return Snapshot.create(
debugDuration(nowMs, startTimeMs),
debugDuration(nowMs, lastSendTimeMs),
debugDuration(nowMs, lastResponseTimeMs));
debugDuration(nowMs, lastResponseTimeMs),
getRestartMetrics(),
sleepUntil - nowMs(),
shutdownTime);
}

@AutoValue
abstract static class Snapshot {
  // NOTE: AutoValue derives the generated constructor's parameter order from the declaration
  // order of the abstract accessors below, so do not reorder them.

  /** Age of the current stream in milliseconds, as shown on the status page. */
  abstract long streamAge();

  /** Milliseconds reported for the last send on the status page. */
  abstract long timeSinceLastSend();

  /** Milliseconds reported for the last response on the status page. */
  abstract long timeSinceLastResponse();

  /** Restart details, or empty when the producer of this snapshot has none to report. */
  abstract Optional<RestartMetrics> restartMetrics();

  /**
   * Backoff remaining in milliseconds. The value is a plain difference supplied by the caller, so
   * readers should treat anything that is not strictly positive as "no backoff pending".
   */
  abstract long sleepLeft();

  /** Shutdown time, or empty when the factory was given {@code null}. */
  abstract Optional<DateTime> shutdownTime();

  /**
   * Builds a snapshot from already-computed values.
   *
   * @param shutdownTime may be {@code null}; it is wrapped with {@link Optional#ofNullable} so the
   *     accessor never returns {@code null}
   */
  private static Snapshot create(
      long streamAge,
      long timeSinceLastSend,
      long timeSinceLastResponse,
      Optional<RestartMetrics> restartMetrics,
      long sleepLeft,
      @Nullable DateTime shutdownTime) {
    Optional<DateTime> maybeShutdownTime = Optional.ofNullable(shutdownTime);
    return new AutoValue_StreamDebugMetrics_Snapshot(
        streamAge,
        timeSinceLastSend,
        timeSinceLastResponse,
        restartMetrics,
        sleepLeft,
        maybeShutdownTime);
  }
}

@AutoValue
abstract static class RestartMetrics {
  // NOTE: keep the accessor declaration order in sync with the argument order used in create();
  // AutoValue generates its constructor from this order.

  /** Number of restarts, printed as "%d restarts" on the status page. */
  abstract int restartCount();

  /** Reason recorded for the most recent restart. */
  abstract String lastRestartReason();

  /** Time recorded for the most recent restart. */
  abstract DateTime lastRestartTime();

  /** Number of errors, printed as "%d errors" on the status page. */
  abstract int errorCount();

  /**
   * Packages the given restart details into a value object.
   *
   * <p>NOTE(review): neither the reason nor the time is marked {@code @Nullable}, so the
   * AutoValue-generated constructor is expected to reject {@code null} for them - confirm both are
   * always populated before this is called.
   */
  private static RestartMetrics create(
      int restartCount, String lastRestartReason, DateTime lastRestartTime, int errorCount) {
    RestartMetrics metrics =
        new AutoValue_StreamDebugMetrics_RestartMetrics(
            restartCount, lastRestartReason, lastRestartTime, errorCount);
    return metrics;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,36 @@
package org.apache.beam.runners.dataflow.worker.windmill.client;

import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ResettableStreamObserverTest {
private final StreamObserver<Integer> delegate =
spy(
new StreamObserver<Integer>() {
@Override
public void onNext(Integer integer) {}
private final StreamObserver<Integer> delegate = newDelegate();

@Override
public void onError(Throwable throwable) {}
private static StreamObserver<Integer> newDelegate() {
return spy(
new StreamObserver<Integer>() {
@Override
public void onNext(Integer integer) {}

@Override
public void onCompleted() {}
});
@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {}
});
}

@Test
public void testPoison_beforeDelegateSet() {
Expand All @@ -66,14 +72,14 @@ public void testReset_afterPoisonedThrows() {
}

@Test
public void onNext_afterPoisonedThrows() {
public void testOnNext_afterPoisonedThrows() {
ResettableStreamObserver<Integer> observer = new ResettableStreamObserver<>(() -> delegate);
observer.poison();
assertThrows(WindmillStreamShutdownException.class, () -> observer.onNext(1));
}

@Test
public void onError_afterPoisonedThrows() {
public void testOnError_afterPoisonedThrows() {
ResettableStreamObserver<Integer> observer = new ResettableStreamObserver<>(() -> delegate);
observer.poison();
assertThrows(
Expand All @@ -82,9 +88,31 @@ public void onError_afterPoisonedThrows() {
}

@Test
public void onCompleted_afterPoisonedThrows() {
public void testOnCompleted_afterPoisonedThrows() {
ResettableStreamObserver<Integer> observer = new ResettableStreamObserver<>(() -> delegate);
observer.poison();
assertThrows(WindmillStreamShutdownException.class, observer::onCompleted);
}

@Test
public void testReset_usesNewDelegate() {
  // Record every delegate the supplier hands out, in creation order.
  List<StreamObserver<Integer>> createdDelegates = new ArrayList<>();
  ResettableStreamObserver<Integer> observer =
      new ResettableStreamObserver<>(
          () -> {
            StreamObserver<Integer> created = newDelegate();
            createdDelegates.add(created);
            return created;
          });

  // One value is sent after each reset.
  observer.reset();
  observer.onNext(1);
  observer.reset();
  observer.onNext(2);

  // The first value must reach the first delegate created, the second value the second one.
  verify(createdDelegates.get(0)).onNext(eq(1));
  verify(createdDelegates.get(1)).onNext(eq(2));
}
}

0 comments on commit 5a68e0f

Please sign in to comment.