Skip to content

Commit

Permalink
move heartbeat processor to where it is being used (#31298)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Jun 28, 2024
1 parent 2cc2b8e commit 6ec1fb2
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -38,7 +39,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand Down Expand Up @@ -103,7 +103,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -121,11 +120,6 @@ public class StreamingDataflowWorker {
MetricName.named(
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
"throttling-msecs");
// Maximum number of threads for processing. Currently each thread processes one key at a time.
static final int MAX_PROCESSING_THREADS = 300;
static final long THREAD_EXPIRATION_TIME_SEC = 60;
static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);

/**
* Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked
Expand All @@ -135,13 +129,20 @@ public class StreamingDataflowWorker {
*/
public static final int MAX_SINK_BYTES = 10_000_000;

// Maximum number of threads for processing. Currently, each thread processes one key at a time.
static final int MAX_PROCESSING_THREADS = 300;
static final long THREAD_EXPIRATION_TIME_SEC = 60;
static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class);

/** The {@link IdGenerator} used to generate globally unique ids. */
private static final IdGenerator ID_GENERATOR = IdGenerators.decrementingLongs();

private static final int DEFAULT_STATUS_PORT = 8081;
// Maximum size of the result of a GetWork request.
private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m

/** Maximum number of failure stacktraces to report in each update sent to backend. */
private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;

Expand Down Expand Up @@ -328,39 +329,27 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build());
GrpcWindmillStreamFactory windmillStreamFactory =
createWindmillStreamFactory(options, clientId);
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options));

// If ComputationConfig.Fetcher is the Streaming Appliance implementation, WindmillServerStub
// can be created without a heartbeat response processor, as appliance does not send heartbeats.
Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>> configFetcherAndWindmillClient =
createConfigFetcherAndWindmillClient(
options,
dataflowServiceClient,
dispatcherClient,
maxWorkItemCommitBytes,
windmillStreamFactory);
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createGrpcwindmillStreamFactoryBuilder(options, clientId);

ConfigFetcherComputationStateCacheAndWindmillClient
configFetcherComputationStateCacheAndWindmillClient =
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
maxWorkItemCommitBytes,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
configFetcher,
workExecutor,
windmillStateCache::forComputation,
ID_GENERATOR));

ComputationStateCache computationStateCache =
ComputationStateCache.create(
configFetcherAndWindmillClient.getLeft(),
workExecutor,
windmillStateCache::forComputation,
ID_GENERATOR);

// If WindmillServerStub is not present, it is a Streaming Engine job. We now have all the
// components created to initialize the GrpcWindmillServer.
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
WindmillServerStub windmillServer =
configFetcherAndWindmillClient
.getRight()
.orElseGet(
() ->
GrpcWindmillServer.create(
options,
windmillStreamFactory,
dispatcherClient,
new WorkHeartbeatResponseProcessor(computationStateCache::get)));
configFetcherComputationStateCacheAndWindmillClient.windmillServer();

FailureTracker failureTracker =
options.isEnableStreamingEngine()
Expand Down Expand Up @@ -393,7 +382,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
return new StreamingDataflowWorker(
windmillServer,
clientId,
configFetcherAndWindmillClient.getLeft(),
configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
computationStateCache,
windmillStateCache,
workExecutor,
Expand All @@ -407,20 +396,29 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
windmillStreamFactory,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
}

private static Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>>
createConfigFetcherAndWindmillClient(
/**
* {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and {@link
* WindmillServerStub} are constructed in different orders due to cyclic dependencies depending on
* the underlying implementation. This method simplifies creating them and returns an object with
* all of these dependencies initialized.
*/
private static ConfigFetcherComputationStateCacheAndWindmillClient
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
GrpcDispatcherClient dispatcherClient,
AtomicInteger maxWorkItemCommitBytes,
GrpcWindmillStreamFactory windmillStreamFactory) {
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
@Nullable WindmillServerStub windmillServer = null;
WindmillServerStub windmillServer;
ComputationStateCache computationStateCache;
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options));
GrpcWindmillStreamFactory windmillStreamFactory;
if (options.isEnableStreamingEngine()) {
configFetcher =
StreamingEngineComputationConfigFetcher.create(
Expand All @@ -431,13 +429,36 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
config,
dispatcherClient::consumeWindmillDispatcherEndpoints,
maxWorkItemCommitBytes));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
.setProcessHeartbeatResponses(
new WorkHeartbeatResponseProcessor(computationStateCache::get))
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
} else {
windmillServer =
createWindmillServerStub(options, windmillStreamFactory, dispatcherClient, ignored -> {});
if (options.getWindmillServiceEndpoint() != null
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
windmillStreamFactory =
windmillStreamFactoryBuilder
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
windmillServer =
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
} else {
windmillStreamFactory = windmillStreamFactoryBuilder.build();
windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport());
}

configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
computationStateCache = computationStateCacheFactory.apply(configFetcher);
}

return Pair.of(configFetcher, Optional.ofNullable(windmillServer));
return ConfigFetcherComputationStateCacheAndWindmillClient.create(
configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
}

@VisibleForTesting
Expand Down Expand Up @@ -516,6 +537,11 @@ static StreamingDataflowWorker forTesting(
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());

GrpcWindmillStreamFactory.Builder windmillStreamFactory =
createGrpcwindmillStreamFactoryBuilder(options, 1)
.setProcessHeartbeatResponses(
new WorkHeartbeatResponseProcessor(computationStateCache::get));

return new StreamingDataflowWorker(
windmillServer,
1L,
Expand All @@ -533,7 +559,12 @@ static StreamingDataflowWorker forTesting(
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
createWindmillStreamFactory(options, 1),
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build()
: windmillStreamFactory.build(),
executorSupplier,
stageInfo);
}
Expand All @@ -552,7 +583,7 @@ private static void onPipelineConfig(
}
}

private static GrpcWindmillStreamFactory createWindmillStreamFactory(
private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder(
DataflowWorkerHarnessOptions options, long clientId) {
Duration maxBackoff =
!options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null
Expand All @@ -569,7 +600,10 @@ private static GrpcWindmillStreamFactory createWindmillStreamFactory(
.setMaxBackOffSupplier(() -> maxBackoff)
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
.build();
.setSendKeyedGetDataRequests(
!options.isEnableStreamingEngine()
|| !DataflowRunner.hasExperiment(
options, "streaming_engine_send_new_heartbeat_requests"));
}

private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
Expand Down Expand Up @@ -619,23 +653,6 @@ public static void main(String[] args) throws Exception {
worker.start();
}

/**
 * Creates the {@link WindmillServerStub} matching the worker's configuration.
 *
 * <p>A {@link GrpcWindmillServer} is created when a Windmill service endpoint is configured, when
 * Streaming Engine is enabled, or when the local Windmill hostport starts with {@code "grpc:"}.
 * In that case health checks are scheduled on {@code windmillStreamFactory} first. In every other
 * case a {@link JniWindmillApplianceServer} bound to the local Windmill hostport is returned.
 *
 * @param options worker harness options that select the implementation
 * @param windmillStreamFactory stream factory handed to the gRPC server; only used on the gRPC path
 * @param dispatcherClient dispatcher client handed to the gRPC server; only used on the gRPC path
 * @param processHeartbeatResponses heartbeat response consumer handed to the gRPC server; only
 *     used on the gRPC path
 * @return the selected {@link WindmillServerStub} implementation
 */
private static WindmillServerStub createWindmillServerStub(
    DataflowWorkerHarnessOptions options,
    GrpcWindmillStreamFactory windmillStreamFactory,
    GrpcDispatcherClient dispatcherClient,
    Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
  // NOTE(review): the hostport is dereferenced without a null check once the first two
  // conditions are false -- confirm it is always set on that path.
  boolean useGrpc =
      options.getWindmillServiceEndpoint() != null
          || options.isEnableStreamingEngine()
          || options.getLocalWindmillHostport().startsWith("grpc:");

  if (!useGrpc) {
    return new JniWindmillApplianceServer(options.getLocalWindmillHostport());
  }

  // Health checks are scheduled before the factory is handed to the gRPC server.
  windmillStreamFactory.scheduleHealthChecks(
      options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
  return GrpcWindmillServer.create(
      options, windmillStreamFactory, dispatcherClient, processHeartbeatResponses);
}

private static ChannelCachingStubFactory createStubFactory(
DataflowWorkerHarnessOptions workerOptions) {
Function<WindmillServiceAddress, ManagedChannel> channelFactory =
Expand Down Expand Up @@ -895,4 +912,25 @@ public Iterable<CounterUpdate> buildCounters() {
.pendingCumulativeCounters()
.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
}

/**
 * Value holder bundling the {@link ComputationConfig.Fetcher}, {@link ComputationStateCache},
 * {@link WindmillServerStub} and {@link GrpcWindmillStreamFactory} so that they can be returned
 * together from a single factory method.
 */
@AutoValue
abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {

  // The accessors below are declared in the same order as the arguments passed to the generated
  // constructor in create(); keep the two in sync when adding or moving a property.

  /** Returns the computation config fetcher held by this bundle. */
  abstract ComputationConfig.Fetcher configFetcher();

  /** Returns the computation state cache held by this bundle. */
  abstract ComputationStateCache computationStateCache();

  /** Returns the Windmill server stub held by this bundle. */
  abstract WindmillServerStub windmillServer();

  /** Returns the Windmill stream factory held by this bundle. */
  abstract GrpcWindmillStreamFactory windmillStreamFactory();

  /** Wraps the four given components in a new bundle instance. */
  private static ConfigFetcherComputationStateCacheAndWindmillClient create(
      ComputationConfig.Fetcher configFetcher,
      ComputationStateCache computationStateCache,
      WindmillServerStub windmillServer,
      GrpcWindmillStreamFactory windmillStreamFactory) {
    return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
        configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
  }
}
}
Loading

0 comments on commit 6ec1fb2

Please sign in to comment.