-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2056] initialize topology specs directly without waiting for listener call… #3937
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,15 +19,13 @@ | |
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; | ||
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||
import org.apache.gobblin.util.reflection.GobblinConstructorUtils; | ||
import org.quartz.CronExpression; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -48,6 +46,7 @@ | |
import org.apache.gobblin.configuration.State; | ||
import org.apache.gobblin.instrumented.Instrumented; | ||
import org.apache.gobblin.metrics.MetricContext; | ||
import org.apache.gobblin.metrics.ServiceMetricNames; | ||
import org.apache.gobblin.metrics.Tag; | ||
import org.apache.gobblin.runtime.api.FlowSpec; | ||
import org.apache.gobblin.runtime.api.JobSpec; | ||
|
@@ -58,12 +57,14 @@ | |
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; | ||
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; | ||
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; | ||
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; | ||
import org.apache.gobblin.service.ServiceConfigKeys; | ||
import org.apache.gobblin.metrics.ServiceMetricNames; | ||
import org.apache.gobblin.service.modules.flowgraph.Dag; | ||
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||
import org.apache.gobblin.service.modules.spec.JobExecutionPlan; | ||
import org.apache.gobblin.util.ConfigUtils; | ||
import org.apache.gobblin.util.PropertiesUtils; | ||
import org.apache.gobblin.util.reflection.GobblinConstructorUtils; | ||
|
||
|
||
// Provide base implementation for constructing multi-hops route. | ||
|
@@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { | |
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected | ||
// to these data structures. | ||
@Getter | ||
@Setter | ||
protected final Map<URI, TopologySpec> topologySpecMap; | ||
protected final Map<URI, TopologySpec> topologySpecMap = Maps.newConcurrentMap(); | ||
|
||
protected final Config config; | ||
protected final Logger log; | ||
|
@@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { | |
|
||
private Optional<UserQuotaManager> userQuotaManager; | ||
|
||
public BaseFlowToJobSpecCompiler(Config config){ | ||
this(config,true); | ||
} | ||
|
||
public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){ | ||
this(config, Optional.<Logger>absent(), true); | ||
} | ||
|
||
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){ | ||
this(config, log,true); | ||
} | ||
|
||
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){ | ||
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); | ||
if (instrumentationEnabled) { | ||
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); | ||
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); | ||
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); | ||
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); | ||
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); | ||
} | ||
else { | ||
this.metricContext = null; | ||
this.flowCompilationSuccessFulMeter = Optional.absent(); | ||
this.flowCompilationFailedMeter = Optional.absent(); | ||
this.flowCompilationTimer = Optional.absent(); | ||
this.dataAuthorizationTimer = Optional.absent(); | ||
} | ||
|
||
public BaseFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet){ | ||
this.log = LoggerFactory.getLogger(getClass()); | ||
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); | ||
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); | ||
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); | ||
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); | ||
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); | ||
this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false); | ||
if (this.warmStandbyEnabled) { | ||
userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, | ||
|
@@ -134,11 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in | |
userQuotaManager = Optional.absent(); | ||
} | ||
|
||
this.topologySpecMap = Maps.newConcurrentMap(); | ||
topologySpecSet.forEach(this::onAddTopologySpec); | ||
|
||
|
||
this.config = config; | ||
|
||
/*** | ||
* ETL-5996 | ||
/* | ||
* For multi-tenancy, the following needs to be added: | ||
* 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs | ||
* 2. Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that | ||
|
@@ -219,8 +198,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { | |
public AddSpecResponse onAddSpec(Spec addedSpec) { | ||
if (addedSpec instanceof FlowSpec) { | ||
return onAddFlowSpec((FlowSpec) addedSpec); | ||
} else if (addedSpec instanceof TopologySpec) { | ||
return onAddTopologySpec( (TopologySpec) addedSpec); | ||
Comment on lines
-222
to
-223
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it wise to remove the ability to be a TS listener? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Being a TS listener feels of no use to me. There should be no TS listener in GaaS. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I equate topology specs w/ executors in the flow graph. Since the FG can change dynamically w/o requiring a system restart, it doesn't seem out of the question to change the set of topos w/o a system restart either, given that new FG edges could indicate newly defined [executors]. I agree that's not what we've done thus far, but it's arguably inconvenient to require two separate changes to define a new executor - one to the FG and one to the GaaS configs. If it were possible to do both together, I would personally find that appealing. |
||
} | ||
return new AddSpecResponse(null); | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,6 +43,7 @@ | |||||
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; | ||||||
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||||||
import org.apache.gobblin.service.modules.spec.JobExecutionPlan; | ||||||
import org.apache.gobblin.service.modules.topology.TopologySpecFactory; | ||||||
import org.apache.gobblin.service.monitoring.FlowStatusGenerator; | ||||||
import org.apache.gobblin.util.ClassAliasResolver; | ||||||
import org.apache.gobblin.util.ConfigUtils; | ||||||
|
@@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper { | |||||
|
||||||
@Inject | ||||||
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, | ||||||
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How do we know the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. TopologySpecFactory is initialized via Guice. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I see that, but what is the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Checking […] (Line 508 in 33660f5).
One safeguard we can add is both […] (Line 522 in 33660f5).
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SpecCompiler gets information about topology specs through orchestrator's onAddSpec. And orchestrator gets info about topology specs because it is a listener on topology spec catalog. |
||||||
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) { | ||||||
try { | ||||||
String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, | ||||||
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS); | ||||||
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName( | ||||||
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config); | ||||||
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies()); | ||||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | | ||||||
ClassNotFoundException e) { | ||||||
throw new RuntimeException(e); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intention here to remove the use of this variable for checking if instrumentation is enabled? It's still used widely in GaaS modules, so there should be a justification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true
in the above above constructor anyway.