Commit
Merge pull request #244 from data-integrations/cherrypick-dataset-project-id-0.7

DataSet Project Id feature cherry pick 0.7
shivamVarCS authored Nov 10, 2023
2 parents f154d17 + 73c5357 commit 698ccf2
Showing 7 changed files with 426 additions and 50 deletions.
13 changes: 11 additions & 2 deletions docs/bigquery-cdcTarget.md
@@ -37,8 +37,17 @@ on the new primary key.
Properties
----------

**Project ID**: Project of the BigQuery dataset. When running on a Dataproc cluster, this can be left blank,
which will use the project of the cluster.
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. The `BigQuery Job User` role on this project must be granted to the specified
service account to run the job. If a temporary bucket needs to be created, the bucket will also be created in this
project, and the `GCE Storage Bucket Admin` role on this project must be granted to the specified service account to
create buckets.

**Dataset Project ID**: The project the destination dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it defaults to the
configured Project ID. The `BigQuery Data Editor` role on this project must be granted to the specified service
account to write BigQuery data to this project.
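As a hedged illustration of the two-project split these properties describe (the project, dataset, and table names below are invented), the job client is built against one project while table references point at another:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;

public class TwoProjectSketch {
    public static void main(String[] args) {
        // Project ID: where the BigQuery job runs and is billed (needs BigQuery Job User).
        BigQuery bigQuery = BigQueryOptions.newBuilder()
                .setProjectId("etl-job-project")
                .build()
                .getService();
        // Dataset Project ID: where the destination tables live (needs BigQuery Data Editor).
        TableId target = TableId.of("data-lake-project", "replica_ds", "orders");
        System.out.println(target);
    }
}
```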

**Location**: The location where the BigQuery dataset and GCS staging bucket will get created. For example, 'us-east1'
for a regional bucket, 'us' for a multi-regional bucket. A complete list of available locations can be found at
2 changes: 1 addition & 1 deletion pom.xml
@@ -22,7 +22,7 @@

<groupId>io.cdap.delta</groupId>
<artifactId>bigquery-delta-plugins</artifactId>
<version>0.7.2</version>
<version>0.7.3</version>
<name>BigQuery Delta plugins</name>
<packaging>jar</packaging>
<description>BigQuery Delta plugins</description>
@@ -1096,11 +1096,13 @@ static String createDiffQuery(TableId stagingTable, List<String> primaryKeys, lo
joinCondition += getOrderingCondition(sortKeys, "A", "B");
}
return "SELECT A.* FROM\n" +
"(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
"(SELECT * FROM " +
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
" WHERE _batch_id = " + batchId +
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as A\n" +
"LEFT OUTER JOIN\n" +
"(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
"(SELECT * FROM " +
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
" WHERE _batch_id = " + batchId +
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as B\n" +
"ON " + joinCondition +
@@ -1226,7 +1228,8 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
}

String mergeQuery = "MERGE " +
BigQueryUtils.wrapInBackTick(targetTableId.getDataset(), targetTableId.getTable()) + " as T\n" +
BigQueryUtils.wrapInBackTick(targetTableId.getProject(), targetTableId.getDataset(), targetTableId.getTable()) +
" as T\n" +
"USING (" + diffQuery + ") as D\n" +
"ON " + mergeCondition + "\n" +
"WHEN MATCHED AND D._op = \"DELETE\" " + updateAndDeleteCondition + "THEN\n" +
195 changes: 153 additions & 42 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
@@ -17,9 +17,12 @@
package io.cdap.delta.bigquery;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.storage.Bucket;
@@ -29,6 +32,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -41,14 +45,23 @@
import io.cdap.delta.api.EventConsumer;
import io.cdap.delta.api.assessment.StandardizedTableDetail;
import io.cdap.delta.api.assessment.TableAssessor;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

/**
@@ -65,11 +78,25 @@ public class BigQueryTarget implements DeltaTarget {
private static final String GCS_SCHEME = "gs://";
private static final String GCP_CMEK_KEY_NAME = "gcp.cmek.key.name";
private static final int MAX_TABLES_PER_QUERY = 1000;
private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
private static final Set<Integer> RATE_LIMIT_EXCEEDED_CODES = new HashSet<>(Arrays.asList(400, 403));
private static final int BILLING_TIER_LIMIT_EXCEEDED_CODE = 400;
private static final String BILLING_TIER_LIMIT_EXCEEDED_REASON = "billingTierLimitExceeded";
private static final int RETRY_COUNT = 25;
private final int retryCount;
private final Conf conf;
public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery");


@SuppressWarnings("unused")
public BigQueryTarget(Conf conf) {
this(conf, RETRY_COUNT);
}

@SuppressWarnings("unused")
public BigQueryTarget(Conf conf, int retryCount) {
this.conf = conf;
this.retryCount = retryCount;
}

@Override
@@ -79,78 +106,109 @@ public void configure(Configurer configurer) {

@Override
public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();
String project = conf.getProject();

String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

long maximumExistingSequenceNumber =
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY);
BigQuery bigQuery = getBigQuery(project, credentials);

RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> {
if (ex.getCause() instanceof IOException) {
return true;
}
if (ex instanceof BigQueryException) {
BigQueryException t = (BigQueryException) ex;
int code = t.getCode();
String reason = t.getError() != null ? t.getError().getReason() : null;
boolean isRateLimitExceeded = RATE_LIMIT_EXCEEDED_CODES.contains(code)
&& RATE_LIMIT_EXCEEDED_REASON.equals(reason);
boolean isBillingTierLimitExceeded = code == BILLING_TIER_LIMIT_EXCEEDED_CODE
&& BILLING_TIER_LIMIT_EXCEEDED_REASON.equals(reason);
return t.isRetryable() || isRateLimitExceeded || isBillingTierLimitExceeded;
}
return false;
});
try {
long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() ->
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
context.initializeSequenceNumber(maximumExistingSequenceNumber);
} catch (Exception e) {
throw new RuntimeException("Failed to compute the maximum sequence number among all the target tables " +
"selected for replication. Please make sure that if the target tables exist, " +
"they have a '_sequence_num' column.", e);
}

LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);

context.initializeSequenceNumber(maximumExistingSequenceNumber);
}
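To make the retry predicate concrete, a hedged sketch of an error it would retry (all values invented):

```java
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;

// Code 403 plus reason "rateLimitExceeded" matches RATE_LIMIT_EXCEEDED_CODES and
// RATE_LIMIT_EXCEEDED_REASON, so the predicate retries it; a 404 "notFound" would not match.
BigQueryException quotaError = new BigQueryException(403, "Quota exceeded",
        new BigQueryError("rateLimitExceeded", null, "Quota exceeded"));
```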

@Override
public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
Credentials credentials = conf.getCredentials();
String project = conf.getProject();
String datasetProject = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

Storage storage = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

String stagingBucketName = getStagingBucketName(conf.stagingBucket, context.getPipelineId());
Bucket bucket = storage.get(stagingBucketName);
if (bucket == null) {
try {
BucketInfo.Builder builder = BucketInfo.newBuilder(stagingBucketName);
if (cmekKey != null) {
builder.setDefaultKmsKeyName(cmekKey);
}
if (conf.stagingBucketLocation != null && !conf.stagingBucketLocation.trim().isEmpty()) {
builder.setLocation(conf.stagingBucketLocation);
}
bucket = storage.create(builder.build());
} catch (StorageException e) {
// It is possible that, with multiple worker instances, the bucket is created by
// another worker instance after this worker instance determined that the bucket
// does not exist. Ignore the error if the bucket already exists.
if (e.getCode() != CONFLICT) {
throw new IOException(
String.format("Unable to create staging bucket '%s' in project '%s'. "
+ "Please make sure the service account has permission to create buckets, "
+ "or create the bucket before starting the program.", stagingBucketName, project), e);
RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> ex instanceof IOException
|| ex.getCause() instanceof IOException
|| (ex instanceof StorageException && ((StorageException) ex).isRetryable()));

Bucket bucket;
try {
bucket = Failsafe.with(retryPolicy).get(() -> {
Bucket b = storage.get(stagingBucketName);
if (b == null) {
try {
BucketInfo.Builder builder = BucketInfo.newBuilder(stagingBucketName);
if (cmekKey != null) {
builder.setDefaultKmsKeyName(cmekKey);
}
if (conf.stagingBucketLocation != null && !conf.stagingBucketLocation.trim().isEmpty()) {
builder.setLocation(conf.stagingBucketLocation);
}
b = storage.create(builder.build());
} catch (StorageException e) {
// It is possible that, with multiple worker instances, the bucket is created by
// another worker instance after this worker instance determined that the bucket
// does not exist. Ignore the error if the bucket already exists.
if (e.getCode() != CONFLICT) {
throw e;
}
b = storage.get(stagingBucketName);
}
}
bucket = storage.get(stagingBucketName);
}
return b;
});
} catch (Exception e) {
throw new RuntimeException(
String.format("Unable to create staging bucket '%s' in project '%s'. " +
"Please make sure the service account has permission to create buckets, " +
"or create the bucket before starting the program.", stagingBucketName, project), e);
}

return new BigQueryEventConsumer(context, storage, bigQuery, bucket, project,
return new BigQueryEventConsumer(context, storage, bigQuery, bucket, datasetProject,
conf.getLoadIntervalSeconds(), conf.getStagingTablePrefix(),
conf.requiresManualDrops(), encryptionConfig, null, conf.getDatasetName(),
conf.softDeletesEnabled());
@@ -178,17 +236,55 @@ private static String stringifyPipelineId(DeltaPipelineId pipelineId) {
pipelineId.getGeneration());
}

private <T> RetryPolicy<T> createBaseRetryPolicy() {
RetryPolicy<T> retryPolicy = new RetryPolicy<>();
return retryPolicy.withMaxAttempts(retryCount)
.withMaxDuration(Duration.of(2, ChronoUnit.MINUTES))
.withBackoff(1, 30, ChronoUnit.SECONDS)
.withJitter(0.1);
}
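A usage sketch for the base policy (the supplier body is a stand-in): `withBackoff(1, 30, ChronoUnit.SECONDS)` grows delays exponentially from 1s toward 30s, `withJitter(0.1)` randomizes each delay by up to 10%, and `withMaxDuration` caps the whole retry loop at two minutes:

```java
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

RetryPolicy<Object> policy = new RetryPolicy<>()
        .withMaxAttempts(25)
        .withMaxDuration(Duration.of(2, ChronoUnit.MINUTES))
        .withBackoff(1, 30, ChronoUnit.SECONDS)
        .withJitter(0.1)
        .handleIf(ex -> ex instanceof IOException);
// Retries the supplier on IOException until it succeeds or the policy gives up.
String result = Failsafe.with(policy).get(() -> "ok");
```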

public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) {
BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project);
if (credentials != null) {
Set<String> scopes = new HashSet<>(BIGQUERY_SCOPES);

if (credentials instanceof ServiceAccountCredentials) {
scopes.addAll(((ServiceAccountCredentials) credentials).getScopes());
} else if (credentials instanceof ExternalAccountCredentials) {
Collection<String> currentScopes = ((ExternalAccountCredentials) credentials).getScopes();
if (currentScopes != null) {
scopes.addAll(currentScopes);
}
}

if (credentials instanceof GoogleCredentials) {
credentials = ((GoogleCredentials) credentials).createScoped(scopes);
}
bigqueryBuilder.setCredentials(credentials);
}
return bigqueryBuilder.build().getService();
}
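A hedged usage sketch for this helper, with application-default credentials and an invented project name:

```java
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;

// getBigQuery re-scopes the credentials with BIGQUERY_SCOPES (BigQuery plus Drive,
// the latter commonly needed for Drive-backed external tables) before building the client.
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
BigQuery bigQuery = BigQueryTarget.getBigQuery("data-lake-project", credentials);
```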

/**
* Config for BigQuery target.
*/
@SuppressWarnings("unused")
public static class Conf extends PluginConfig {

public static final String AUTO_DETECT = "auto-detect";
@Nullable
@Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to "
+ "'auto-detect', which will use the project of the VM.")
private String project;

@Macro
@Nullable
@Description("The project the dataset belongs to. This is only required if the dataset is not " +
"in the same project that the BigQuery job will run in. If no value is given, it will" +
" default to the configured project ID.")
private String datasetProject;

@Macro
@Nullable
@Description("Service account key to use when interacting with GCS and BigQuery. The service account "
Expand Down Expand Up @@ -290,5 +386,20 @@ private Credentials getCredentials() throws IOException {
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
}
}

public String getDatasetProject() {
// If set to 'auto-detect', the default project ID is detected automatically;
// otherwise a user-provided ID is used as-is. If auto-detection fails,
// an IllegalArgumentException is thrown.
if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
String defaultProject = ServiceOptions.getDefaultProjectId();
if (defaultProject == null) {
throw new IllegalArgumentException(
"Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
}
return defaultProject;
}
// A null or empty value means the dataset project is the same as the job project.
return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
}
}
}
9 changes: 7 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
@@ -119,7 +119,7 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
normalizeTableName(table.getTable()));
if (existingTableIDs.contains(tableId)) {
maxSequenceNumQueryPerTable.add(String.format("SELECT MAX(_sequence_num) as max_sequence_num FROM %s",
wrapInBackTick(tableId.getDataset(), tableId.getTable())));
wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable())));
}
}

@@ -149,7 +149,7 @@ static long getMaximumSequenceNumberForTable(BigQuery bigQuery, TableId tableId,
}

String query = String.format("SELECT MAX(_sequence_num) FROM %s",
wrapInBackTick(tableId.getDataset(), tableId.getTable()));
wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable()));
return executeAggregateQuery(bigQuery, query, encryptionConfig);
}

@@ -291,6 +291,11 @@ static String wrapInBackTick(String datasetName, String tableName) {
return BACKTICK + datasetName + "." + tableName + BACKTICK;
}

static String wrapInBackTick(String project, String datasetName, String tableName) {
return BACKTICK + project + "." + datasetName + "." + tableName + BACKTICK;
}

/**
* Tries to submit a BQ job. If there is an Already Exists Exception, it will fetch the existing job
* @param bigquery
