diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java
index efb827d2f158..3bd1671d5d9d 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/preview/DistributedPreviewManager.java
@@ -16,8 +16,10 @@
package io.cdap.cdap.internal.app.preview;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.name.Named;
+import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.preview.PreviewConfigModule;
import io.cdap.cdap.app.preview.PreviewManager;
@@ -25,11 +27,14 @@
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer;
import io.cdap.cdap.common.conf.SConfiguration;
+import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.data.runtime.DataSetsModules;
import io.cdap.cdap.data2.dataset2.DatasetFramework;
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
+import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable;
import io.cdap.cdap.master.spi.twill.DependentTwillPreparer;
@@ -46,6 +51,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Writer;
+import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -78,6 +84,7 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements
private final CConfiguration cConf;
private final Configuration hConf;
+ private final FeatureFlagsProvider featureFlagsProvider;
private final TwillRunner twillRunner;
private ScheduledExecutorService scheduler;
private TwillController controller;
@@ -90,10 +97,10 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements
AccessControllerInstantiator accessControllerInstantiator,
AccessEnforcer accessEnforcer,
AuthenticationContext authenticationContext,
- @Named(PreviewConfigModule.PREVIEW_LEVEL_DB) LevelDBTableService previewLevelDBTableService,
- @Named(PreviewConfigModule.PREVIEW_CCONF) CConfiguration previewCConf,
- @Named(PreviewConfigModule.PREVIEW_HCONF) Configuration previewHConf,
- @Named(PreviewConfigModule.PREVIEW_SCONF) SConfiguration previewSConf,
+ @Named(PreviewConfigModule.PREVIEW_LEVEL_DB) LevelDBTableService previewLevelDbTableService,
+ @Named(PreviewConfigModule.PREVIEW_CCONF) CConfiguration previewCconf,
+ @Named(PreviewConfigModule.PREVIEW_HCONF) Configuration previewHconf,
+ @Named(PreviewConfigModule.PREVIEW_SCONF) SConfiguration previewSconf,
PreviewRequestQueue previewRequestQueue, PreviewStore previewStore,
PreviewRunStopper previewRunStopper, MessagingService messagingService,
MetricsCollectionService metricsCollectionService,
@@ -101,14 +108,15 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements
TwillRunner twillRunner) {
super(discoveryServiceClient, datasetFramework, transactionSystemClient,
accessControllerInstantiator, accessEnforcer, authenticationContext,
- previewLevelDBTableService,
- previewCConf, previewHConf, previewSConf, previewRequestQueue, previewStore,
+ previewLevelDbTableService,
+ previewCconf, previewHconf, previewSconf, previewRequestQueue, previewStore,
previewRunStopper,
messagingService, previewDataCleanupService, metricsCollectionService);
this.cConf = cConf;
this.hConf = hConf;
this.twillRunner = twillRunner;
+ this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}
@Override
@@ -205,6 +213,16 @@ public void run() {
NamespaceId.SYSTEM.getNamespace());
twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap));
+ if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
+ String localhost = InetAddress.getLoopbackAddress().getHostName();
+ twillPreparer = twillPreparer.withEnv(PreviewRunnerTwillRunnable.class.getSimpleName(),
+ ImmutableMap.of(
+ ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR,
+ String.format("%s:%s", localhost,
+ cConf.getInt(Constants.ArtifactLocalizer.PORT))
+ ));
+ }
+
String priorityClass = cConf.get(Constants.Preview.CONTAINER_PRIORITY_CLASS_NAME);
if (priorityClass != null) {
twillPreparer = twillPreparer.setSchedulerQueue(priorityClass);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java
index 945d14ce7eea..10ea86e1db98 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/TaskWorkerServiceLauncher.java
@@ -16,12 +16,17 @@
package io.cdap.cdap.internal.app.worker;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.inject.Inject;
+import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer;
import io.cdap.cdap.common.conf.Constants.TaskWorker;
+import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
+import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable;
import io.cdap.cdap.master.spi.twill.DependentTwillPreparer;
@@ -35,6 +40,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Writer;
+import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -64,6 +70,8 @@ public class TaskWorkerServiceLauncher extends AbstractScheduledService {
private final CConfiguration cConf;
private final Configuration hConf;
+ private final FeatureFlagsProvider featureFlagsProvider;
+
private final TwillRunner twillRunner;
private TwillController twillController;
@@ -78,6 +86,7 @@ public TaskWorkerServiceLauncher(CConfiguration cConf, Configuration hConf,
this.cConf = cConf;
this.hConf = hConf;
this.twillRunner = twillRunner;
+ this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}
@Override
@@ -182,6 +191,16 @@ public void run() {
NamespaceId.SYSTEM.getNamespace());
twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap));
+ if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
+ String localhost = InetAddress.getLoopbackAddress().getHostName();
+ twillPreparer = twillPreparer.withEnv(TaskWorkerTwillRunnable.class.getSimpleName(),
+ ImmutableMap.of(
+ ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR,
+ String.format("%s:%s", localhost,
+ cConf.getInt(Constants.ArtifactLocalizer.PORT))
+ ));
+ }
+
String priorityClass = cConf.get(Constants.TaskWorker.CONTAINER_PRIORITY_CLASS_NAME);
if (priorityClass != null) {
twillPreparer = twillPreparer.setSchedulerQueue(priorityClass);
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java
index a487f5e5a3ca..9a180300ad4a 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/ArtifactLocalizerService.java
@@ -58,7 +58,8 @@ public class ArtifactLocalizerService extends AbstractIdleService {
.setPort(cConf.getInt(Constants.ArtifactLocalizer.PORT))
.setBossThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.BOSS_THREADS))
.setWorkerThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.WORKER_THREADS))
- .setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer))
+ .setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer),
+ new GcpMetadataHttpHandlerInternal(cConf))
.build();
this.cacheCleanupInterval = cConf.getInt(
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternal.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternal.java
new file mode 100644
index 000000000000..f5836bcc1f2a
--- /dev/null
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternal.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.app.worker.sidecar;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+import com.google.inject.Singleton;
+import io.cdap.cdap.common.BadRequestException;
+import io.cdap.cdap.common.ForbiddenException;
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.gateway.handlers.util.AbstractAppFabricHttpHandler;
+import io.cdap.cdap.proto.BasicThrowable;
+import io.cdap.cdap.proto.codec.BasicThrowableCodec;
+import io.cdap.cdap.proto.security.GcpMetadataTaskContext;
+import io.cdap.common.http.HttpRequests;
+import io.cdap.common.http.HttpResponse;
+import io.cdap.http.HttpHandler;
+import io.cdap.http.HttpResponder;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import java.net.URL;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.HttpHeaders;
+import joptsimple.internal.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal {@link HttpHandler} for Artifact Localizer.
+ */
+@Singleton
+@Path("/")
+public class GcpMetadataHttpHandlerInternal extends AbstractAppFabricHttpHandler {
+
+ protected static final String METADATA_FLAVOR_HEADER_KEY = "Metadata-Flavor";
+ protected static final String METADATA_FLAVOR_HEADER_VALUE = "Google";
+ private static final Logger LOG = LoggerFactory.getLogger(GcpMetadataHttpHandlerInternal.class);
+ private static final Gson GSON = new GsonBuilder().registerTypeAdapter(BasicThrowable.class,
+ new BasicThrowableCodec()).create();
+ private final CConfiguration cConf;
+ private final String metadataServiceEndpoint;
+ private GcpMetadataTaskContext gcpMetadataTaskContext;
+
+ /**
+ * Constructs the {@link GcpMetadataHttpHandlerInternal}.
+ *
+ * @param cConf CConfiguration
+ */
+ public GcpMetadataHttpHandlerInternal(CConfiguration cConf) {
+ this.cConf = cConf;
+ this.metadataServiceEndpoint = cConf.get(
+ Constants.TaskWorker.METADATA_SERVICE_END_POINT);
+ }
+
+ /**
+ * Returns the status of metadata server.
+ *
+ * @param request The {@link HttpRequest}.
+ * @param responder a {@link HttpResponder} for sending response.
+ * @throws Exception if there is any error.
+ */
+ @GET
+ @Path("/")
+ public void status(HttpRequest request, HttpResponder responder) throws Exception {
+
+ // check that metadata header is present in the request.
+ if (!request.headers().contains(METADATA_FLAVOR_HEADER_KEY,
+ METADATA_FLAVOR_HEADER_VALUE, true)) {
+ throw new ForbiddenException(
+ String.format("Request is missing required %s header. To access the metadata server, "
+ + "you must add the %s: %s header to your request.", METADATA_FLAVOR_HEADER_KEY,
+ METADATA_FLAVOR_HEADER_KEY, METADATA_FLAVOR_HEADER_VALUE));
+ }
+ responder.sendStatus(HttpResponseStatus.OK,
+ new DefaultHttpHeaders().add(METADATA_FLAVOR_HEADER_KEY, METADATA_FLAVOR_HEADER_VALUE));
+ }
+
+ /**
+ * Returns the token of metadata server.
+ *
+ * @param request The {@link HttpRequest}.
+ * @param responder a {@link HttpResponder} for sending response.
+ * @throws Exception if there is any error.
+ */
+ @GET
+ @Path("/computeMetadata/v1/instance/service-accounts/default/token")
+ public void token(HttpRequest request, HttpResponder responder,
+ @QueryParam("scopes") String scopes) throws Exception {
+
+ LOG.debug("Token requested for namespace: {}", gcpMetadataTaskContext.getNamespace());
+ // check that metadata header is present in the request.
+ if (!request.headers().contains(METADATA_FLAVOR_HEADER_KEY,
+ METADATA_FLAVOR_HEADER_VALUE, true)) {
+ throw new ForbiddenException(
+ String.format("Request is missing required %s header. To access the metadata server, "
+ + "you must add the %s: %s header to your request.", METADATA_FLAVOR_HEADER_KEY,
+ METADATA_FLAVOR_HEADER_KEY, METADATA_FLAVOR_HEADER_VALUE));
+ }
+
+ // TODO: CDAP-20750
+ if (metadataServiceEndpoint == null) {
+ responder.sendString(HttpResponseStatus.NOT_IMPLEMENTED,
+ String.format("%s has not been set",
+ Constants.TaskWorker.METADATA_SERVICE_END_POINT));
+ return;
+ }
+
+ try {
+ URL url = new URL(metadataServiceEndpoint);
+ if (!Strings.isNullOrEmpty(scopes)) {
+ url = new URL(String.format("%s?scopes=%s", metadataServiceEndpoint, scopes));
+ }
+ io.cdap.common.http.HttpRequest tokenRequest = io.cdap.common.http.HttpRequest.get(url)
+ .addHeader(METADATA_FLAVOR_HEADER_KEY, METADATA_FLAVOR_HEADER_VALUE)
+ .build();
+ HttpResponse tokenResponse = HttpRequests.execute(tokenRequest);
+ responder.sendJson(HttpResponseStatus.OK, tokenResponse.getResponseBodyAsString());
+ } catch (Exception ex) {
+ LOG.warn("Failed to fetch token from metadata service", ex);
+ responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, exceptionToJson(ex),
+ new DefaultHttpHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json"));
+ }
+ }
+
+ /**
+ * Sets the CDAP Namespace information.
+ *
+ * @param request The {@link HttpRequest}.
+ * @param responder a {@link HttpResponder} for sending response.
+ */
+ @PUT
+ @Path("/set-context")
+ public void setContext(FullHttpRequest request, HttpResponder responder)
+ throws BadRequestException {
+ this.gcpMetadataTaskContext = getGcpMetadataTaskContext(request);
+ responder.sendJson(HttpResponseStatus.OK,
+ String.format("Context was set successfully with namespace '%s'.",
+ gcpMetadataTaskContext.getNamespace()));
+ }
+
+ /**
+ * Clears the CDAP Namespace information.
+ *
+ * @param request The {@link HttpRequest}.
+ * @param responder a {@link HttpResponder} for sending response.
+ */
+ @DELETE
+ @Path("/clear-context")
+ public void clearContext(HttpRequest request, HttpResponder responder) {
+ this.gcpMetadataTaskContext = null;
+ LOG.debug("Context cleared.");
+ responder.sendStatus(HttpResponseStatus.OK);
+ }
+
+ /**
+ * Return json representation of an exception. Used to propagate exception across network for
+ * better surfacing errors and debuggability.
+ */
+ private String exceptionToJson(Exception ex) {
+ BasicThrowable basicThrowable = new BasicThrowable(ex);
+ return GSON.toJson(basicThrowable);
+ }
+
+ private GcpMetadataTaskContext getGcpMetadataTaskContext(FullHttpRequest httpRequest)
+ throws BadRequestException {
+ try {
+ return parseBody(httpRequest, GcpMetadataTaskContext.class);
+ } catch (JsonSyntaxException e) {
+ throw new BadRequestException("Invalid json object provided in request body.");
+ }
+ }
+}
diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java
new file mode 100644
index 000000000000..7e90a4c09f7c
--- /dev/null
+++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/worker/sidecar/GcpMetadataHttpHandlerInternalTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.app.worker.sidecar;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder;
+import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
+import io.cdap.cdap.common.namespace.InMemoryNamespaceAdmin;
+import io.cdap.cdap.common.namespace.NamespaceAdmin;
+import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter;
+import io.cdap.cdap.proto.NamespaceMeta;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.security.GcpMetadataTaskContext;
+import io.cdap.common.http.HttpRequest;
+import io.cdap.common.http.HttpRequests;
+import io.cdap.common.http.HttpResponse;
+import io.cdap.http.ChannelPipelineModifier;
+import io.cdap.http.NettyHttpService;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import java.net.URL;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link GcpMetadataHttpHandlerInternal}.
+ */
+public class GcpMetadataHttpHandlerInternalTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+ private static final Gson GSON =
+ ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create();
+ private static NettyHttpService httpService;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ CConfiguration cConf = CConfiguration.create();
+ cConf.set(Constants.CFG_LOCAL_DATA_DIR, TEMP_FOLDER.newFolder().getAbsolutePath());
+
+ NamespaceAdmin namespaceAdmin = new InMemoryNamespaceAdmin();
+ namespaceAdmin.create(NamespaceMeta.SYSTEM);
+ namespaceAdmin.create(NamespaceMeta.DEFAULT);
+
+ httpService = new CommonNettyHttpServiceBuilder(cConf, "test",
+ new NoOpMetricsCollectionService())
+ .setHttpHandlers(
+ new GcpMetadataHttpHandlerInternal(cConf)
+ )
+ .setChannelPipelineModifier(new ChannelPipelineModifier() {
+ @Override
+ public void modify(ChannelPipeline pipeline) {
+ pipeline.addAfter("compressor", "decompressor", new HttpContentDecompressor());
+ }
+ })
+ .build();
+ httpService.start();
+ }
+
+ @AfterClass
+ public static void finish() throws Exception {
+ httpService.stop();
+ }
+
+ @Test
+ public void testStatus() throws Exception {
+ String endpoint = String.format("http://%s:%s/",
+ httpService.getBindAddress().getHostName(), httpService.getBindAddress().getPort());
+ URL url = new URL(endpoint);
+ HttpRequest httpRequest = HttpRequest.get(url).build();
+ HttpResponse httpResponse = HttpRequests.execute(httpRequest);
+ Assert.assertEquals(403, httpResponse.getResponseCode());
+
+ httpRequest = HttpRequest.get(url).addHeader(
+ GcpMetadataHttpHandlerInternal.METADATA_FLAVOR_HEADER_KEY,
+ GcpMetadataHttpHandlerInternal.METADATA_FLAVOR_HEADER_VALUE).build();
+ httpResponse = HttpRequests.execute(httpRequest);
+ Assert.assertEquals(200, httpResponse.getResponseCode());
+ }
+
+ @Test
+ public void testSetAndClearContext() throws Exception {
+ String endpoint = String.format("http://%s:%s",
+ httpService.getBindAddress().getHostName(), httpService.getBindAddress().getPort());
+ NamespaceId namespaceId = new NamespaceId("test");
+ GcpMetadataTaskContext gcpMetadataTaskContext =
+ new GcpMetadataTaskContext(namespaceId.getNamespace(),
+ "alice", "0.0.0.0", null);
+
+ // set context
+ URL url = new URL(String.format("%s/set-context", endpoint));
+ HttpRequest httpRequest = HttpRequest.put(url).withBody(GSON.toJson(gcpMetadataTaskContext,
+ GcpMetadataTaskContext.class)).build();
+ HttpResponse httpResponse = HttpRequests.execute(httpRequest);
+ Assert.assertEquals(200, httpResponse.getResponseCode());
+ String expectedResponse = String.format("Context was set successfully with namespace '%s'.",
+ namespaceId.getNamespace());
+ Assert.assertEquals(expectedResponse, httpResponse.getResponseBodyAsString());
+
+ //clear context
+ url = new URL(String.format("%s/clear-context", endpoint));
+ httpRequest = HttpRequest.delete(url).build();
+ httpResponse = HttpRequests.execute(httpRequest);
+ Assert.assertEquals(200, httpResponse.getResponseCode());
+ }
+}
diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
index 4977ddda8082..3aebe69466ca 100644
--- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
+++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
@@ -586,6 +586,7 @@ public static final class ArtifactLocalizer {
public static final String WORKER_THREADS = "artifact.localizer.worker.threads";
public static final String PRELOAD_LIST = "artifact.localizer.preload.list";
public static final String PRELOAD_VERSION_LIMIT = "artifact.localizer.preload.version.limit";
+ public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST";
}
/**
diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml
index 55a16b74d1a7..76f733293246 100644
--- a/cdap-common/src/main/resources/cdap-default.xml
+++ b/cdap-common/src/main/resources/cdap-default.xml
@@ -5861,6 +5861,14 @@
+
+ feature.namespaced.service.accounts.enabled
+ false
+
+ If true, namespace resource isolation will be enabled.
+
+
+
artifact.cache.bind.address
0.0.0.0
diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
index da8346cc4c57..4f15fab8a6a7 100644
--- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
+++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java
@@ -40,7 +40,8 @@ public enum Feature {
SOURCE_CONTROL_MANAGEMENT_GIT("6.9.0"),
WRANGLER_PRECONDITION_SQL("6.9.1"),
WRANGLER_EXECUTION_SQL("6.10.0"),
- WRANGLER_SCHEMA_MANAGEMENT("6.10.0");
+ WRANGLER_SCHEMA_MANAGEMENT("6.10.0"),
+ NAMESPACED_SERVICE_ACCOUNTS("6.10.0");
private final PlatformInfo.Version versionIntroduced;
private final boolean defaultAfterIntroduction;
diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java
index ad682c8e3dc9..02b892605bc2 100644
--- a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java
+++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/runtime/KubeTwillPreparer.java
@@ -168,6 +168,7 @@ class KubeTwillPreparer implements DependentTwillPreparer, StatefulTwillPreparer
// Configmap that stores localized config files
private static final String CONFIGMAP_MOUNTPATH = "/config";
public static final String CONFIGMAP_NAME_PREFIX = "cdap-config-";
+ public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST";
private final MasterEnvironmentContext masterEnvContext;
private final ApiClient apiClient;
@@ -1164,9 +1165,14 @@ private V1PodSpec createPodSpec(Type resourceType, Location runtimeConfigLocatio
.filter(
envVar -> !envVar.getName().equals(WorkloadIdentityUtil.WORKLOAD_IDENTITY_ENV_VAR_KEY))
.collect(Collectors.toMap(V1EnvVar::getName, V1EnvVar::getValue));
- // Add all environments of the the main runnable for the init container.
+ // Add all environments of the main runnable for the init container except GCE_METADATA_HOST.
+ // Since GCE_METADATA_HOST is used to override the metadata server only in the main runnable.
if (environments.get(runnableName) != null) {
- initContainerEnvirons.putAll(environments.get(runnableName));
+ Map envs = environments.get(runnableName).entrySet().stream()
+ .filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ initContainerEnvirons.putAll(envs);
}
// Add JVM options to environment.
initContainerEnvirons.put(JAVA_OPTS_KEY, masterEnvContext.getConfigurations()
@@ -1291,11 +1297,16 @@ private List createContainers(Type resourceType,
environs.putAll(environments.get(name));
// Add JVM options to environment.
environs.put(JAVA_OPTS_KEY, jvmOpts);
+ // remove GCE_METADATA_HOST_ENV_VAR from the dependent runnable container.
+ Map envs = environs.entrySet().stream()
+ .filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
mounts = addSecreteVolMountIfNeeded(spec, volumeMounts);
containers.add(
createContainer(name, podInfo.getContainerImage(), podInfo.getImagePullPolicy(), workDir,
createResourceRequirements(spec.getResourceSpecification()),
- mounts, environs, KubeTwillLauncher.class,
+ mounts, envs, KubeTwillLauncher.class,
Stream.concat(Stream.of(name), args.stream()).toArray(String[]::new)));
}
diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/security/GcpMetadataTaskContext.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/security/GcpMetadataTaskContext.java
new file mode 100644
index 000000000000..1e3d2a639f9c
--- /dev/null
+++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/security/GcpMetadataTaskContext.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.proto.security;
+
+import javax.annotation.Nullable;
+
+/**
+ * Defines the task context used by GcpMetadataHttpHandlerInternal.
+ */
+public final class GcpMetadataTaskContext {
+ private final String namespace;
+
+ private final String userId;
+
+ private final String userIP;
+
+ private final Credential userCredential;
+
+ /**
+ * Constructs a {@link GcpMetadataTaskContext}.
+ *
+ * @param namespace the namespace id string.
+ * @param userId the user Id.
+ * @param userIP the user Ip.
+ * @param userCredential the user credential.
+ */
+ public GcpMetadataTaskContext(String namespace, String userId,
+ String userIP, @Nullable Credential userCredential) {
+ this.namespace = namespace;
+ this.userId = userId;
+ this.userIP = userIP;
+ this.userCredential = userCredential;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getUserIp() {
+ return userIP;
+ }
+
+ public Credential getUserCredential() {
+ return userCredential;
+ }
+}