Skip to content

Commit

Permalink
add sidecar metadata service for credential provisioning
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Aug 2, 2023
1 parent 6d5e189 commit e51b676
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@

package io.cdap.cdap.internal.app.preview;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.preview.PreviewConfigModule;
import io.cdap.cdap.app.preview.PreviewManager;
import io.cdap.cdap.app.preview.PreviewRequestQueue;
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.data.runtime.DataSetsModules;
import io.cdap.cdap.data2.dataset2.DatasetFramework;
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable;
import io.cdap.cdap.master.spi.twill.DependentTwillPreparer;
Expand All @@ -46,6 +51,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -78,6 +84,7 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements

private final CConfiguration cConf;
private final Configuration hConf;
private final FeatureFlagsProvider featureFlagsProvider;
private final TwillRunner twillRunner;
private ScheduledExecutorService scheduler;
private TwillController controller;
Expand Down Expand Up @@ -109,6 +116,7 @@ public class DistributedPreviewManager extends DefaultPreviewManager implements
this.cConf = cConf;
this.hConf = hConf;
this.twillRunner = twillRunner;
this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}

@Override
Expand Down Expand Up @@ -205,6 +213,16 @@ public void run() {
NamespaceId.SYSTEM.getNamespace());
twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap));

if (Feature.NAMESPACE_RESOURCE_ISOLATION.isEnabled(featureFlagsProvider)) {
String localhost = InetAddress.getLoopbackAddress().getHostName();
twillPreparer = twillPreparer.withEnv(PreviewRunnerTwillRunnable.class.getSimpleName(),
ImmutableMap.of(
ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR,
String.format("%s:%s", localhost,
cConf.getInt(Constants.ArtifactLocalizer.PORT))
));
}

String priorityClass = cConf.get(Constants.Preview.CONTAINER_PRIORITY_CLASS_NAME);
if (priorityClass != null) {
twillPreparer = twillPreparer.setSchedulerQueue(priorityClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package io.cdap.cdap.internal.app.worker;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.ArtifactLocalizer;
import io.cdap.cdap.common.conf.Constants.TaskWorker;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable;
import io.cdap.cdap.master.spi.twill.DependentTwillPreparer;
Expand All @@ -35,6 +40,7 @@
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -64,6 +70,8 @@ public class TaskWorkerServiceLauncher extends AbstractScheduledService {
private final CConfiguration cConf;
private final Configuration hConf;

private final FeatureFlagsProvider featureFlagsProvider;

private final TwillRunner twillRunner;
private TwillController twillController;

Expand All @@ -78,6 +86,7 @@ public TaskWorkerServiceLauncher(CConfiguration cConf, Configuration hConf,
this.cConf = cConf;
this.hConf = hConf;
this.twillRunner = twillRunner;
this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}

@Override
Expand Down Expand Up @@ -182,6 +191,16 @@ public void run() {
NamespaceId.SYSTEM.getNamespace());
twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap));

if (Feature.NAMESPACE_RESOURCE_ISOLATION.isEnabled(featureFlagsProvider)) {
String localhost = InetAddress.getLoopbackAddress().getHostName();
twillPreparer = twillPreparer.withEnv(TaskWorkerTwillRunnable.class.getSimpleName(),
ImmutableMap.of(
ArtifactLocalizer.GCE_METADATA_HOST_ENV_VAR,
String.format("%s:%s", localhost,
cConf.getInt(Constants.ArtifactLocalizer.PORT))
));
}

String priorityClass = cConf.get(Constants.TaskWorker.CONTAINER_PRIORITY_CLASS_NAME);
if (priorityClass != null) {
twillPreparer = twillPreparer.setSchedulerQueue(priorityClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class ArtifactLocalizerService extends AbstractIdleService {
.setPort(cConf.getInt(Constants.ArtifactLocalizer.PORT))
.setBossThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.BOSS_THREADS))
.setWorkerThreadPoolSize(cConf.getInt(Constants.ArtifactLocalizer.WORKER_THREADS))
.setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer))
.setHttpHandlers(new ArtifactLocalizerHttpHandlerInternal(artifactLocalizer),
new GCPMetadataHttpHandlerInternal(cConf))
.build();

this.cacheCleanupInterval = cConf.getInt(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.worker.sidecar;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Singleton;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.id.Id.Namespace;
import io.cdap.cdap.proto.BasicThrowable;
import io.cdap.cdap.proto.codec.BasicThrowableCodec;
import io.cdap.common.http.HttpRequests;
import io.cdap.common.http.HttpResponse;
import io.cdap.http.AbstractHttpHandler;
import io.cdap.http.HttpHandler;
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import joptsimple.internal.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Internal {@link HttpHandler} for Artifact Localizer.
*/
@Singleton
@Path("/")
public class GCPMetadataHttpHandlerInternal extends AbstractHttpHandler {

private static final String METADATA_FLAVOR = "Metadata-Flavor";
private static final String GOOGLE = "Google";
private static final Logger LOG = LoggerFactory.getLogger(GCPMetadataHttpHandlerInternal.class);
private static final Gson GSON = new GsonBuilder().registerTypeAdapter(BasicThrowable.class,
new BasicThrowableCodec()).create();
private final CConfiguration cConf;
private final String metadataServiceEndpoint;
private Namespace namespace;

public GCPMetadataHttpHandlerInternal(CConfiguration cConf) {
this.cConf = cConf;
this.metadataServiceEndpoint = cConf.get(
Constants.TaskWorker.METADATA_SERVICE_END_POINT);
}

/**
* Returns the status of metadata server.
*
* @param request The {@link HttpRequest}.
* @param responder a {@link HttpResponder} for sending response.
* @throws Exception if there is any error.
*/
@GET
@Path("/")
public void status(HttpRequest request, HttpResponder responder) throws Exception {

// check that metadata header is present in the request.
if (!request.headers().contains(METADATA_FLAVOR, GOOGLE, true)) {
throw new ForbiddenException(
String.format("Request is missing required %s header. To access the metadata server, "
+ "you must add the %s: %s header to your request.", METADATA_FLAVOR,
METADATA_FLAVOR, GOOGLE));
}
responder.sendStatus(HttpResponseStatus.OK,
new DefaultHttpHeaders().add(METADATA_FLAVOR, GOOGLE));
}

@GET
@Path("/computeMetadata/v1/instance/service-accounts/default/token")
public void token(HttpRequest request, HttpResponder responder,
@QueryParam("scopes") String scopes) throws Exception {

LOG.info("Token requested");
// check that metadata header is present in the request.
if (!request.headers().contains(METADATA_FLAVOR, GOOGLE, true)) {
throw new ForbiddenException(
String.format("Request is missing required %s header. To access the metadata server, "
+ "you must add the %s: %s header to your request.", METADATA_FLAVOR,
METADATA_FLAVOR, GOOGLE));
}

// TODO: CDAP-20750
if (metadataServiceEndpoint == null) {
responder.sendString(HttpResponseStatus.NOT_IMPLEMENTED,
String.format("%s has not been set",
Constants.TaskWorker.METADATA_SERVICE_END_POINT));
return;
}

try {
URL url = new URL(metadataServiceEndpoint);
if (!Strings.isNullOrEmpty(scopes)) {
url = new URL(String.format("%s?scopes=%s", metadataServiceEndpoint, scopes));
}
io.cdap.common.http.HttpRequest tokenRequest = io.cdap.common.http.HttpRequest.get(url)
.addHeader(METADATA_FLAVOR, GOOGLE)
.build();
HttpResponse tokenResponse = HttpRequests.execute(tokenRequest);
responder.sendJson(HttpResponseStatus.OK, tokenResponse.getResponseBodyAsString());
} catch (Exception ex) {
LOG.warn("Failed to fetch token from metadata service", ex);
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, exceptionToJson(ex),
new DefaultHttpHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json"));
}
}

/**
* Sets the CDAP Namespace information.
*
* @param request The {@link HttpRequest}.
* @param namespaceId Namespace id string.
* @param responder a {@link HttpResponder} for sending response.
*/
@PUT
@Path("/set-namespace/{namespace-id}")
public void setNamespace(HttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId) {
LOG.info("Set namespace {}", namespaceId);
this.namespace = new Namespace(namespaceId);
responder.sendStatus(HttpResponseStatus.OK);
}

/**
* Clears the CDAP Namespace information.
*
* @param request The {@link HttpRequest}.
* @param responder a {@link HttpResponder} for sending response.
*/
@DELETE
@Path("/clear-namespace")
public void clearNamespace(HttpRequest request, HttpResponder responder) {
LOG.info("Clear namespace");
this.namespace = null;
responder.sendStatus(HttpResponseStatus.OK);
}

/**
* Return json representation of an exception. Used to propagate exception across network for
* better surfacing errors and debuggability.
*/
private String exceptionToJson(Exception ex) {
BasicThrowable basicThrowable = new BasicThrowable(ex);
return GSON.toJson(basicThrowable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ public static final class ArtifactLocalizer {
public static final String WORKER_THREADS = "artifact.localizer.worker.threads";
public static final String PRELOAD_LIST = "artifact.localizer.preload.list";
public static final String PRELOAD_VERSION_LIMIT = "artifact.localizer.preload.version.limit";
public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST";
}

/**
Expand Down
8 changes: 8 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5861,6 +5861,14 @@
</description>
</property>

<property>
<name>feature.namespace.resource.isolation.enabled</name>
<value>true</value>
<description>
If true, namespace resource isolation will be enabled.
</description>
</property>

<property>
<name>artifact.cache.bind.address</name>
<value>0.0.0.0</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum Feature {
SOURCE_CONTROL_MANAGEMENT_GIT("6.9.0"),
WRANGLER_PRECONDITION_SQL("6.9.1"),
WRANGLER_EXECUTION_SQL("6.10.0"),
WRANGLER_SCHEMA_MANAGEMENT("6.10.0");
WRANGLER_SCHEMA_MANAGEMENT("6.10.0"),
NAMESPACE_RESOURCE_ISOLATION("6.10.0");

private final PlatformInfo.Version versionIntroduced;
private final boolean defaultAfterIntroduction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class KubeTwillPreparer implements DependentTwillPreparer, StatefulTwillPreparer
// Configmap that stores localized config files
private static final String CONFIGMAP_MOUNTPATH = "/config";
public static final String CONFIGMAP_NAME_PREFIX = "cdap-config-";
public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST";

private final MasterEnvironmentContext masterEnvContext;
private final ApiClient apiClient;
Expand Down Expand Up @@ -1164,9 +1165,13 @@ private V1PodSpec createPodSpec(Type resourceType, Location runtimeConfigLocatio
.filter(
envVar -> !envVar.getName().equals(WorkloadIdentityUtil.WORKLOAD_IDENTITY_ENV_VAR_KEY))
.collect(Collectors.toMap(V1EnvVar::getName, V1EnvVar::getValue));
// Add all environments of the the main runnable for the init container.
// Add all environments of the main runnable for the init container except GCE_METADATA_HOST.
if (environments.get(runnableName) != null) {
initContainerEnvirons.putAll(environments.get(runnableName));
Map<String, String> envs = environments.get(runnableName).entrySet().stream()
.filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
initContainerEnvirons.putAll(envs);
}
// Add JVM options to environment.
initContainerEnvirons.put(JAVA_OPTS_KEY, masterEnvContext.getConfigurations()
Expand Down Expand Up @@ -1291,11 +1296,16 @@ private List<V1Container> createContainers(Type resourceType,
environs.putAll(environments.get(name));
// Add JVM options to environment.
environs.put(JAVA_OPTS_KEY, jvmOpts);
// remove GCE_METADATA_HOST_ENV_VAR from the dependent runnable container.
Map<String, String> envs = environs.entrySet().stream()
.filter(entry -> !entry.getKey().equals(GCE_METADATA_HOST_ENV_VAR))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
mounts = addSecreteVolMountIfNeeded(spec, volumeMounts);
containers.add(
createContainer(name, podInfo.getContainerImage(), podInfo.getImagePullPolicy(), workDir,
createResourceRequirements(spec.getResourceSpecification()),
mounts, environs, KubeTwillLauncher.class,
mounts, envs, KubeTwillLauncher.class,
Stream.concat(Stream.of(name), args.stream()).toArray(String[]::new)));
}

Expand Down

0 comments on commit e51b676

Please sign in to comment.