-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2056] initialize topology specs directly without waitging for listener call… #3937
base: master
Are you sure you want to change the base?
Conversation
1dccd35
to
6d61f02
Compare
6d61f02
to
5813b3f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you need to load the topology spec store earlier in the GobblinServiceManager? Otherwise the topology map being passed to the flow compilation validation helper can be empty right?
|
||
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){ | ||
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); | ||
if (instrumentationEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intention here to remove the use of this variable for checking if instrumentation is enabled? It's still used widely in GaaS modules so there should be justification
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- yes, in https://github.com/apache/gobblin/pull/3855/files I made some optional fields mandatory. instrumentationEnabled was not one of them, but eventSubmitter was which is related.
- instrumentationEnabled is usually true in prod, these variables are set false only for doing easier tests.
- Also, in this particular case, instrumentationEnabled was hard coded to
true
in the above above constructor anyway.
@@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper { | |||
|
|||
@Inject | |||
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, | |||
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we know the topologySpecFactory
has been initialized by now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopologySpecFactory is guice based initialized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I see that but what is the waiting
you refer to in description "making spec compiler as a topology spec catalog listener and then wait for topology spec catalog to populate topologies." How do you know the topologySpecCatalog
is populated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking GobblinServiceManager
start to see how TopologySpecCatalog
is populated and it may not have the topologies loaded if it uses the listener method rather than TopologySpecFactory.getTopologies()
Line 508 in 33660f5
this.topologyCatalog.put(topologySpec); |
One safe guard we can add is both Orchestrator
and SpecCompiler
'sonAddSpec
method should check if specCompiler.isActive
is true before accepting specs to pass to compiler.
Line 522 in 33660f5
this.orchestrator.getSpecCompiler().setActive(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SpecCompiler gets information about topology specs through orchestrator's onAddSpec. And orchestrator gets info about topology specs because it is a listener on topology spec catalog.
So compiler has to "wait for topology spec catalog to populate topologies". topology spec catalog is populated here https://github.com/apache/gobblin/blob/master/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java#L508 but we may want compiler to have topology information before this code reaches.
|
||
public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){ | ||
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); | ||
if (instrumentationEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems the intent is to remove the loose coupling between topo spec init and compiler init. are there any potential downsides? e.g. would it disallow the dynamic addition of additional topos during the course of system operation? this or any other limitations should be clearly captured in a comment
when you say:
topologies are static and fixed
is this essential to their nature or merely common practice we've adopted?
I'm not against a pragmatic change to preclude something we truly don't anticipate needing, but let's characterize for maintainers whether this is pure expedience (aka. easier than adding concurrency coordination to sub-service init)... or we actually believe it to be an essential part to modeling the solution (that the current impl mispercieved).
also nit: "waiting" in the title.
this.config = config; | ||
|
||
/*** | ||
/* | ||
* ETL-5996 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not an apache gobblin jira ticket
} else if (addedSpec instanceof TopologySpec) { | ||
return onAddTopologySpec( (TopologySpec) addedSpec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it wise to remove the ability to be a TS listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
being a TS listener feels of no use to me. There should be no TS listener in gaas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I equate topology specs w/ executors in the flow graph. since the FG can change dynamically w/o requiring a system restart, it doesn't seem out of the question to change the set of topos w/o a system restart either, given whatever new FG edges could indicate newly defined flow.edge.specExecutors
I agree that's not what we've done thus far, but it's arguably inconvenient to require two separate changes to define a new executor - one to the FG and one to the gaas configs. if it were possible to do both together, I would personally find that appealing
...src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
Outdated
Show resolved
Hide resolved
@@ -114,6 +119,8 @@ public void setup() throws Exception { | |||
FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class); | |||
DagManager mockDagManager = mock(DagManager.class); | |||
doNothing().when(mockDagManager).setTopologySpecMap(anyMap()); | |||
TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class); | |||
doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer the when().thenReturn()
form, which is typesafe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer this way, because in when(obj.method()).thenReturn(obj)
it actually calls obj.method()
which is rarely useful (and often generate exceptions) because obj is a mocked dummy object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
up to you. I do agree with a spy that doReturn
is better for the reason you give, but with a mock, the type safety is helpful.
// Make sure TopologyCatalog Listener is empty | ||
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology " | ||
+ "before addition"); | ||
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
desc here makes it sound like the test should be > 1
. to that end, I didn't notice any Preconditions
check or similar to insist on a non-empty collection. do we want one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"SpecCompiler should know about any Topology irrespective of what is there in the topology catalog" ? how does that sound like test should be >1
? What description do you suggest?
There is no precondition. SpecCompiler should just know about the topologies. Number of topologies known to spec compiler should just always be 1 (equals to total number of topologies).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"know about any" suggested it would apply even if there were > 1 topo specs. couldn't there be more than one?
also, if non-empty Set<TopologySpec>
is critical, I was suggesting a guava Precondition
in the ctor
// Make sure TopologyCatalog empty | ||
Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems this asserts that the spec compiler's topos may now potentially deviate from the topo catalog's, which might make the system harder to reason about... is it really a good thing to drop such an invariant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a deviation only till everything is initialized. after that, they both should be same (1 in this test).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prior to this change, there's an "inconsistent timespan", where the compiler doesn't have topo specs initialized. you suggested this might compromise correctness of DagManagementStateStore
.
after this change, sounds like there will be an "inconsistent timespan", where the TopologyCatalog
may be out of sync w/ the topo specs known to the compiler. could there be any negative consequence to that?
…backs
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
topologies are static and fixed. they can be initialized through configs. but right now we initialize them in spec compiler making spec compiler as a topology spec catalog listener and then wait for topology spec catalog to populate topologies.
this sometimes take longer time because it happens when some of the services' setActive is called. some newly created classes like DagManagementStateStore initialization may fail if topologies are not present by that time.
so in this pr i am populating topologies sooner in spec compiler
Tests
updated existing test cases
Commits