[CDAP-20629] Create CredentialProvider API and SPI, implement provision handler methods, and add profile manager validation

[CDAP-20629] Fix checkstyle warnings and reformat files

[CDAP-20629] Refactor various ZK to lowercase Zk to comply with checkstyle

[CDAP-20629] Refactor JVMResource, JMXMetricsCollector, and JMXMetricsCollectorFactory to lowercase Jvm and Jmx

[CDAP-20629] Suppress warnings for SSL and HBase due to refactoring risk

[CDAP-20629] Address comments

[CDAP-20629] Moved CredentialProvider API classes to cdap-proto
dli357 committed Jul 19, 2023
1 parent f2ad305 commit 7882f8c
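The headline change is the new credential-provisioning surface: a CredentialProvider API (moved into cdap-proto, per the last message line), an SPI for pluggable implementations, and master-side wiring through the MasterCredentialProviderModule visible in the second file below. The diff excerpt here shows only the wiring, so the following is a purely illustrative sketch of the general shape such an SPI takes — every identifier except CredentialProvider is hypothetical, and the real interface may well differ:

import java.io.IOException;

/**
 * Illustrative sketch only: the real CredentialProvider API lives in cdap-proto
 * and its actual methods are not shown in this diff.
 */
public interface CredentialProvider {

  /** Provisions a short-lived credential for the given namespace and identity. */
  ProvisionedCredential provision(String namespace, String identityName) throws IOException;

  /** Hypothetical value class pairing the secret with its expiry. */
  final class ProvisionedCredential {
    private final String credentialValue;
    private final long expirationTimeMillis;

    public ProvisionedCredential(String credentialValue, long expirationTimeMillis) {
      this.credentialValue = credentialValue;
      this.expirationTimeMillis = expirationTimeMillis;
    }

    public String getCredentialValue() {
      return credentialValue;
    }

    public long getExpirationTimeMillis() {
      return expirationTimeMillis;
    }
  }
}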
Showing 88 changed files with 2,665 additions and 1,271 deletions.
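Most of the churn in the first file is mechanical fallout from the checkstyle work: all-caps acronyms in identifiers (ZK, JVM, JMX) become camel-cased words (Zk, Jvm, Jmx), matching the abbreviation-as-word convention enforced by checks such as AbbreviationAsWordInName in Google-style checkstyle setups (naming that specific check is an assumption; the project's checkstyle configuration is not part of this diff). In sketch form:

import com.google.inject.AbstractModule;

// Before the rename (the consecutive capitals trip the abbreviation rule):
//   public class ZKClientModule extends AbstractModule { ... }
// After the rename: the acronym is treated as one word and camel-cased.
public class ZkClientModule extends AbstractModule {
  @Override
  protected void configure() {
    // Bindings are untouched by the rename; only the class name changes, which
    // is why imports and constructor calls are the only edits in the diff below.
  }
}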
DistributedWorkflowProgramRunnerTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright © 2018-2022 Cask Data, Inc.
+ * Copyright © 2018-2023 Cask Data, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -41,8 +41,8 @@
 import io.cdap.cdap.common.guice.KafkaClientModule;
 import io.cdap.cdap.common.guice.LocalLocationModule;
 import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
-import io.cdap.cdap.common.guice.ZKClientModule;
-import io.cdap.cdap.common.guice.ZKDiscoveryModule;
+import io.cdap.cdap.common.guice.ZkClientModule;
+import io.cdap.cdap.common.guice.ZkDiscoveryModule;
 import io.cdap.cdap.common.id.Id;
 import io.cdap.cdap.common.test.AppJarHelper;
 import io.cdap.cdap.data.runtime.DataFabricModules;
@@ -82,8 +82,8 @@
 import org.junit.rules.TemporaryFolder;
 
 /**
- * Unit tests for the {@link DistributedWorkflowProgramRunner}.
- * This test class uses {@link DistributedWorkflowTestApp} for testing various aspect of the program runner.
+ * Unit tests for the {@link DistributedWorkflowProgramRunner}. This test class uses {@link
+ * DistributedWorkflowTestApp} for testing various aspect of the program runner.
  */
 public class DistributedWorkflowProgramRunnerTest {
 
@@ -95,7 +95,7 @@ public class DistributedWorkflowProgramRunnerTest {

   @BeforeClass
   public static void init() throws IOException {
-    cConf = createCConf();
+    cConf = createCconf();
     programRunnerFactory = createProgramRunnerFactory(cConf);
   }

@@ -104,91 +104,91 @@ public void testDefaultResources() throws IOException {
     // By default the workflow driver would have 768m if none of the children is setting it to higher
     // (default for programs are 512m)
     testDriverResources(DistributedWorkflowTestApp.SequentialWorkflow.class.getSimpleName(),
-                        Collections.emptyMap(), new Resources(768));
+        Collections.emptyMap(), new Resources(768));
   }
 
   @Test
   public void testExplicitResources() throws IOException {
     // Explicitly set the resources for the workflow, it should always get honored.
     // The one prefixed with "task.workflow." should override the one without.
     testDriverResources(DistributedWorkflowTestApp.ComplexWorkflow.class.getSimpleName(),
-                        ImmutableMap.of(
-                          SystemArguments.MEMORY_KEY, "4096",
-                          "task.workflow." + SystemArguments.MEMORY_KEY, "1024"
-                        ),
-                        new Resources(1024));
+        ImmutableMap.of(
+            SystemArguments.MEMORY_KEY, "4096",
+            "task.workflow." + SystemArguments.MEMORY_KEY, "1024"
+        ),
+        new Resources(1024));
   }
 
   @Test
   public void testInferredResources() throws IOException {
     // Inferred from the largest memory usage from children
     testDriverResources(DistributedWorkflowTestApp.SequentialWorkflow.class.getSimpleName(),
-                        Collections.singletonMap("mapreduce.mr1." + SystemArguments.MEMORY_KEY, "2048"),
-                        new Resources(2048));
+        Collections.singletonMap("mapreduce.mr1." + SystemArguments.MEMORY_KEY, "2048"),
+        new Resources(2048));
   }
 
   @Test
   public void testForkJoinInferredResources() throws IOException {
     // The complex workflow has 4 parallel executions at max, hence the memory setting should be summation of them
     // For vcores, it should pick the max
     testDriverResources(DistributedWorkflowTestApp.ComplexWorkflow.class.getSimpleName(),
-                        ImmutableMap.of(
-                          "spark.s2." + SystemArguments.MEMORY_KEY, "1024",
-                          "mapreduce.mr3." + SystemArguments.CORES_KEY, "3"
-                        ),
-                        new Resources(512 + 512 + 1024 + 512, 3));
+        ImmutableMap.of(
+            "spark.s2." + SystemArguments.MEMORY_KEY, "1024",
+            "mapreduce.mr3." + SystemArguments.CORES_KEY, "3"
+        ),
+        new Resources(512 + 512 + 1024 + 512, 3));
   }
 
   @Test
   public void testConditionInferredResources() throws IOException {
     // Set one of the branch to have memory > fork memory
     testDriverResources(DistributedWorkflowTestApp.ComplexWorkflow.class.getSimpleName(),
-                        Collections.singletonMap("spark.s4." + SystemArguments.MEMORY_KEY, "4096"),
-                        new Resources(4096));
+        Collections.singletonMap("spark.s4." + SystemArguments.MEMORY_KEY, "4096"),
+        new Resources(4096));
   }
 
   @Test
   public void testInferredWithMaxResources() throws IOException {
     // Set to use large memory for the first node in the complex workflow to have it larger than the sum of all forks
     testDriverResources(DistributedWorkflowTestApp.ComplexWorkflow.class.getSimpleName(),
-                        Collections.singletonMap("mapreduce.mr1." + SystemArguments.MEMORY_KEY, "4096"),
-                        new Resources(4096));
+        Collections.singletonMap("mapreduce.mr1." + SystemArguments.MEMORY_KEY, "4096"),
+        new Resources(4096));
   }
 
   @Test
   public void testInferredScopedResources() throws IOException {
     // Inferred from the largest memory usage from children.
     // Make sure the children arguments have scope resolved correctly.
     testDriverResources(DistributedWorkflowTestApp.SequentialWorkflow.class.getSimpleName(),
-                        ImmutableMap.of(
-                          "mapreduce.mr1.task.mapper." + SystemArguments.MEMORY_KEY, "2048",
-                          "spark.s1.task.client." + SystemArguments.MEMORY_KEY, "1024",
-                          "spark.s1.task.driver." + SystemArguments.MEMORY_KEY, "4096"
-                        ),
-                        // Should pick the spark client memory
-                        new Resources(1024));
+        ImmutableMap.of(
+            "mapreduce.mr1.task.mapper." + SystemArguments.MEMORY_KEY, "2048",
+            "spark.s1.task.client." + SystemArguments.MEMORY_KEY, "1024",
+            "spark.s1.task.driver." + SystemArguments.MEMORY_KEY, "4096"
+        ),
+        // Should pick the spark client memory
+        new Resources(1024));
   }
 
   @Test
   public void testOverrideInferredResources() throws IOException {
     // Explicitly setting memory always override what's inferred from children
     testDriverResources(DistributedWorkflowTestApp.SequentialWorkflow.class.getSimpleName(),
-                        ImmutableMap.of(
-                          "task.workflow." + SystemArguments.MEMORY_KEY, "512",
-                          "mapreduce.mr1." + SystemArguments.MEMORY_KEY, "2048",
-                          "spark.s1." + SystemArguments.MEMORY_KEY, "1024"
-                        ), new Resources(512));
+        ImmutableMap.of(
+            "task.workflow." + SystemArguments.MEMORY_KEY, "512",
+            "mapreduce.mr1." + SystemArguments.MEMORY_KEY, "2048",
+            "spark.s1." + SystemArguments.MEMORY_KEY, "1024"
+        ), new Resources(512));
   }
 
   @Test
   public void testReservedMemoryOverride() throws IOException {
     // Sets the reserved memory override for the workflow
     String workflowName = DistributedWorkflowTestApp.SequentialWorkflow.class.getSimpleName();
     ProgramLaunchConfig launchConfig = setupWorkflowRuntime(workflowName,
-                                                            ImmutableMap.of(
-                                                              SystemArguments.RESERVED_MEMORY_KEY_OVERRIDE, "400",
-                                                              SystemArguments.MEMORY_KEY, "800"
-                                                            ));
+        ImmutableMap.of(
+            SystemArguments.RESERVED_MEMORY_KEY_OVERRIDE, "400",
+            SystemArguments.MEMORY_KEY, "800"
+        ));
 
     RunnableDefinition runnableDefinition = launchConfig.getRunnables().get(workflowName);
     Assert.assertNotNull(runnableDefinition);
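The expected values in the inference tests above follow mechanically from the rules stated in their comments. For the fork/join case, four branches run in parallel and only spark.s2 overrides the 512 MB default, so the driver requirement works out to

    memory = 512 + 512 + 1024 + 512 = 2560 MB
    vcores = max(1, 1, 3, 1) = 3   (assuming the 1-vcore default)

For the scoped case, the expected 1024 MB suggests that only the Spark client process is sized into the workflow driver container, while the mapper (2048 MB) and the Spark driver (4096 MB) run in containers of their own; the "Should pick the spark client memory" comment supports that reading.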
@@ -200,14 +200,16 @@ public void testReservedMemoryOverride() throws IOException {


   /**
-   * Helper method to help testing workflow driver resources settings based on varying user arguments
+   * Helper method to help testing workflow driver resources settings based on varying user
+   * arguments
    *
-   * @param workflowName            name of the workflow as defined in the {@link DistributedWorkflowTestApp}.
-   * @param runtimeArgs             the runtime arguments for the workflow program
+   * @param workflowName name of the workflow as defined in the {@link
+   *     DistributedWorkflowTestApp}.
+   * @param runtimeArgs the runtime arguments for the workflow program
    * @param expectedDriverResources the expected {@link Resources} setting for the workflow driver
    */
   private void testDriverResources(String workflowName, Map<String, String> runtimeArgs,
-                                   Resources expectedDriverResources) throws IOException {
+      Resources expectedDriverResources) throws IOException {
 
     ProgramLaunchConfig launchConfig = setupWorkflowRuntime(workflowName, runtimeArgs);
 
@@ -224,7 +226,7 @@ private void testDriverResources(String workflowName, Map<String, String> runtim
    * Setup the {@link ProgramLaunchConfig} for the given workflow.
    */
   private ProgramLaunchConfig setupWorkflowRuntime(String workflowName,
-                                                   Map<String, String> runtimeArgs) throws IOException {
+      Map<String, String> runtimeArgs) throws IOException {
     // Create the distributed workflow program runner
     ProgramRunner programRunner = programRunnerFactory.create(ProgramType.WORKFLOW);
     Assert.assertTrue(programRunner instanceof DistributedWorkflowProgramRunner);
@@ -234,74 +236,75 @@ private ProgramLaunchConfig setupWorkflowRuntime(String workflowName,
     Program workflowProgram = createWorkflowProgram(cConf, programRunner, workflowName);
     ProgramLaunchConfig launchConfig = new ProgramLaunchConfig();
     ProgramOptions programOpts = new SimpleProgramOptions(workflowProgram.getId(),
-                                                          new BasicArguments(), new BasicArguments(runtimeArgs));
+        new BasicArguments(), new BasicArguments(runtimeArgs));
 
     // Setup the launching config
     workflowRunner.setupLaunchConfig(launchConfig, workflowProgram, programOpts,
-                                     cConf, new Configuration(), TEMP_FOLDER.newFolder());
+        cConf, new Configuration(), TEMP_FOLDER.newFolder());
     return launchConfig;
   }
 
   /**
    * Creates a workflow {@link Program}.
    */
   private Program createWorkflowProgram(CConfiguration cConf, ProgramRunner programRunner,
-                                        String workflowName) throws IOException {
-    Location appJarLocation = AppJarHelper.createDeploymentJar(new LocalLocationFactory(TEMP_FOLDER.newFolder()),
-                                                               DistributedWorkflowTestApp.class);
+      String workflowName) throws IOException {
+    Location appJarLocation = AppJarHelper
+        .createDeploymentJar(new LocalLocationFactory(TEMP_FOLDER.newFolder()),
+            DistributedWorkflowTestApp.class);
     ArtifactId artifactId = NamespaceId.DEFAULT.artifact("test", "1.0.0");
     DistributedWorkflowTestApp app = new DistributedWorkflowTestApp();
     DefaultAppConfigurer configurer = new DefaultAppConfigurer(Id.Namespace.DEFAULT,
-                                                               Id.Artifact.fromEntityId(artifactId), app);
+        Id.Artifact.fromEntityId(artifactId), app);
     app.configure(configurer, new DefaultApplicationContext<>());
 
     ApplicationSpecification appSpec = configurer.createSpecification(null);
     ProgramId programId = NamespaceId.DEFAULT
-      .app(appSpec.getName())
-      .program(ProgramType.WORKFLOW, workflowName);
+        .app(appSpec.getName())
+        .program(ProgramType.WORKFLOW, workflowName);
 
     return Programs.create(cConf, programRunner, new ProgramDescriptor(programId, appSpec),
-                           appJarLocation, TEMP_FOLDER.newFolder());
+        appJarLocation, TEMP_FOLDER.newFolder());
   }
 
   private static ProgramRunnerFactory createProgramRunnerFactory(CConfiguration cConf) {
     Injector injector = Guice.createInjector(
-      new ConfigModule(cConf),
-      RemoteAuthenticatorModules.getNoOpModule(),
-      new ZKClientModule(),
-      new ZKDiscoveryModule(),
-      new LocalLogAppenderModule(),
-      new LocalLocationModule(),
-      new IOModule(),
-      new KafkaClientModule(),
-      new DataSetServiceModules().getDistributedModules(),
-      new DataFabricModules("cdap.master").getDistributedModules(),
-      new DataSetsModules().getDistributedModules(),
-      new MetricsClientRuntimeModule().getDistributedModules(),
-      new MetricsStoreModule(),
-      new MessagingClientModule(),
-      new AuditModule(),
-      CoreSecurityRuntimeModule.getDistributedModule(cConf),
-      new AuthenticationContextModules().getNoOpModule(),
-      new AuthorizationModule(),
-      new AuthorizationEnforcementModule().getMasterModule(),
-      new TwillModule(),
-      new AppFabricServiceRuntimeModule(cConf).getDistributedModules(),
-      new ProgramRunnerRuntimeModule().getDistributedModules(),
-      new SecureStoreServerModule(),
-      new OperationalStatsModule(),
-      new AbstractModule() {
-        @Override
-        protected void configure() {
-          // TODO (CDAP-14677): find a better way to inject metadata publisher
-          bind(MetadataServiceClient.class).to(DefaultMetadataServiceClient.class);
-        }
-      });
+        new ConfigModule(cConf),
+        RemoteAuthenticatorModules.getNoOpModule(),
+        new ZkClientModule(),
+        new ZkDiscoveryModule(),
+        new LocalLogAppenderModule(),
+        new LocalLocationModule(),
+        new IOModule(),
+        new KafkaClientModule(),
+        new DataSetServiceModules().getDistributedModules(),
+        new DataFabricModules("cdap.master").getDistributedModules(),
+        new DataSetsModules().getDistributedModules(),
+        new MetricsClientRuntimeModule().getDistributedModules(),
+        new MetricsStoreModule(),
+        new MessagingClientModule(),
+        new AuditModule(),
+        CoreSecurityRuntimeModule.getDistributedModule(cConf),
+        new AuthenticationContextModules().getNoOpModule(),
+        new AuthorizationModule(),
+        new AuthorizationEnforcementModule().getMasterModule(),
+        new TwillModule(),
+        new AppFabricServiceRuntimeModule(cConf).getDistributedModules(),
+        new ProgramRunnerRuntimeModule().getDistributedModules(),
+        new SecureStoreServerModule(),
+        new OperationalStatsModule(),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            // TODO (CDAP-14677): find a better way to inject metadata publisher
+            bind(MetadataServiceClient.class).to(DefaultMetadataServiceClient.class);
+          }
+        });
 
     return injector.getInstance(ProgramRunnerFactory.class);
   }
 
-  private static CConfiguration createCConf() throws IOException {
+  private static CConfiguration createCconf() throws IOException {
     CConfiguration cConf = CConfiguration.create();
     cConf.set(Constants.CFG_LOCAL_DATA_DIR, TEMP_FOLDER.newFolder().getAbsolutePath());
 
AppFabricServiceRuntimeModule.java
@@ -116,6 +116,7 @@
 import io.cdap.cdap.internal.app.store.DefaultStore;
 import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
 import io.cdap.cdap.internal.capability.CapabilityModule;
+import io.cdap.cdap.internal.credential.guice.MasterCredentialProviderModule;
 import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandler;
 import io.cdap.cdap.internal.credential.handler.CredentialProviderHttpHandlerInternal;
 import io.cdap.cdap.internal.events.EventPublishManager;
@@ -185,6 +186,7 @@ public Module getInMemoryModules() {
         new ConfigStoreModule(),
         new SourceControlModule(),
         new EntityVerifierModule(),
+        new MasterCredentialProviderModule(),
         BootstrapModules.getInMemoryModule(),
         new AbstractModule() {
           @Override
@@ -226,6 +228,7 @@ public Module getStandaloneModules() {
         new SourceControlModule(),
         new EntityVerifierModule(),
         new ProvisionerModule(),
+        new MasterCredentialProviderModule(),
         BootstrapModules.getFileBasedModule(),
         new AbstractModule() {
           @Override
@@ -279,6 +282,7 @@ public Module getDistributedModules() {
         new SourceControlModule(),
         new EntityVerifierModule(),
         new ProvisionerModule(),
+        new MasterCredentialProviderModule(),
         BootstrapModules.getFileBasedModule(),
         new AbstractModule() {
           @Override
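MasterCredentialProviderModule is installed identically in all three variants (in-memory, standalone, distributed), so app-fabric code can depend on the credential-provider binding without caring about runtime mode. A minimal consumption sketch, reusing the illustrative CredentialProvider interface sketched near the top — the consumer class itself is hypothetical:

import com.google.inject.Inject;

// Hypothetical consumer: the point is only that Guice supplies the binding
// installed by MasterCredentialProviderModule, whichever module set is active.
public class ExampleCredentialConsumer {

  private final CredentialProvider credentialProvider;

  @Inject
  ExampleCredentialConsumer(CredentialProvider credentialProvider) {
    this.credentialProvider = credentialProvider;
  }
}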
@@ -525,7 +529,8 @@ private org.quartz.Scheduler getScheduler(JobStore store,
   /**
    * A Guice provider for the {@link UGIProvider} class based on the CDAP configuration.
    *
-   * <p>When Kerberos is enabled, it provides {@link DefaultUGIProvider} instance. Otherwise, an {@link
+   * <p>When Kerberos is enabled, it provides {@link DefaultUGIProvider} instance. Otherwise, an
+   * {@link
    * UnsupportedUGIProvider} will be used.
    */
   private static final class UgiProviderProvider implements Provider<UGIProvider> {
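The provider's body is collapsed in this view; the javadoc describes a standard conditional Guice provider. A sketch of that pattern using the class names from the javadoc — the property key and the import locations are best guesses, not necessarily CDAP's actual ones:

import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.security.impersonation.DefaultUGIProvider;
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider;

// Sketch: picks the UGIProvider implementation based on whether Kerberos is
// enabled in the CDAP configuration.
final class UgiProviderProvider implements Provider<UGIProvider> {

  private final Injector injector;
  private final CConfiguration cConf;

  @Inject
  UgiProviderProvider(Injector injector, CConfiguration cConf) {
    this.injector = injector;
    this.cConf = cConf;
  }

  @Override
  public UGIProvider get() {
    // "kerberos.auth.enabled" is an assumed key; CDAP keeps the real one in its Constants class.
    boolean kerberosEnabled = cConf.getBoolean("kerberos.auth.enabled", false);
    return kerberosEnabled
        ? injector.getInstance(DefaultUGIProvider.class)
        : injector.getInstance(UnsupportedUGIProvider.class);
  }
}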
