diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.java index 90b7647e112e..a0ea940bdfd2 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.java @@ -165,8 +165,8 @@ public Map list(ProgramType type) { // Goes through all live application and fill the twillProgramInfo table for (TwillRunner.LiveInfo liveInfo : twillRunner.lookupLive()) { - String appName = liveInfo.getApplicationName(); - ProgramId programId = TwillAppNames.fromTwillAppName(appName, false); + ProgramId programId = TwillAppNames.fromTwillAppName(liveInfo.getApplicationName(), + false, liveInfo.getApplicationVersion()); if (programId == null) { continue; } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/ProgramTwillApplication.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/ProgramTwillApplication.java index 3cf46486b8ee..5de0cfd8befb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/ProgramTwillApplication.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/ProgramTwillApplication.java @@ -140,4 +140,9 @@ private Builder.RunnableSetter localizeFiles(Map local public String getRunId() { return programRunId.getRun(); } + + @Override + public String getApplicationVersion() { + return programRunId.getVersion(); + } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/TwillAppNames.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/TwillAppNames.java index 1349298d84d4..6b7536adb3df 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/TwillAppNames.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/TwillAppNames.java @@ -68,6 +68,22 @@ public static ProgramId fromTwillAppName(String twillAppName) { */ @Nullable public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch) { + return fromTwillAppName(twillAppName, mustMatch, null); + } + + /** + * Given a Twill app name and version, returns the id of the program that was used to construct + * this Twill app name. + * + * @return {@code null} if mustMatch is false, and if the specified Twill app name does not match + * the {@link #APP_NAME_PATTERN}. For instance, for the Constants.Service.MASTER_SERVICES + * Twill app, it will return null. + * @throws IllegalArgumentException if the given app name does not match the {@link + * #APP_NAME_PATTERN} and mustMatch is true. + */ + @Nullable + public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch, + @Nullable String version) { Matcher matcher = APP_NAME_PATTERN.matcher(twillAppName); if (!matcher.matches()) { Preconditions.checkArgument(!mustMatch, @@ -79,6 +95,8 @@ public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch) "Expected matcher for '%s' to have 4 groups, but it had %s groups.", twillAppName, matcher.groupCount()); ProgramType type = ProgramType.valueOf(matcher.group(1).toUpperCase()); - return new ProgramId(matcher.group(2), matcher.group(3), type, matcher.group(4)); + return version != null ? + new ProgramId(matcher.group(2), matcher.group(3), version, type, matcher.group(4)) : + new ProgramId(matcher.group(2), matcher.group(3), type, matcher.group(4)); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/twill/TwillAppNamesTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/twill/TwillAppNamesTest.java index 0e8f40a1db64..94efd992fb4a 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/twill/TwillAppNamesTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/twill/TwillAppNamesTest.java @@ -17,8 +17,10 @@ package io.cdap.cdap.common.twill; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.ProgramId; +import java.util.UUID; import org.junit.Assert; import org.junit.Test; @@ -46,4 +48,14 @@ public void test() { Assert.assertTrue(e.getMessage().contains("does not match pattern for programs")); } } + + @Test + public void testAppWithVersion() { + String appVersion = UUID.randomUUID().toString(); + ProgramId expected = new ProgramId("default", "app", appVersion, + ProgramType.SPARK, "DataPipelineWorkflow"); + ProgramId programId = TwillAppNames.fromTwillAppName("spark.default.app.DataPipelineWorkflow", + false, appVersion); + Assert.assertEquals(programId, expected); + } } diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillRunnerService.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillRunnerService.java index 3a60bb8fad2a..09570e8b6567 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillRunnerService.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillRunnerService.java @@ -29,6 +29,7 @@ import io.cdap.cdap.master.spi.namespace.NamespaceDetail; import io.cdap.cdap.master.spi.namespace.NamespaceListener; import io.cdap.cdap.master.spi.twill.ExtendedTwillApplication; +import io.cdap.cdap.proto.id.ApplicationId; import io.cdap.cdap.proto.id.NamespaceId; import io.kubernetes.client.common.KubernetesObject; import io.kubernetes.client.custom.Quantity; @@ -64,9 +65,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -121,6 +120,7 @@ public class KubeTwillRunnerService implements TwillRunnerService, NamespaceList private static final Logger LOG = LoggerFactory.getLogger(KubeTwillRunnerService.class); static final String APP_LABEL = "cdap.twill.app"; + static final String APP_VERSION = "cdap.twill.app.version"; private static final String CDAP_NAMESPACE_LABEL = "cdap.namespace"; private static final String NAMESPACE_CPU_LIMIT_PROPERTY = "k8s.namespace.cpu.limits"; private static final String NAMESPACE_MEMORY_LIMIT_PROPERTY = "k8s.namespace.memory.limits"; @@ -209,15 +209,22 @@ public TwillPreparer prepare(TwillRunnable runnable, public TwillPreparer prepare(TwillApplication application) { TwillSpecification spec = application.configure(); RunId runId; + String appVersion; if (application instanceof ExtendedTwillApplication) { runId = RunIds.fromString(((ExtendedTwillApplication) application).getRunId()); + appVersion = ((ExtendedTwillApplication) application).getApplicationVersion(); } else { + // Version is not set for system apps + appVersion = null; runId = RunIds.generate(); } Location appLocation = getApplicationLocation(spec.getName(), runId); Map labels = new HashMap<>(extraLabels); labels.put(RUNNER_LABEL, RUNNER_LABEL_VAL); labels.put(APP_LABEL, spec.getName()); + if (appVersion != null && !appVersion.equals(ApplicationId.DEFAULT_VERSION)) { + labels.put(APP_VERSION, appVersion); + } labels.put(RUN_ID_LABEL, runId.getId()); return new KubeTwillPreparer(masterEnvContext, apiClient, kubeNamespace, podInfo, @@ -232,8 +239,9 @@ public TwillPreparer prepare(TwillApplication application) { //since monitor is disabled, we fire and forget return controller; } + KubeLiveInfo liveInfo = liveInfos.computeIfAbsent(spec.getName(), - n -> new KubeLiveInfo(resourceType, n)); + n -> new KubeLiveInfo(resourceType, n, appVersion)); return liveInfo.addControllerIfAbsent(runId, timeout, timeoutUnit, controller, meta); } finally { liveInfoLock.unlock(); @@ -923,6 +931,7 @@ private final class AppResourceChangeListener implem public void resourceAdded(T resource) { V1ObjectMeta metadata = resource.getMetadata(); String appName = metadata.getAnnotations().get(APP_LABEL); + String appVersion = metadata.getLabels().getOrDefault(APP_VERSION, null); if (appName == null) { // This shouldn't happen. Just to guard against future bug. return; @@ -946,7 +955,7 @@ public void resourceAdded(T resource) { liveInfoLock.lock(); try { KubeLiveInfo liveInfo = liveInfos.computeIfAbsent(appName, - k -> new KubeLiveInfo(resource.getClass(), appName)); + k -> new KubeLiveInfo(resource.getClass(), appName, appVersion)); KubeTwillController controller = createKubeTwillController(appName, runId, resource.getClass(), metadata); liveInfo.addControllerIfAbsent(runId, startTimeoutMillis, TimeUnit.MILLISECONDS, controller, @@ -1084,12 +1093,15 @@ private final class KubeLiveInfo implements LiveInfo { private final Type resourceType; private final String applicationName; + @Nullable + private final String applicationVersion; private final Map controllers; - KubeLiveInfo(Type resourceType, String applicationName) { + KubeLiveInfo(Type resourceType, String applicationName, @Nullable String appVersion) { this.resourceType = resourceType; this.applicationName = applicationName; this.controllers = new ConcurrentSkipListMap<>(); + this.applicationVersion = appVersion; } KubeTwillController addControllerIfAbsent(RunId runId, long timeout, TimeUnit timeoutUnit, @@ -1124,6 +1136,12 @@ public String getApplicationName() { return applicationName; } + @Override + @Nullable + public String getApplicationVersion() { + return applicationVersion; + } + @Override public Iterable getControllers() { // Protect against modifications diff --git a/cdap-master-spi/src/main/java/io/cdap/cdap/master/spi/twill/ExtendedTwillApplication.java b/cdap-master-spi/src/main/java/io/cdap/cdap/master/spi/twill/ExtendedTwillApplication.java index 665d8a109ad3..06d706fb38f9 100644 --- a/cdap-master-spi/src/main/java/io/cdap/cdap/master/spi/twill/ExtendedTwillApplication.java +++ b/cdap-master-spi/src/main/java/io/cdap/cdap/master/spi/twill/ExtendedTwillApplication.java @@ -25,4 +25,6 @@ public interface ExtendedTwillApplication extends TwillApplication { String getRunId(); + String getApplicationVersion(); + } diff --git a/pom.xml b/pom.xml index 5fd3f5239be4..4f251e562417 100644 --- a/pom.xml +++ b/pom.xml @@ -172,7 +172,7 @@ 0.15.0-incubating 0.8.4 0.9.3 - 1.3.1 + 1.4.0-SNAPSHOT 2.3.6 3.4.5 1.3.1