Skip to content

Commit

Permalink
Merge pull request #15234 from cdapio/CDAP-19428
Browse files Browse the repository at this point in the history
[CDAP-19428] Allow configurable GCP Dataproc Oauth Scopes
  • Loading branch information
itsankit-google authored Jul 14, 2023
2 parents a935317 + c091059 commit 9e79ee8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ ClusterOperationMetadata createCluster(String name, String imageVersion,
metadata.putAll(conf.getClusterMetaData());

GceClusterConfig.Builder clusterConfig = GceClusterConfig.newBuilder()
.addServiceAccountScopes(DataprocConf.CLOUD_PLATFORM_SCOPE)
.addAllServiceAccountScopes(conf.getScopes())
.setShieldedInstanceConfig(
ShieldedInstanceConfig.newBuilder()
.setEnableSecureBoot(conf.isSecureBootEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ final class DataprocConf {
// Dataproc will pass it to GCE when creating the GCE cluster.
// It can be overridden by profile runtime arguments (system.profile.properties.serviceAccount)
static final String SERVICE_ACCOUNT = "serviceAccount";
static final String SCOPES = "scopes";
static final String ROOT_URL = "root.url";

static final String GCS_BUCKET = "gcsBucket";
Expand Down Expand Up @@ -153,6 +154,7 @@ final class DataprocConf {
private final Map<String, String> clusterMetaData;
private final Map<String, String> clusterLabels;
private final List<String> networkTags;
private final List<String> scopes;
private final String initActions;
private final String autoScalingPolicy;
private final int idleTtlMinutes;
Expand Down Expand Up @@ -199,7 +201,7 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
@Nullable String customImageUri,
@Nullable Map<String, String> clusterMetaData,
@Nullable Map<String, String> clusterLabels, List<String> networkTags,
@Nullable String initActions, boolean runtimeJobManagerEnabled,
List<String> scopes, @Nullable String initActions, boolean runtimeJobManagerEnabled,
Map<String, String> clusterProperties, @Nullable String autoScalingPolicy, int idleTtlMinutes,
@Nullable String tokenEndpoint, boolean secureBootEnabled, boolean vTpmEnabled,
boolean integrityMonitoringEnabled, boolean clusterReuseEnabled,
Expand All @@ -213,6 +215,7 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
this.zone = zone;
this.projectId = projectId;
this.tempBucket = tempBucket;
this.scopes = scopes;
this.clusterReuseEnabled = clusterReuseEnabled;
this.clusterReuseThresholdMinutes = clusterReuseThresholdMinutes;
this.clusterReuseRetryDelayMs = clusterReuseRetryDelayMs;
Expand Down Expand Up @@ -426,6 +429,10 @@ List<String> getNetworkTags() {
return Collections.unmodifiableList(networkTags);
}

List<String> getScopes() {
return Collections.unmodifiableList(scopes);
}

List<String> getInitActions() {
if (Strings.isNullOrEmpty(initActions)) {
return Collections.emptyList();
Expand Down Expand Up @@ -771,6 +778,15 @@ static DataprocConf create(Map<String, String> properties) {
properties.getOrDefault(DataprocUtils.TROUBLESHOOTING_DOCS_URL_KEY,
DataprocUtils.TROUBLESHOOTING_DOCS_URL_DEFAULT);

final String scopesProperty = String.format("%s,%s",
Optional.ofNullable(getString(properties, SCOPES)).orElse(""), CLOUD_PLATFORM_SCOPE);
List<String> scopes = Collections.unmodifiableList(
Arrays.stream(scopesProperty.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.distinct()
.collect(Collectors.toList()));

return new DataprocConf(accountKey, region, zone, projectId, networkHostProjectId, network,
subnet,
masterNumNodes, masterCpus, masterMemoryMb, masterDiskGb,
Expand All @@ -782,7 +798,7 @@ static DataprocConf create(Map<String, String> properties) {
stackdriverLoggingEnabled, stackdriverMonitoringEnabled,
componentGatewayEnabled, skipDelete,
imageVersion, customImageUri, clusterMetaData, clusterLabels, networkTags,
initActions, runtimeJobManagerEnabled, clusterProps, autoScalingPolicy, idleTtl,
scopes, initActions, runtimeJobManagerEnabled, clusterProps, autoScalingPolicy, idleTtl,
tokenEndpoint, secureBootEnabled, vTpmEnabled, integrityMonitoringEnabled,
clusterReuseEnabled, clusterReuseThresholdMinutes, clusterReuseRetryDelayMs,
clusterReuseRetryMaxMs, clusterReuseUpdateMaxMs, clusterReuseKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,15 @@
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
}
},
{
"name": "scopes",
"label": "OAuth Scopes",
"widget-type": "csv",
"description": "The OAuth 2.0 scopes that you might need to request to access Google APIs, depending on the level of access you need. Google Cloud Platform Scope (https://www.googleapis.com/auth/cloud-platform) is always included.",
"widget-attributes" : {
"value-placeholder": ""
}
},
{
"name": "initActions",
"label": "Initialization Actions",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public void testRunKey() throws Exception {
@Test
public void testDataprocConf() {
Map<String, String> props = new HashMap<>();
String scopes = String.format("%s,https://www.googleapis.com/auth/drive",
DataprocConf.CLOUD_PLATFORM_SCOPE);

props.put(DataprocConf.PROJECT_ID_KEY, "pid");
props.put("accountKey", "key");
props.put("region", "region1");
Expand All @@ -138,6 +141,7 @@ public void testDataprocConf() {
props.put("idleTTL", "20");
props.put("clusterReuseRetryDelayMs", "20");
props.put("clusterReuseRetryMaxMs", "200");
props.put(DataprocConf.SCOPES, scopes);

DataprocConf conf = DataprocConf.create(props);

Expand Down Expand Up @@ -167,6 +171,7 @@ public void testDataprocConf() {

Assert.assertEquals(20, conf.getClusterReuseRetryDelayMs());
Assert.assertEquals(200, conf.getClusterReuseRetryMaxMs());
Assert.assertEquals(scopes, String.join(",", conf.getScopes()));
}

@Test
Expand Down

0 comments on commit 9e79ee8

Please sign in to comment.