
Commit

[GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (apache#3694)

* Use the flow execution ID instead of a generated timestamp in job IDs when the job properties contain a flow execution ID property (see the sketch below)

* Honor the flow execution ID in the Kafka job monitor when cancelling flows, if applicable

* Address review comments

* Add enhanced logging
Will-Lo committed May 11, 2023
1 parent 7bbf676 commit 3a045cd
Showing 4 changed files with 143 additions and 49 deletions.
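At its core, the change makes generated job IDs deterministic: when the job properties carry a flow execution ID, that value becomes the job ID's numeric suffix instead of the wall-clock timestamp. A minimal runnable sketch of that fallback follows; the key constant is referenced as ConfigurationKeys.FLOW_EXECUTION_ID_KEY in the diff, and the literal value and helper class here are assumptions for illustration only, not Gobblin's API.

import java.util.Properties;

public class JobIdSuffixSketch {
  // Assumed literal standing in for ConfigurationKeys.FLOW_EXECUTION_ID_KEY; illustrative only.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  // Mirrors the PropertiesUtils.getPropAsLong(props, key, defaultValue) call in the diff:
  // prefer the flow execution ID when present, otherwise fall back to "now".
  static long jobIdSuffix(Properties props) {
    String raw = props.getProperty(FLOW_EXECUTION_ID_KEY);
    return raw != null ? Long.parseLong(raw) : System.currentTimeMillis();
  }

  public static void main(String[] args) {
    Properties withId = new Properties();
    withId.setProperty(FLOW_EXECUTION_ID_KEY, "12345");
    System.out.println(jobIdSuffix(withId));            // always 12345: stable across retries
    System.out.println(jobIdSuffix(new Properties()));  // wall-clock millis: the old behavior
  }
}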
HelixJobsMapping.java:

@@ -36,6 +36,7 @@
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;


 /**
@@ -96,12 +97,14 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {

   public static String createPlanningJobId (Properties jobPlanningProps) {
     return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobPlanningProps));
+        + JobState.getJobNameFromProps(jobPlanningProps),
+        PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }

   public static String createActualJobId (Properties jobProps) {
-    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobProps));
+    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobProps),
+        PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }

   @Nullable
StreamingKafkaSpecExecutorTest.java:

@@ -29,25 +29,26 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;

 import com.google.common.io.Closer;
 import com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WriteResponse;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.extern.slf4j.Slf4j;
-

 @Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private String _kafkaBrokers;
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+  String flowSpecUriString = "/flowgroup/flowname/spec";
+  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
   String specUriString = "/foo/bar/spec";
   Spec spec = initJobSpec(specUriString);
+

   @BeforeSuite
   public void beforeSuite() {
     log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ private void cleanupTestDir() {
       }
     }
   }
-
-  @Test
-  public void testAddSpec() throws Exception {
+  @BeforeClass
+  public void setup() throws Exception {
     _closer = Closer.create();
     _properties = new Properties();

@@ -116,16 +119,20 @@ public void testAddSpec() throws Exception {
     // SEI Producer
     _seip = _closer.register(new SimpleKafkaSpecProducer(config));

-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
     _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
     _jobCatalog.startAsync().awaitRunning();

     // SEI Consumer
     _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
     _seic.startAsync().awaitRunning();
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
     List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
     Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
@@ -165,6 +172,78 @@ public void testDeleteSpec() throws Exception {
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }

+  @Test(dependsOnMethods = "testDeleteSpec")
+  public void testCancelSpec() throws Exception {
+    // Cancel an existing spec that was added
+    _seip.addSpec(spec).get();
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testCancelSpec")
+  public void testCancelSpecNoopDefault() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match with added jobspec, so should not cancel job
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+    // Wait for the cancellation to be processed, but it should ignore the spec as flow execution IDs do not match
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+
+    _seip.cancelJob(new URI(flowSpecUriString), new Properties()).get();
+    Thread.sleep(5000);
+    consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 2, "Should emit cancellation event if no flow ID provided");
+    consumedSpecAction = consumedEvent.get(1);
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test(dependsOnMethods = "testCancelSpecNoopDefault")
+  public void testCancelSpecWithFlowExecutionId() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+
   private static JobSpec initJobSpec(String specUri) {
     Properties properties = new Properties();
     return JobSpec.builder(specUri)
@@ -174,6 +253,16 @@ private static JobSpec initJobSpec(String specUri) {
         .build();
   }

+  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
   @AfterSuite
   public void after() {
     try {
SimpleKafkaSpecProducer.java:

@@ -17,27 +17,30 @@

 package org.apache.gobblin.service;

-import com.google.common.base.Joiner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.Properties;
+import java.util.concurrent.Future;

 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.slf4j.Logger;

 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;

+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -54,9 +57,6 @@
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;

-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 @NotThreadSafe
 public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@@ -105,19 +105,6 @@ private Meter createMeter(String suffix) {
     return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
   }

-  private Spec addExecutionIdToJobSpecUri(Spec spec) {
-    JobSpec newSpec = (JobSpec)spec;
-    if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-      try {
-        newSpec.setUri(new URI(Joiner.on("/").
-            join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
-      } catch (URISyntaxException e) {
-        log.error("Cannot create job uri to cancel job", e);
-      }
-    }
-    return newSpec;
-  }
-
   private URI getURIWithExecutionId(URI originalURI, Properties props) {
     URI result = originalURI;
     if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,48 +120,43 @@ private URI getURIWithExecutionId(URI originalURI, Properties props) {

   @Override
   public Future<?> addSpec(Spec addedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(addedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);

-    log.info("Adding Spec: " + spec + " using Kafka.");
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
     this.addSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> updateSpec(Spec updatedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);

-    log.info("Updating Spec: " + spec + " using Kafka.");
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
     this.updateSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();

-    log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
     this.deleteSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
         .setProperties(Maps.fromProperties(properties)).build();

-    log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
     this.cancelSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
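With addExecutionIdToJobSpecUri removed, the spec URI stays stable across executions; for delete and cancel requests the flow execution ID now travels in the message properties rather than being appended to the URI. A hedged sketch of how a caller targets one execution when cancelling: the URI and ID literals reuse the test values above, the commented-out producer call reflects the cancelJob(URI, Properties) signature from this diff, and the key literal is an assumption.

import java.net.URI;
import java.util.Properties;

public class CancelRequestSketch {
  public static void main(String[] args) throws Exception {
    // The spec URI no longer embeds the execution ID, so it is stable across runs.
    URI jobUri = new URI("/flowgroup/flowname/spec");

    // The execution to cancel rides along in the properties instead.
    Properties headers = new Properties();
    headers.setProperty("flow.executionId", "12345"); // assumed literal for ConfigurationKeys.FLOW_EXECUTION_ID_KEY

    // producer.cancelJob(jobUri, headers).get();  // shape of the real call in this diff
    System.out.println("CANCEL " + jobUri + " for flow execution " + headers.getProperty("flow.executionId"));
  }
}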
KafkaJobMonitor.java:

@@ -28,11 +28,13 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;

+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,32 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           break;
         case DELETE:
           this.removedSpecs.mark();
-          URI jobSpecUri = parsedMessage.getUri();
-          this.jobCatalog.remove(jobSpecUri);
+          this.jobCatalog.remove(parsedMessage.getUri());
           // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+          deleteStateStore(parsedMessage.getUri());
           break;
         case CANCEL:
-          this.cancelledSpecs.mark();
-          this.jobCatalog.remove(parsedMessage.getUri(), true);
+          URI specUri = parsedMessage.getUri();
+          try {
+            JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+            // If incoming job or existing job does not have an associated flow execution ID, default to cancelling the job
+            if (!spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) || !parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+              this.cancelledSpecs.mark();
+              this.jobCatalog.remove(specUri, true);
+            } else {
+              // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+              String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+              if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+                this.cancelledSpecs.mark();
+                this.jobCatalog.remove(specUri, true);
+              } else {
+                log.warn("Job spec {} that has flow execution ID {} could not be cancelled, incoming request expects to cancel flow execution ID {}", specUri,
+                    spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowIdToCancel);
+              }
+            }
+          } catch (JobSpecNotFoundException e) {
+            log.warn("Could not find job spec {} to cancel in job catalog", specUri);
+          }
           break;
         default:
           log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);
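Net effect of the new CANCEL handling: if either the stored spec or the incoming message lacks a flow execution ID, the job is cancelled as before; if both carry one, the job is cancelled only when the IDs match, otherwise the monitor logs a warning and leaves the job running; and a spec missing from the catalog is logged and skipped rather than cancelled.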
