Skip to content

Commit

Permalink
Merge pull request #15220 from cdapio/CDAP-20712-cherry-pick
Browse files Browse the repository at this point in the history
[cherry-pick][CDAP-20712] add support for passing Dataproc temp bucket in compute profile
  • Loading branch information
itsankit-google authored Jul 3, 2023
2 parents 314bc31 + cb723d0 commit e606c37
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,14 @@ ClusterOperationMetadata createCluster(String name, String imageVersion,
.setGcePdKmsKeyName(conf.getEncryptionKeyName()).build());
}

if (conf.getGcsBucket() != null) {
if (!Strings.isNullOrEmpty(conf.getGcsBucket())) {
builder.setConfigBucket(conf.getGcsBucket());
}

if (!Strings.isNullOrEmpty(conf.getTempBucket())) {
builder.setTempBucket(conf.getTempBucket());
}

Cluster cluster = com.google.cloud.dataproc.v1.Cluster.newBuilder()
.setClusterName(name)
.putAllLabels(labels)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ final class DataprocConf {
static final String SERVICE_ACCOUNT = "serviceAccount";
static final String ROOT_URL = "root.url";

static final String GCS_BUCKET = "gcsBucket";
static final String TEMP_BUCKET = "tempBucket";

static final Pattern CLUSTER_PROPERTIES_PATTERN = Pattern.compile("^[a-zA-Z0-9\\-]+:");
static final int MAX_NETWORK_TAGS = 64;

Expand Down Expand Up @@ -137,6 +140,7 @@ final class DataprocConf {

private final String encryptionKeyName;
private final String gcsBucket;
private final String tempBucket;

private final String serviceAccount;
private final boolean preferExternalIp;
Expand Down Expand Up @@ -188,8 +192,9 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
int workerDiskGb, String workerDiskType, @Nullable String workerMachineType,
long pollCreateDelay, long pollCreateJitter, long pollDeleteDelay, long pollInterval,
@Nullable String encryptionKeyName, @Nullable String gcsBucket,
@Nullable String serviceAccount, boolean preferExternalIp, boolean stackdriverLoggingEnabled,
boolean stackdriverMonitoringEnabled, boolean componentGatewayEnable, boolean skipDelete,
@Nullable String tempBucket, @Nullable String serviceAccount, boolean preferExternalIp,
boolean stackdriverLoggingEnabled, boolean stackdriverMonitoringEnabled,
boolean componentGatewayEnable, boolean skipDelete,
@Nullable String imageVersion,
@Nullable String customImageUri,
@Nullable Map<String, String> clusterMetaData,
Expand All @@ -207,6 +212,7 @@ private DataprocConf(@Nullable String accountKey, String region, String zone, St
this.region = region;
this.zone = zone;
this.projectId = projectId;
this.tempBucket = tempBucket;
this.clusterReuseEnabled = clusterReuseEnabled;
this.clusterReuseThresholdMinutes = clusterReuseThresholdMinutes;
this.clusterReuseRetryDelayMs = clusterReuseRetryDelayMs;
Expand Down Expand Up @@ -378,6 +384,11 @@ String getGcsBucket() {
return gcsBucket;
}

@Nullable
String getTempBucket() {
return tempBucket;
}

@Nullable
String getServiceAccount() {
return serviceAccount;
Expand Down Expand Up @@ -682,7 +693,8 @@ static DataprocConf create(Map<String, String> properties) {
final String imageVersion = getString(properties, IMAGE_VERSION);
final String customImageUri = getString(properties, CUSTOM_IMAGE_URI);
final String gcpCmekKeyName = getString(properties, ENCRYPTION_KEY_NAME);
final String gcpCmekBucket = getString(properties, "gcsBucket");
final String gcpCmekBucket = getString(properties, GCS_BUCKET);
final String tempBucket = getString(properties, TEMP_BUCKET);

final Map<String, String> clusterMetaData = Collections.unmodifiableMap(
DataprocUtils.parseKeyValueConfig(getString(properties, CLUSTER_META_DATA), ";", "\\|"));
Expand Down Expand Up @@ -766,7 +778,7 @@ static DataprocConf create(Map<String, String> properties) {
workerNumNodes, secondaryWorkerNumNodes, workerCpus, workerMemoryMb, workerDiskGb,
workerDiskType, workerMachineType,
pollCreateDelay, pollCreateJitter, pollDeleteDelay, pollInterval,
gcpCmekKeyName, gcpCmekBucket, serviceAccount, preferExternalIp,
gcpCmekKeyName, gcpCmekBucket, tempBucket, serviceAccount, preferExternalIp,
stackdriverLoggingEnabled, stackdriverMonitoringEnabled,
componentGatewayEnabled, skipDelete,
imageVersion, customImageUri, clusterMetaData, clusterLabels, networkTags,
Expand Down
11 changes: 10 additions & 1 deletion cdap-runtime-ext-dataproc/src/main/resources/gcp-dataproc.json
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,22 @@
},
{
"widget-type": "textbox",
"label": "GCS Bucket",
"label": "Staging Bucket",
"name": "gcsBucket",
"description": "Google Cloud Storage bucket used to stage job dependencies and config files for running pipelines in Google Cloud Dataproc",
"widget-attributes": {
"size": "medium"
}
},
{
"widget-type": "textbox",
"label": "Temp Bucket",
"name": "tempBucket",
"description": "Google Cloud Storage bucket used to store ephemeral cluster and jobs data, such as Spark and MapReduce history files in Google Cloud Dataproc",
"widget-attributes": {
"size": "medium"
}
},
{
"widget-type": "textbox",
"label": "Encryption Key Name",
Expand Down

0 comments on commit e606c37

Please sign in to comment.