Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2056] initialize topology specs directly without waitging for listener call… #3937

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +46,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
Expand All @@ -58,12 +57,14 @@
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


// Provide base implementation for constructing multi-hops route.
Expand All @@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
// to these data structures.
@Getter
@Setter
protected final Map<URI, TopologySpec> topologySpecMap;
protected final Map<URI, TopologySpec> topologySpecMap = Maps.newConcurrentMap();

protected final Config config;
protected final Logger log;
Expand All @@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {

private Optional<UserQuotaManager> userQuotaManager;

public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
}

public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){
this(config, Optional.<Logger>absent(), true);
}

public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){
this(config, log,true);
}

public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
if (instrumentationEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the intention here to remove the use of this variable for checking if instrumentation is enabled? It's still used widely in GaaS modules so there should be justification

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. yes, in https://github.com/apache/gobblin/pull/3855/files I made some optional fields mandatory. instrumentationEnabled was not one of them, but eventSubmitter was which is related.
  2. instrumentationEnabled is usually true in prod, these variables are set false only for doing easier tests.
  3. Also, in this particular case, instrumentationEnabled was hard coded to true in the above above constructor anyway.

this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
}
else {
this.metricContext = null;
this.flowCompilationSuccessFulMeter = Optional.absent();
this.flowCompilationFailedMeter = Optional.absent();
this.flowCompilationTimer = Optional.absent();
this.dataAuthorizationTimer = Optional.absent();
}

public BaseFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet){
this.log = LoggerFactory.getLogger(getClass());
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
if (this.warmStandbyEnabled) {
userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
Expand All @@ -134,11 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
userQuotaManager = Optional.absent();
}

this.topologySpecMap = Maps.newConcurrentMap();
topologySpecSet.forEach(this::onAddTopologySpec);


this.config = config;

/***
* ETL-5996
/*
* For multi-tenancy, the following needs to be added:
* 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
* 2. Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that
Expand Down Expand Up @@ -219,8 +198,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
public AddSpecResponse onAddSpec(Spec addedSpec) {
if (addedSpec instanceof FlowSpec) {
return onAddFlowSpec((FlowSpec) addedSpec);
} else if (addedSpec instanceof TopologySpec) {
return onAddTopologySpec( (TopologySpec) addedSpec);
Comment on lines -222 to -223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it wise to remove the ability to be a TS listener?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

being a TS listener feels of no use to me. There should be no TS listener in gaas.

Copy link
Contributor

@phet phet May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I equate topology specs w/ executors in the flow graph. since the FG can change dynamically w/o requiring a system restart, it doesn't seem out of the question to change the set of topos w/o a system restart either, given whatever new FG edges could indicate newly defined flow.edge.specExecutors

I agree that's not what we've done thus far, but it's arguably inconvenient to require two separate changes to define a new executor - one to the FG and one to the gaas configs. if it were possible to do both together, I would personally find that appealing

}
return new AddSpecResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.gobblin.service.modules.flow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;

Expand All @@ -50,20 +48,8 @@
@Alpha
public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {

public IdentityFlowToJobSpecCompiler(Config config) {
super(config, true);
}

public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) {
super(config, Optional.<Logger>absent(), instrumentationEnabled);
}

public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
super(config, log, true);
}

public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
public IdentityFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.flow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

Expand All @@ -28,6 +29,7 @@
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand All @@ -45,8 +47,8 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
private static final int NUMBER_OF_JOBS = 3;
public static final String UNCOMPILABLE_FLOW = "uncompilableFlow";

public MockedSpecCompiler(Config config) {
super(config);
public MockedSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +36,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
Expand All @@ -57,6 +58,7 @@
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
Expand Down Expand Up @@ -98,26 +100,14 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
// a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";

public MultiHopFlowCompiler(Config config) {
this(config, true);
}

public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
this(config, Optional.<Logger>absent(), instrumentationEnabled);
}

public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
this(config, log, true);
}

public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
super(config, Optional.absent(), true);
super(config, Collections.EMPTY_SET);
this.flowGraph = flowGraph;
this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
}

public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
public MultiHopFlowCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
try {
this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
Expand Down Expand Up @@ -161,7 +151,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
try {
String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName());
this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), true);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper {

@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we know the topologySpecFactory has been initialized by now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopologySpecFactory is guice based initialized

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I see that but what is the waiting you refer to in description "making spec compiler as a topology spec catalog listener and then wait for topology spec catalog to populate topologies." How do you know the topologySpecCatalog is populated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking GobblinServiceManager start to see how TopologySpecCatalog is populated and it may not have the topologies loaded if it uses the listener method rather than TopologySpecFactory.getTopologies()

One safe guard we can add is both Orchestrator and SpecCompiler'sonAddSpec method should check if specCompiler.isActive is true before accepting specs to pass to compiler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpecCompiler gets information about topology specs through orchestrator's onAddSpec. And orchestrator gets info about topology specs because it is a listener on topology spec catalog.
So compiler has to "wait for topology spec catalog to populate topologies". topology spec catalog is populated here https://github.com/apache/gobblin/blob/master/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java#L508 but we may want compiler to have topology information before this code reaches.

UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) {
try {
String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config);
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies());
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -68,8 +69,8 @@ public class IdentityFlowToJobSpecCompilerTest {
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();

private IdentityFlowToJobSpecCompiler compilerWithTemplateCalague;
private IdentityFlowToJobSpecCompiler compilerWithoutTemplateCalague;
private IdentityFlowToJobSpecCompiler compilerWithTemplateCatalog;
private IdentityFlowToJobSpecCompiler compilerWithoutTemplateCatalog;

@BeforeClass
public void setup() throws Exception {
Expand All @@ -86,16 +87,11 @@ public void setup() throws Exception {
// Initialize compiler with template catalog
Properties compilerWithTemplateCatalogProperties = new Properties();
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI);
this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));

// Add a topology to compiler
this.compilerWithTemplateCalague.onAddSpec(initTopologySpec());
this.compilerWithTemplateCatalog = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties),
Collections.singleton(initTopologySpec()));

// Initialize compiler without template catalog
this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()));

// Add a topology to compiler
this.compilerWithoutTemplateCalague.onAddSpec(initTopologySpec());
this.compilerWithoutTemplateCatalog = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.singleton(initTopologySpec()));
}

private void setupDir(String dir) throws Exception {
Expand Down Expand Up @@ -186,7 +182,7 @@ public void testCompilerWithTemplateCatalog() {
FlowSpec flowSpec = initFlowSpec();

// Run compiler on flowSpec
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCatalog.compileFlow(flowSpec);

// Assert pre-requisites
Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
Expand Down Expand Up @@ -219,7 +215,7 @@ public void testCompilerWithoutTemplateCatalog() {
FlowSpec flowSpec = initFlowSpec();

// Run compiler on flowSpec
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithoutTemplateCalague.compileFlow(flowSpec);
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithoutTemplateCatalog.compileFlow(flowSpec);

// Assert pre-requisites
Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
Expand Down Expand Up @@ -253,7 +249,7 @@ public void testNoJobSpecCompilation() {
FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink");

// Run compiler on flowSpec
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec);
Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCatalog.compileFlow(flowSpec);

// Assert pre-requisites
Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag.");
Expand Down
Loading
Loading