Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2140] Remove isMultiActiveSchedulerEnabled flag and its usages #4035

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ public class DagActionStoreChangeMonitorTest {
*/
class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads) {
this(topic, config, numThreads, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
}

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled,
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
DagManagementStateStore dagManagementStateStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
dagManagementStateStore, isMultiActiveSchedulerEnabled, mock(DagProcessingEngineMetrics.class));
dagManagementStateStore, mock(DagProcessingEngineMetrics.class));
}

protected void processMessageForTest(DecodeableKafkaRecord record) {
Expand All @@ -108,7 +107,7 @@ MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true);
return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5);
}

// Called at start of every test so the count of each method being called is reset to 0
Expand Down Expand Up @@ -239,7 +238,7 @@ public void testStartupSequenceHandlesFailures() throws Exception {
// Throw an uncaught exception during startup sequence
when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));
mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
true, dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
try {
mockDagActionStoreChangeMonitor.setActive();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public class DagManagementDagActionStoreChangeMonitorTest {
*/
static class MockDagManagementDagActionStoreChangeMonitor extends DagManagementDagActionStoreChangeMonitor {

public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) {
public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads) {
super(config, numThreads, mock(FlowCatalog.class), mock(Orchestrator.class), mock(DagManagementStateStore.class),
isMultiActiveSchedulerEnabled, mock(DagManagement.class), dagActionReminderScheduler,
mock(DagManagement.class), dagActionReminderScheduler,
mock(DagProcessingEngineMetrics.class));
}
protected void processMessageForTest(DecodeableKafkaRecord<String, DagActionStoreChangeEvent> record) {
Expand All @@ -89,7 +89,7 @@ MockDagManagementDagActionStoreChangeMonitor createMockDagManagementDagActionSto
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
.withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true);
return new MockDagManagementDagActionStoreChangeMonitor(config, 5);
}

// Called at start of every test so the count of each method being called is reset to 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isWarmStandbyEnabled;

@Getter
private final boolean isMultiActiveSchedulerEnabled;

@Getter
private final boolean isMultiActiveExecutionEnabled;

Expand Down Expand Up @@ -108,7 +105,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
}

this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
this.isMultiActiveExecutionEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED, false);

this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ public void configure(Binder binder) {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
.to(serviceConfig.isWarmStandbyEnabled());
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
.to(serviceConfig.isMultiActiveSchedulerEnabled());
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.DAG_PROC_ENGINE_ENABLED))
.to(serviceConfig.isDagProcessingEngineEnabled());
Expand Down Expand Up @@ -198,9 +195,6 @@ binding time (optionally bound classes cannot have names associated with them),
ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)).toProvider(
FlowLaunchMultiActiveLeaseArbiterFactory.class);
OptionalBinder.newOptionalBinder(binder, FlowLaunchHandler.class);
if (serviceConfig.isMultiActiveSchedulerEnabled()) {
binder.bind(FlowLaunchHandler.class);
}

OptionalBinder.newOptionalBinder(binder, DagManagement.class);
OptionalBinder.newOptionalBinder(binder, DagTaskStream.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ private void handleLeadershipChange(NotificationContext changeContext) {
LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());

if (configuration.isSchedulerEnabled()) {
LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
this.scheduler.setActive(true);
}

if (helixLeaderGauges.isPresent()) {
helixLeaderGauges.get().setState(LeaderState.MASTER);
}
Expand All @@ -354,12 +349,6 @@ private void handleLeadershipChange(NotificationContext changeContext) {
LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());

if (configuration.isSchedulerEnabled() && !configuration.isMultiActiveSchedulerEnabled()) {
LOGGER.info("Gobblin Service is now running in non-leader mode without multi-active scheduler enabled, "
+ "disabling Scheduler.");
this.scheduler.setActive(false);
}

if (helixLeaderGauges.isPresent()) {
helixLeaderGauges.get().setState(LeaderState.SLAVE);
}
Expand Down Expand Up @@ -467,15 +456,13 @@ public void start() throws ApplicationException {
if (this.helixManager.isPresent()) {
// Subscribe to leadership changes
this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange);

if (configuration.isSchedulerEnabled()) {
LOGGER.info("[Init] Gobblin service is running in multi active mode, enabling Scheduler.");
this.scheduler.setActive(true);
}

// Update for first time since there might be no notification
if (helixManager.get().isLeader()) {
if (configuration.isSchedulerEnabled()) {
LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
this.scheduler.setActive(true);
}

if (configuration.isGitConfigMonitorEnabled()) {
this.gitConfigMonitor.setActive(true);
}
Expand All @@ -485,14 +472,6 @@ public void start() throws ApplicationException {
}

} else {
if (configuration.isSchedulerEnabled()) {
if (configuration.isMultiActiveSchedulerEnabled()) {
LOGGER.info("[Init] Gobblin Service enabling scheduler for non-leader since multi-active scheduler enabled");
this.scheduler.setActive(true);
} else {
LOGGER.info("[Init] Gobblin Service is running in non-leader instance mode, not enabling Scheduler.");
}
}
if (helixLeaderGauges.isPresent()) {
helixLeaderGauges.get().setState(LeaderState.SLAVE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public String load(String key) throws Exception {
@VisibleForTesting
protected DagManager dagManager;
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
@Getter
@VisibleForTesting
protected FlowCatalog flowCatalog;
Expand All @@ -109,7 +108,7 @@ public String load(String key) throws Exception {
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
boolean isMultiActiveSchedulerEnabled, DagProcessingEngineMetrics dagProcEngineMetrics) {
DagProcessingEngineMetrics dagProcEngineMetrics) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
Expand All @@ -118,7 +117,6 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagMa
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagProcEngineMetrics = dagProcEngineMetrics;

/*
Expand Down Expand Up @@ -282,11 +280,6 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta
this.killsInvoked.mark();
} else if (dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) {
// If multi-active scheduler is NOT turned on we should not receive these type of events
if (!this.isMultiActiveSchedulerEnabled) {
this.unexpectedErrors.mark();
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s", dagAction));
}
submitFlowToDagManagerHelper(dagAction, isStartup);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagAction.getDagActionType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import com.typesafe.config.Config;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
Expand All @@ -47,20 +45,17 @@ public class DagActionStoreChangeMonitorFactory implements Provider<DagActionSto
private FlowCatalog flowCatalog;
private Orchestrator orchestrator;
private DagManagementStateStore dagManagementStateStore;
private boolean isMultiActiveSchedulerEnabled;
private DagProcessingEngineMetrics dagProcEngineMetrics;

@Inject
public DagActionStoreChangeMonitorFactory(Config config, DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean isMultiActiveSchedulerEnabled,
DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = Objects.requireNonNull(config);
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagProcEngineMetrics = dagProcEngineMetrics;
}

Expand All @@ -72,7 +67,7 @@ private DagActionStoreChangeMonitor createDagActionStoreMonitor() {
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);

return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, this.dagManager, numThreads, flowCatalog,
orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled, dagProcEngineMetrics);
orchestrator, dagManagementStateStore, dagProcEngineMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChan
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement,
DagActionReminderScheduler dagActionReminderScheduler, DagProcessingEngineMetrics dagProcEngineMetrics) {
DagManagement dagManagement, DagActionReminderScheduler dagActionReminderScheduler,
DagProcessingEngineMetrics dagProcEngineMetrics) {
// DagManager is only needed in the `handleDagAction` method of its parent class and not needed in this class,
// so we are passing a null value for DagManager to its parent class.
super("", config, null, numThreads, flowCatalog, orchestrator, dagManagementStateStore,
isMultiActiveSchedulerEnabled, dagProcEngineMetrics);
dagProcEngineMetrics);
this.dagManagement = dagManagement;
this.dagActionReminderScheduler = dagActionReminderScheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import com.typesafe.config.Config;

import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
Expand All @@ -48,21 +46,18 @@ public class DagManagementDagActionStoreChangeMonitorFactory implements Provider
private final FlowCatalog flowCatalog;
private final Orchestrator orchestrator;
private final DagManagementStateStore dagManagementStateStore;
private final boolean isMultiActiveSchedulerEnabled;
private final DagManagement dagManagement;
private final DagActionReminderScheduler dagActionReminderScheduler;
private final DagProcessingEngineMetrics dagProcEngineMetrics;

@Inject
public DagManagementDagActionStoreChangeMonitorFactory(Config config, DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore, DagManagement dagManagement,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean isMultiActiveSchedulerEnabled,
DagActionReminderScheduler dagActionReminderScheduler, DagProcessingEngineMetrics dagProcEngineMetrics) {
this.config = Objects.requireNonNull(config);
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.dagManagementStateStore = dagManagementStateStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
this.dagManagement = dagManagement;
this.dagActionReminderScheduler = dagActionReminderScheduler;
this.dagProcEngineMetrics = dagProcEngineMetrics;
Expand All @@ -75,7 +70,7 @@ private DagManagementDagActionStoreChangeMonitor createDagActionStoreMonitor() {
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);

return new DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
numThreads, flowCatalog, orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled, this.dagManagement,
numThreads, flowCatalog, orchestrator, dagManagementStateStore, this.dagManagement,
this.dagActionReminderScheduler, dagProcEngineMetrics);
}

Expand Down
Loading