From e51b676e4c14b665c02a7a8c5608ae39c4cbbe0f Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Tue, 1 Aug 2023 19:15:04 +0000 Subject: [PATCH] add sidecar metadata service for credential provisioning --- .../preview/DistributedPreviewManager.java | 18 ++ .../app/worker/TaskWorkerServiceLauncher.java | 19 ++ .../sidecar/ArtifactLocalizerService.java | 3 +- .../GCPMetadataHttpHandlerInternal.java | 169 ++++++++++++++++++ .../io/cdap/cdap/common/conf/Constants.java | 1 + .../src/main/resources/cdap-default.xml | 8 + .../java/io/cdap/cdap/features/Feature.java | 3 +- .../cdap/k8s/runtime/KubeTwillPreparer.java | 16 +- 8 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GCPMetadataHttpHandlerInternal.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java index efb827d2f158..0d1089be673b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java @@ -16,8 +16,10 @@ package io.cdap.cdap.internal.app.preview; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.name.Named; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.app.preview.PreviewConfigModule; import io.cdap.cdap.app.preview.PreviewManager; @@ -25,11 +27,14 @@ import io.cdap.cdap.app.store.preview.PreviewStore; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer; import io.cdap.cdap.common.conf.SConfiguration; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; import io.cdap.cdap.common.utils.DirUtils; import io.cdap.cdap.data.runtime.DataSetsModules; import io.cdap.cdap.data2.dataset2.DatasetFramework; import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable; import io.cdap.cdap.master.spi.twill.DependentTwillPreparer; @@ -46,6 +51,7 @@ import java.io.File; import java.io.IOException; import java.io.Writer; +import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -78,6 +84,7 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements private final CConfiguration cConf; private final Configuration hConf; + private final FeatureFlagsProvider featureFlagsProvider; private final TwillRunner twillRunner; private ScheduledExecutorService scheduler; private TwillController controller; @@ -109,6 +116,7 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements this.cConf = cConf; this.hConf = hConf; this.twillRunner = twillRunner; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); } @Override @@ -205,6 +213,16 @@ public void run() { NamespaceId.SYSTEM.getNamespace()); twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap)); + if (Feature.NAMESPACE_RESOURCE_ISOLATION.isEnabled(featureFlagsProvider)) { + String localhost = InetAddress.getLoopbackAddress().getHostName(); + twillPreparer = twillPreparer.withEnv(PreviewRunnerTwillRunnable.class.getSimpleName(), + ImmutableMap.of( + ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR, + String.format("%s:%s", localhost, + cConf.getInt(Constants.ArtifactLocalizer.PORT)) + )); + } + String priorityClass = cConf.get(Constants.Preview.CONTAINER_PRIORITY_CLASS_NAME); if (priorityClass != null) { twillPreparer = twillPreparer.setSchedulerQueue(priorityClass); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java index 945d14ce7eea..09e0a04d4d14 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java @@ -16,12 +16,17 @@ package io.cdap.cdap.internal.app.worker; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.AbstractScheduledService; import com.google.inject.Inject; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer; import io.cdap.cdap.common.conf.Constants.TaskWorker; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; import io.cdap.cdap.common.utils.DirUtils; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable; import io.cdap.cdap.master.spi.twill.DependentTwillPreparer; @@ -35,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.io.Writer; +import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -64,6 +70,8 @@ public class TaskWorkerServiceLauncher extends AbstractScheduledService { private final CConfiguration cConf; private final Configuration hConf; + private final FeatureFlagsProvider featureFlagsProvider; + private final TwillRunner twillRunner; private TwillController twillController; @@ -78,6 +86,7 @@ public TaskWorkerServiceLauncher(CConfiguration cConf, Configuration hConf, this.cConf = cConf; this.hConf = hConf; this.twillRunner = twillRunner; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); } @Override @@ -182,6 +191,16 @@ public void run() { NamespaceId.SYSTEM.getNamespace()); twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap)); + if (Feature.NAMESPACE_RESOURCE_ISOLATION.isEnabled(featureFlagsProvider)) { + String localhost = InetAddress.getLoopbackAddress().getHostName(); + twillPreparer = twillPreparer.withEnv(TaskWorkerTwillRunnable.class.getSimpleName(), + ImmutableMap.of( + ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR, + String.format("%s:%s", localhost, + cConf.getInt(Constants.ArtifactLocalizer.PORT)) + )); + } + String priorityClass = cConf.get(Constants.TaskWorker.CONTAINER_PRIORITY_CLASS_NAME); if (priorityClass != null) { twillPreparer = twillPreparer.setSchedulerQueue(priorityClass); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java index a487f5e5a3ca..772b2ce9f872 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java @@ -58,7 +58,8 @@ public class ArtifactLocalizerService extends AbstractIdleService { .setPort(cConf.getInt(Constants.ArtifactLocalizer.PORT)) .setBossThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.BOSS_THREADS)) .setWorkerThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.WORKER_THREADS)) - .setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer)) + .setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer), + new GCPMetadataHttpHandlerInternal(cConf)) .build(); this.cacheCleanupInterval = cConf.getInt( diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GCPMetadataHttpHandlerInternal.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GCPMetadataHttpHandlerInternal.java new file mode 100644 index 000000000000..601bf7c919f5 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GCPMetadataHttpHandlerInternal.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.worker.sidecar; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.inject.Singleton; +import io.cdap.cdap.common.ForbiddenException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.proto.BasicThrowable; +import io.cdap.cdap.proto.codec.BasicThrowableCodec; +import io.cdap.common.http.HttpRequests; +import io.cdap.common.http.HttpResponse; +import io.cdap.http.AbstractHttpHandler; +import io.cdap.http.HttpHandler; +import io.cdap.http.HttpResponder; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.net.URL; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.HttpHeaders; +import joptsimple.internal.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Internal {@link HttpHandler} for Artifact Localizer. + */ +@Singleton +@Path("/") +public class GCPMetadataHttpHandlerInternal extends AbstractHttpHandler { + + private static final String METADATA_FLAVOR = "Metadata-Flavor"; + private static final String GOOGLE = "Google"; + private static final Logger LOG = LoggerFactory.getLogger(GCPMetadataHttpHandlerInternal.class); + private static final Gson GSON = new GsonBuilder().registerTypeAdapter(BasicThrowable.class, + new BasicThrowableCodec()).create(); + private final CConfiguration cConf; + private final String metadataServiceEndpoint; + private Namespace namespace; + + public GCPMetadataHttpHandlerInternal(CConfiguration cConf) { + this.cConf = cConf; + this.metadataServiceEndpoint = cConf.get( + Constants.TaskWorker.METADATA_SERVICE_END_POINT); + } + + /** + * Returns the status of metadata server. + * + * @param request The {@link HttpRequest}. + * @param responder a {@link HttpResponder} for sending response. + * @throws Exception if there is any error. + */ + @GET + @Path("/") + public void status(HttpRequest request, HttpResponder responder) throws Exception { + + // check that metadata header is present in the request. + if (!request.headers().contains(METADATA_FLAVOR, GOOGLE, true)) { + throw new ForbiddenException( + String.format("Request is missing required %s header. To access the metadata server, " + + "you must add the %s: %s header to your request.", METADATA_FLAVOR, + METADATA_FLAVOR, GOOGLE)); + } + responder.sendStatus(HttpResponseStatus.OK, + new DefaultHttpHeaders().add(METADATA_FLAVOR, GOOGLE)); + } + + @GET + @Path("/computeMetadata/v1/instance/service-accounts/default/token") + public void token(HttpRequest request, HttpResponder responder, + @QueryParam("scopes") String scopes) throws Exception { + + LOG.info("Token requested"); + // check that metadata header is present in the request. + if (!request.headers().contains(METADATA_FLAVOR, GOOGLE, true)) { + throw new ForbiddenException( + String.format("Request is missing required %s header. To access the metadata server, " + + "you must add the %s: %s header to your request.", METADATA_FLAVOR, + METADATA_FLAVOR, GOOGLE)); + } + + // TODO: CDAP-20750 + if (metadataServiceEndpoint == null) { + responder.sendString(HttpResponseStatus.NOT_IMPLEMENTED, + String.format("%s has not been set", + Constants.TaskWorker.METADATA_SERVICE_END_POINT)); + return; + } + + try { + URL url = new URL(metadataServiceEndpoint); + if (!Strings.isNullOrEmpty(scopes)) { + url = new URL(String.format("%s?scopes=%s", metadataServiceEndpoint, scopes)); + } + io.cdap.common.http.HttpRequest tokenRequest = io.cdap.common.http.HttpRequest.get(url) + .addHeader(METADATA_FLAVOR, GOOGLE) + .build(); + HttpResponse tokenResponse = HttpRequests.execute(tokenRequest); + responder.sendJson(HttpResponseStatus.OK, tokenResponse.getResponseBodyAsString()); + } catch (Exception ex) { + LOG.warn("Failed to fetch token from metadata service", ex); + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, exceptionToJson(ex), + new DefaultHttpHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json")); + } + } + + /** + * Sets the CDAP Namespace information. + * + * @param request The {@link HttpRequest}. + * @param namespaceId Namespace id string. + * @param responder a {@link HttpResponder} for sending response. + */ + @PUT + @Path("/set-namespace/{namespace-id}") + public void setNamespace(HttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId) { + LOG.info("Set namespace {}", namespaceId); + this.namespace = new Namespace(namespaceId); + responder.sendStatus(HttpResponseStatus.OK); + } + + /** + * Clears the CDAP Namespace information. + * + * @param request The {@link HttpRequest}. + * @param responder a {@link HttpResponder} for sending response. + */ + @DELETE + @Path("/clear-namespace") + public void clearNamespace(HttpRequest request, HttpResponder responder) { + LOG.info("Clear namespace"); + this.namespace = null; + responder.sendStatus(HttpResponseStatus.OK); + } + + /** + * Return json representation of an exception. Used to propagate exception across network for + * better surfacing errors and debuggability. + */ + private String exceptionToJson(Exception ex) { + BasicThrowable basicThrowable = new BasicThrowable(ex); + return GSON.toJson(basicThrowable); + } +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 4977ddda8082..3aebe69466ca 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -586,6 +586,7 @@ public static final class ArtifactLocalizer { public static final String WORKER_THREADS = "artifact.localizer.worker.threads"; public static final String PRELOAD_LIST = "artifact.localizer.preload.list"; public static final String PRELOAD_VERSION_LIMIT = "artifact.localizer.preload.version.limit"; + public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST"; } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 55a16b74d1a7..673068bcc420 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -5861,6 +5861,14 @@ + + feature.namespace.resource.isolation.enabled + true + + If true, namespace resource isolation will be enabled. + + + artifact.cache.bind.address 0.0.0.0 diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java index da8346cc4c57..fca0c647769f 100644 --- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java +++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java @@ -40,7 +40,8 @@ public enum Feature { SOURCE_CONTROL_MANAGEMENT_GIT("6.9.0"), WRANGLER_PRECONDITION_SQL("6.9.1"), WRANGLER_EXECUTION_SQL("6.10.0"), - WRANGLER_SCHEMA_MANAGEMENT("6.10.0"); + WRANGLER_SCHEMA_MANAGEMENT("6.10.0"), + NAMESPACE_RESOURCE_ISOLATION("6.10.0"); private final PlatformInfo.Version versionIntroduced; private final boolean defaultAfterIntroduction; diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java index ad682c8e3dc9..4ec2ad3a4f46 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java @@ -168,6 +168,7 @@ class KubeTwillPreparer implements DependentTwillPreparer, StatefulTwillPreparer // Configmap that stores localized config files private static final String CONFIGMAP_MOUNTPATH = "/config"; public static final String CONFIGMAP_NAME_PREFIX = "cdap-config-"; + public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST"; private final MasterEnvironmentContext masterEnvContext; private final ApiClient apiClient; @@ -1164,9 +1165,13 @@ private V1PodSpec createPodSpec(Type resourceType, Location runtimeConfigLocatio .filter( envVar -> !envVar.getName().equals(WorkloadIdentityUtil.WORKLOAD_IDENTITY_ENV_VAR_KEY)) .collect(Collectors.toMap(V1EnvVar::getName, V1EnvVar::getValue)); - // Add all environments of the the main runnable for the init container. + // Add all environments of the main runnable for the init container except GCE_METADATA_HOST. if (environments.get(runnableName) != null) { - initContainerEnvirons.putAll(environments.get(runnableName)); + Map envs = environments.get(runnableName).entrySet().stream() + .filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR)) + .collect(Collectors.toMap(Map.Entry::getKey, + Map.Entry::getValue)); + initContainerEnvirons.putAll(envs); } // Add JVM options to environment. initContainerEnvirons.put(JAVA_OPTS_KEY, masterEnvContext.getConfigurations() @@ -1291,11 +1296,16 @@ private List createContainers(Type resourceType, environs.putAll(environments.get(name)); // Add JVM options to environment. environs.put(JAVA_OPTS_KEY, jvmOpts); + // remove GCE_METADATA_HOST_ENV_VAR from the dependent runnable container. + Map envs = environs.entrySet().stream() + .filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR)) + .collect(Collectors.toMap(Map.Entry::getKey, + Map.Entry::getValue)); mounts = addSecreteVolMountIfNeeded(spec, volumeMounts); containers.add( createContainer(name, podInfo.getContainerImage(), podInfo.getImagePullPolicy(), workDir, createResourceRequirements(spec.getResourceSpecification()), - mounts, environs, KubeTwillLauncher.class, + mounts, envs, KubeTwillLauncher.class, Stream.concat(Stream.of(name), args.stream()).toArray(String[]::new))); }