From 3a045cdd989df84af9a44199469ea4bcc53919b3 Mon Sep 17 00:00:00 2001
From: William Lo
Date: Thu, 11 May 2023 12:56:59 -0400
Subject: [PATCH] [GOBBLIN-1831] Use flowexecutionid in kafka monitor and
 jobnames (#3694)

* Use the flow execution ID as the job ID, instead of a generated timestamp,
  when the properties include a flow execution ID

* Add the flow execution ID to the Kafka job monitor for cancelled flows,
  if applicable

* Address review

* Add enhanced logging
---
 .../gobblin/cluster/HelixJobsMapping.java     |   9 +-
 .../StreamingKafkaSpecExecutorTest.java       | 107 ++++++++++++++++--
 .../service/SimpleKafkaSpecProducer.java      |  46 +++-----
 .../runtime/job_monitor/KafkaJobMonitor.java  |  30 ++++-
 4 files changed, 143 insertions(+), 49 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index ab4d87c4e27..8128aa0aab1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -36,6 +36,7 @@
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -96,12 +97,14 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {
 
   public static String createPlanningJobId (Properties jobPlanningProps) {
     return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobPlanningProps));
+        + JobState.getJobNameFromProps(jobPlanningProps),
+        PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }
 
   public static String createActualJobId (Properties jobProps) {
-    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobProps));
+    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobProps),
+        PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }
 
   @Nullable
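A minimal sketch of the new naming behavior (illustrative note, not part of the patch; it assumes only the two-argument JobLauncherUtils.newJobId(String, long) overload and the PropertiesUtils.getPropAsLong fallback used above):

    Properties jobProps = new Properties();
    jobProps.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
    // The numeric suffix of the generated ID is now the flow execution ID
    // ("12345"), so the same flow execution always yields the same job ID.
    String planningJobId = HelixJobsMapping.createPlanningJobId(jobProps);
    // Without the property, the System.currentTimeMillis() default preserves
    // the previous timestamp-based naming.
    String timestampedJobId = HelixJobsMapping.createPlanningJobId(new Properties());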
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index abbdc610ca1..bf7cf071bc9 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -29,25 +29,26 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WriteResponse;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private String _kafkaBrokers;
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+  String flowSpecUriString = "/flowgroup/flowname/spec";
+  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
   String specUriString = "/foo/bar/spec";
   Spec spec = initJobSpec(specUriString);
 
+
   @BeforeSuite
   public void beforeSuite() {
     log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ private void cleanupTestDir() {
       }
     }
   }
-
-  @Test
-  public void testAddSpec() throws Exception {
+  @BeforeClass
+  public void setup() throws Exception {
     _closer = Closer.create();
     _properties = new Properties();
 
@@ -116,9 +119,6 @@ public void testAddSpec() throws Exception {
     // SEI Producer
     _seip = _closer.register(new SimpleKafkaSpecProducer(config));
 
-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
     _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
     _jobCatalog.startAsync().awaitRunning();
 
@@ -126,6 +126,13 @@ public void testAddSpec() throws Exception {
     _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
     _seic.startAsync().awaitRunning();
 
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
     List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
     Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
 
@@ -165,6 +172,78 @@ public void testDeleteSpec() throws Exception {
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }
 
+  @Test(dependsOnMethods = "testDeleteSpec")
+  public void testCancelSpec() throws Exception {
+    // Cancel an existing spec that was added
+    _seip.addSpec(spec).get();
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testCancelSpec")
+  public void testCancelSpecNoopDefault() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match the added JobSpec, so it should not cancel the job
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+    // Wait for the cancellation to be processed; the monitor should ignore the spec because the flow execution IDs do not match
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+
+    _seip.cancelJob(new URI(flowSpecUriString), new Properties()).get();
+    Thread.sleep(5000);
+    consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 2, "Should emit cancellation event if no flow ID provided");
+    consumedSpecAction = consumedEvent.get(1);
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test(dependsOnMethods = "testCancelSpecNoopDefault")
+  public void testCancelSpecWithFlowExecutionId() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+
   private static JobSpec initJobSpec(String specUri) {
     Properties properties = new Properties();
     return JobSpec.builder(specUri)
@@ -174,6 +253,16 @@ private static JobSpec initJobSpec(String specUri) {
         .build();
   }
 
+  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
   @AfterSuite
   public void after() {
     try {
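Distilled from the three cancellation tests above, the caller-side contract looks as follows (sketch, not part of the patch; assumes a configured SimpleKafkaSpecProducer such as _seip and a spec registered with flow execution ID "12345"):

    Properties cancelProps = new Properties();
    cancelProps.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
    // Cancelled only if the running spec carries the same flow execution ID;
    // a mismatched ID (e.g. "54321") is ignored by the monitor, while omitting
    // the property cancels unconditionally.
    _seip.cancelJob(new URI("/flowgroup/flowname/spec"), cancelProps).get();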
Description") + .build(); + } + @AfterSuite public void after() { try { diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java index 953d841f11a..0d01bd4f91a 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java @@ -17,27 +17,30 @@ package org.apache.gobblin.service; -import com.google.common.base.Joiner; import java.io.Closeable; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.util.List; -import java.util.concurrent.Future; import java.util.Properties; +import java.util.concurrent.Future; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.slf4j.Logger; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import javax.annotation.concurrent.NotThreadSafe; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; @@ -54,9 +57,6 @@ import org.apache.gobblin.writer.AsyncDataWriter; import org.apache.gobblin.writer.WriteCallback; -import javax.annotation.concurrent.NotThreadSafe; -import lombok.extern.slf4j.Slf4j; - @Slf4j @NotThreadSafe public class SimpleKafkaSpecProducer implements SpecProducer, Closeable { @@ -105,19 +105,6 @@ private Meter createMeter(String suffix) { return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix)); } - private Spec addExecutionIdToJobSpecUri(Spec spec) { - JobSpec newSpec = (JobSpec)spec; - if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { - try { - newSpec.setUri(new URI(Joiner.on("/"). 
-            join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
-      } catch (URISyntaxException e) {
-        log.error("Cannot create job uri to cancel job", e);
-      }
-    }
-    return newSpec;
-  }
-
   private URI getURIWithExecutionId(URI originalURI, Properties props) {
     URI result = originalURI;
     if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,10 +120,9 @@ private URI getURIWithExecutionId(URI originalURI, Properties props) {
 
   @Override
   public Future<?> addSpec(Spec addedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(addedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
 
-    log.info("Adding Spec: " + spec + " using Kafka.");
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
     this.addSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -144,10 +130,9 @@ public Future<?> addSpec(Spec addedSpec) {
 
   @Override
   public Future<?> updateSpec(Spec updatedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
 
-    log.info("Updating Spec: " + spec + " using Kafka.");
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
     this.updateSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -155,13 +140,11 @@ public Future<?> updateSpec(Spec updatedSpec) {
 
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();
 
-    log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
     this.deleteSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -169,12 +152,11 @@ public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
 
   @Override
   public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
         .setProperties(Maps.fromProperties(properties)).build();
 
-    log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
     this.cancelSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
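For reference, the CANCEL message now emitted by cancelJob has roughly this shape (sketch with illustrative values, not part of the patch): the flow execution ID travels in the message properties rather than as a suffix on the spec URI, which is what lets the monitor compare it against the running spec.

    AvroJobSpec cancelMessage = AvroJobSpec.newBuilder()
        .setUri("/flowgroup/flowname/spec") // no execution-ID suffix appended anymore
        .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
        .setProperties(ImmutableMap.of(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345"))
        .build();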
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 8a1bf3161c9..6952dcff896 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -28,11 +28,13 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,32 @@ protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
         break;
       case DELETE:
         this.removedSpecs.mark();
-        URI jobSpecUri = parsedMessage.getUri();
-        this.jobCatalog.remove(jobSpecUri);
+        this.jobCatalog.remove(parsedMessage.getUri());
         // Delete the job state if it is a delete spec request
-        deleteStateStore(jobSpecUri);
+        deleteStateStore(parsedMessage.getUri());
         break;
       case CANCEL:
-        this.cancelledSpecs.mark();
-        this.jobCatalog.remove(parsedMessage.getUri(), true);
+        URI specUri = parsedMessage.getUri();
+        try {
+          JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+          // If the incoming or the existing job does not have an associated flow execution ID, default to cancelling the job
+          if (!spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) || !parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+            this.cancelledSpecs.mark();
+            this.jobCatalog.remove(specUri, true);
+          } else {
+            // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+            String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+            if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+              this.cancelledSpecs.mark();
+              this.jobCatalog.remove(specUri, true);
+            } else {
+              log.warn("Job spec {} that has flow execution ID {} could not be cancelled, incoming request expects to cancel flow execution ID {}", specUri,
+                  spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowIdToCancel);
+            }
+          }
+        } catch (JobSpecNotFoundException e) {
+          log.warn("Could not find job spec {} to cancel in job catalog", specUri);
+        }
         break;
       default:
         log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);
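The CANCEL branch above reduces to the following decision rule (illustrative helper, not part of the patch): cancel when either side lacks a flow execution ID, otherwise cancel only on an exact match.

    private static boolean shouldCancel(Config existingSpecConfig, Config cancelRequestConfig) {
      // A missing ID on either side preserves the old unconditional-cancel behavior.
      if (!existingSpecConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)
          || !cancelRequestConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
        return true;
      }
      // Otherwise only the targeted flow execution is cancelled.
      return existingSpecConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)
          .equals(cancelRequestConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
    }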