Skip to content

Commit

Permalink
Remove experiments guarding isolated channels enablement based on job…
Browse files Browse the repository at this point in the history
…settings (#32782)
  • Loading branch information
arunpandianp authored Oct 22, 2024
1 parent 3767eda commit 8d22fc2
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
"streaming_engine_use_job_settings_for_heartbeat_pool";

private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
Expand Down Expand Up @@ -249,10 +247,7 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
// Experiment gates the logic till backend changes are rollback safe
if (!DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
Expand All @@ -53,8 +52,6 @@
public class GrpcDispatcherClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class);
static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
"streaming_engine_use_job_settings_for_isolated_channels";
private final CountDownLatch onInitializedEndpoints;

/**
Expand All @@ -80,18 +77,12 @@ private GrpcDispatcherClient(
DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
if (DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
if (options.getUseWindmillIsolatedChannels() != null) {
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
} else {
this.useIsolatedChannels.set(false);
this.reactToIsolatedChannelsJobSetting = true;
}
} else {
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
if (options.getUseWindmillIsolatedChannels() != null) {
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
} else {
this.useIsolatedChannels.set(false);
this.reactToIsolatedChannelsJobSetting = true;
}
this.windmillStubFactory.set(
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.hamcrest.Matcher;
import org.junit.Test;
Expand All @@ -55,9 +54,6 @@ public static class RespectsJobSettingTest {
public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
options.setExperiments(
Lists.newArrayList(
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
// Create first time with Isolated channels disabled
Expand Down Expand Up @@ -91,27 +87,18 @@ public static class RespectsPipelineOptionsTest {
public static Collection<Object[]> data() {
List<Object[]> list = new ArrayList<>();
for (Boolean pipelineOption : new Boolean[] {true, false}) {
list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption});
list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption});
list.add(new Object[] {pipelineOption});
}
return list;
}

@Parameter(0)
public Boolean experimentEnabled;

@Parameter(1)
public Boolean pipelineOption;

@Test
public void ignoresIsolatedChannelsConfigWithPipelineOption() {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
if (experimentEnabled) {
options.setExperiments(
Lists.newArrayList(
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
}
options.setUseWindmillIsolatedChannels(pipelineOption);
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
Expand Down

0 comments on commit 8d22fc2

Please sign in to comment.