From 73c53573c0c484fa3dfcaacfecac9319f21f6999 Mon Sep 17 00:00:00 2001
From: Shivam Varshney
Date: Wed, 18 Oct 2023 15:05:19 +0530
Subject: [PATCH] Dataset Project ID feature

---
 docs/bigquery-cdcTarget.md                    |  13 +-
 pom.xml                                       |   2 +-
 .../delta/bigquery/BigQueryEventConsumer.java |   9 +-
 .../cdap/delta/bigquery/BigQueryTarget.java   | 195 +++++++++++---
 .../io/cdap/delta/bigquery/BigQueryUtils.java |   9 +-
 .../delta/bigquery/BigQueryTargetTest.java    | 240 ++++++++++++++++++
 widgets/bigquery-cdcTarget.json               |   8 +
 7 files changed, 426 insertions(+), 50 deletions(-)

diff --git a/docs/bigquery-cdcTarget.md b/docs/bigquery-cdcTarget.md
index eef981c..5cfd733 100644
--- a/docs/bigquery-cdcTarget.md
+++ b/docs/bigquery-cdcTarget.md
@@ -37,8 +37,17 @@ on the new primary key.
 
 Properties
 ----------
-**Project ID**: Project of the BigQuery dataset. When running on a Dataproc cluster, this can be left blank,
-which will use the project of the cluster.
+**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
+It can be found on the Dashboard in the Google Cloud Platform Console. This is the project that the BigQuery job
+will run in. The `BigQuery Job User` role on this project must be granted to the specified service account to run
+the job. If a temporary bucket needs to be created, the bucket will also be created in this project, and the
+`GCE Storage Bucket Admin` role on this project must be granted to the specified service account to create buckets.
+
+**Dataset Project ID**: Project the destination dataset belongs to. This is only required if the dataset is not
+in the same project that the BigQuery job will run in. If no value is given, it defaults to the configured
+Project ID. The `BigQuery Data Editor` role on this project must be granted to the specified service account to
+write BigQuery data to this project.
+
 **Location**: The location where the BigQuery dataset and GCS staging bucket will get created. For example,
 'us-east1' for regional bucket, 'us' for multi-regional bucket.
 A complete list of available locations can be found at

diff --git a/pom.xml b/pom.xml
index fe6470f..69a4c22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <groupId>io.cdap.delta</groupId>
   <artifactId>bigquery-delta-plugins</artifactId>
-  <version>0.7.2</version>
+  <version>0.7.3</version>
   <name>BigQuery Delta plugins</name>
   <packaging>jar</packaging>
   <description>BigQuery Delta plugins</description>

diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
index e9eeff3..ccdb89e 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java
@@ -1096,11 +1096,13 @@ static String createDiffQuery(TableId stagingTable, List<String> primaryKeys, lo
       joinCondition += getOrderingCondition(sortKeys, "A", "B");
     }
     return "SELECT A.* FROM\n" +
-      "(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
+      "(SELECT * FROM " +
+      BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
       " WHERE _batch_id = " + batchId + " AND _sequence_num > " + latestSequenceNumInTargetTable + ") as A\n" +
       "LEFT OUTER JOIN\n" +
-      "(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
+      "(SELECT * FROM " +
+      BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
       " WHERE _batch_id = " + batchId + " AND _sequence_num > " + latestSequenceNumInTargetTable + ") as B\n" +
       "ON " + joinCondition +
@@ -1226,7 +1228,8 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
     }
 
     String mergeQuery = "MERGE " +
-      BigQueryUtils.wrapInBackTick(targetTableId.getDataset(), targetTableId.getTable()) + " as T\n" +
+      BigQueryUtils.wrapInBackTick(targetTableId.getProject(), targetTableId.getDataset(), targetTableId.getTable()) +
+      " as T\n" +
       "USING (" + diffQuery + ") as D\n" +
       "ON " + mergeCondition + "\n" +
       "WHEN MATCHED AND D._op = \"DELETE\" " + updateAndDeleteCondition + "THEN\n" +
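The two query builders above now emit fully qualified `project.dataset.table` references instead of `dataset.table`, which is what lets the generated SQL run against a dataset in a different project. A minimal, self-contained sketch of what the new three-argument wrapInBackTick overload (added to BigQueryUtils later in this patch) produces; the WrapExample class and the sample values are illustrative only, not part of the patch:

    import com.google.cloud.bigquery.TableId;

    public class WrapExample {
      private static final String BACKTICK = "`";

      // Mirrors the three-argument overload this patch adds to BigQueryUtils.
      static String wrapInBackTick(String project, String dataset, String table) {
        return BACKTICK + project + "." + dataset + "." + table + BACKTICK;
      }

      public static void main(String[] args) {
        TableId staging = TableId.of("data-project", "replication_ds", "_staging_orders");
        // Prints `data-project.replication_ds._staging_orders`; the diff and merge
        // queries embed this form so BigQuery resolves the table across projects.
        System.out.println(wrapInBackTick(staging.getProject(), staging.getDataset(), staging.getTable()));
      }
    }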
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
index c4db226..3338c5d 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
@@ -17,9 +17,12 @@
 package io.cdap.delta.bigquery;
 
 import com.google.auth.Credentials;
+import com.google.auth.oauth2.ExternalAccountCredentials;
 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
 import com.google.cloud.ServiceOptions;
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.EncryptionConfiguration;
 import com.google.cloud.storage.Bucket;
@@ -29,6 +32,7 @@
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
@@ -41,6 +45,8 @@
 import io.cdap.delta.api.EventConsumer;
 import io.cdap.delta.api.assessment.StandardizedTableDetail;
 import io.cdap.delta.api.assessment.TableAssessor;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +54,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 /**
@@ -65,11 +78,25 @@ public class BigQueryTarget implements DeltaTarget {
   private static final String GCS_SCHEME = "gs://";
   private static final String GCP_CMEK_KEY_NAME = "gcp.cmek.key.name";
   private static final int MAX_TABLES_PER_QUERY = 1000;
+  private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
+  private static final Set<Integer> RATE_LIMIT_EXCEEDED_CODES = new HashSet<>(Arrays.asList(400, 403));
+  private static final int BILLING_TIER_LIMIT_EXCEEDED_CODE = 400;
+  private static final String BILLING_TIER_LIMIT_EXCEEDED_REASON = "billingTierLimitExceeded";
+  private static final int RETRY_COUNT = 25;
+
+  private final int retryCount;
   private final Conf conf;
 
+  public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
+                                                                   "https://www.googleapis.com/auth/bigquery");
+
+  @SuppressWarnings("unused")
   public BigQueryTarget(Conf conf) {
+    this(conf, RETRY_COUNT);
+  }
+
+  @SuppressWarnings("unused")
+  public BigQueryTarget(Conf conf, int retryCount) {
     this.conf = conf;
+    this.retryCount = retryCount;
   }
 
   @Override
@@ -79,78 +106,109 @@ public void configure(Configurer configurer) {
 
   @Override
   public void initialize(DeltaTargetContext context) throws Exception {
+
     Credentials credentials = conf.getCredentials();
-    String project = conf.getProject();
+
+    String project = conf.getDatasetProject();
+
     String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
       context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();
     EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
       EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();
 
-    BigQuery bigQuery = BigQueryOptions.newBuilder()
-      .setCredentials(credentials)
-      .setProjectId(project)
-      .build()
-      .getService();
-
-    long maximumExistingSequenceNumber =
-      BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
-                                                     bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY);
+    BigQuery bigQuery = getBigQuery(project, credentials);
+
+    RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
+      .handleIf(ex -> {
+        if (ex.getCause() instanceof IOException) {
+          return true;
+        }
+        if (ex instanceof BigQueryException) {
+          BigQueryException t = (BigQueryException) ex;
+          int code = t.getCode();
+          String reason = t.getError() != null ? t.getError().getReason() : null;
+          boolean isRateLimitExceeded = RATE_LIMIT_EXCEEDED_CODES.contains(code)
+            && RATE_LIMIT_EXCEEDED_REASON.equals(reason);
+          boolean isBillingTierLimitExceeded = code == BILLING_TIER_LIMIT_EXCEEDED_CODE
+            && BILLING_TIER_LIMIT_EXCEEDED_REASON.equals(reason);
+          return t.isRetryable() || isRateLimitExceeded || isBillingTierLimitExceeded;
+        }
+        return false;
+      });
+
+    try {
+      long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() ->
+        BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
+                                                       bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
+      LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
+      context.initializeSequenceNumber(maximumExistingSequenceNumber);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to compute the maximum sequence number among all the target tables " +
+        "selected for replication. Please make sure that if the target tables already exist, " +
+        "they have a '_sequence_num' column.", e);
+    }
 
-    LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
-    context.initializeSequenceNumber(maximumExistingSequenceNumber);
   }
 
   @Override
   public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
     Credentials credentials = conf.getCredentials();
     String project = conf.getProject();
+    String datasetProject = conf.getDatasetProject();
+
     String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
       context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();
     EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
       EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();
 
-    BigQuery bigQuery = BigQueryOptions.newBuilder()
-      .setCredentials(credentials)
-      .setProjectId(project)
-      .build()
-      .getService();
+    BigQuery bigQuery = getBigQuery(project, credentials);
 
     Storage storage = StorageOptions.newBuilder()
       .setCredentials(credentials)
-      .setProjectId(project)
       .build()
       .getService();
 
     String stagingBucketName = getStagingBucketName(conf.stagingBucket, context.getPipelineId());
-    Bucket bucket = storage.get(stagingBucketName);
-    if (bucket == null) {
-      try {
-        BucketInfo.Builder builder = BucketInfo.newBuilder(stagingBucketName);
-        if (cmekKey != null) {
-          builder.setDefaultKmsKeyName(cmekKey);
-        }
-        if (conf.stagingBucketLocation != null && !conf.stagingBucketLocation.trim().isEmpty()) {
-          builder.setLocation(conf.stagingBucketLocation);
-        }
-        bucket = storage.create(builder.build());
-      } catch (StorageException e) {
-        // It is possible that in multiple worker instances scenario
-        // bucket is created by another worker instance after this worker instance
-        // determined that the bucket does not exists. Ignore error if bucket already exists.
-        if (e.getCode() != CONFLICT) {
-          throw new IOException(
-            String.format("Unable to create staging bucket '%s' in project '%s'. "
" - + "Please make sure the service account has permission to create buckets, " - + "or create the bucket before starting the program.", stagingBucketName, project), e); + RetryPolicy retryPolicy = createBaseRetryPolicy() + .handleIf(ex -> ex instanceof IOException + || ex.getCause() instanceof IOException + || (ex instanceof StorageException && ((StorageException) ex).isRetryable())); + + Bucket bucket; + try { + bucket = Failsafe.with(retryPolicy).get(() -> { + Bucket b = storage.get(stagingBucketName); + if (b == null) { + try { + BucketInfo.Builder builder = BucketInfo.newBuilder(stagingBucketName); + if (cmekKey != null) { + builder.setDefaultKmsKeyName(cmekKey); + } + if (conf.stagingBucketLocation != null && !conf.stagingBucketLocation.trim().isEmpty()) { + builder.setLocation(conf.stagingBucketLocation); + } + b = storage.create(builder.build()); + } catch (StorageException e) { + // It is possible that in multiple worker instances scenario + // bucket is created by another worker instance after this worker instance + // determined that the bucket does not exists. Ignore error if bucket already exists. + if (e.getCode() != CONFLICT) { + throw e; + } + b = storage.get(stagingBucketName); + } } - bucket = storage.get(stagingBucketName); - } + return b; + }); + } catch (Exception e) { + throw new RuntimeException( + String.format("Unable to create staging bucket '%s' in project '%s'. " + + "Please make sure the service account has permission to create buckets, " + + "or create the bucket before starting the program.", stagingBucketName, project), e); } - - return new BigQueryEventConsumer(context, storage, bigQuery, bucket, project, + return new BigQueryEventConsumer(context, storage, bigQuery, bucket, datasetProject, conf.getLoadIntervalSeconds(), conf.getStagingTablePrefix(), conf.requiresManualDrops(), encryptionConfig, null, conf.getDatasetName(), conf.softDeletesEnabled()); @@ -178,17 +236,55 @@ private static String stringifyPipelineId(DeltaPipelineId pipelineId) { pipelineId.getGeneration()); } + private RetryPolicy createBaseRetryPolicy() { + RetryPolicy retryPolicy = new RetryPolicy<>(); + return retryPolicy.withMaxAttempts(retryCount) + .withMaxDuration(Duration.of(2, ChronoUnit.MINUTES)) + .withBackoff(1, 30, ChronoUnit.SECONDS) + .withJitter(0.1); + } + + public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) { + BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project); + if (credentials != null) { + Set scopes = new HashSet<>(BIGQUERY_SCOPES); + + if (credentials instanceof ServiceAccountCredentials) { + scopes.addAll(((ServiceAccountCredentials) credentials).getScopes()); + } else if (credentials instanceof ExternalAccountCredentials) { + Collection currentScopes = ((ExternalAccountCredentials) credentials).getScopes(); + if (currentScopes != null) { + scopes.addAll(currentScopes); + } + } + + if (credentials instanceof GoogleCredentials) { + credentials = ((GoogleCredentials) credentials).createScoped(scopes); + } + bigqueryBuilder.setCredentials(credentials); + } + return bigqueryBuilder.build().getService(); + } + /** * Config for BigQuery target. */ @SuppressWarnings("unused") public static class Conf extends PluginConfig { + public static final String AUTO_DETECT = "auto-detect"; @Nullable @Description("Project of the BigQuery dataset. 
 
+    @Macro
+    @Nullable
+    @Description("The project the dataset belongs to. This is only required if the dataset is not " +
+      "in the same project that the BigQuery job will run in. If no value is given, it will " +
+      "default to the configured project ID.")
+    private String datasetProject;
+
     @Macro
     @Nullable
     @Description("Service account key to use when interacting with GCS and BigQuery. The service account " +
@@ -290,5 +386,20 @@ private Credentials getCredentials() throws IOException {
           .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
       }
     }
+
+    public String getDatasetProject() {
+      // If set to 'auto-detect', the default project ID is detected automatically from the environment;
+      // if the user provides an explicit ID, that ID is used;
+      // otherwise an IllegalArgumentException is thrown when detection fails.
+      if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
+        String defaultProject = ServiceOptions.getDefaultProjectId();
+        if (defaultProject == null) {
+          throw new IllegalArgumentException(
+            "Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
+        }
+        return defaultProject;
+      }
+      // If it is null or empty, it is the same as the job project.
+      return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
+    }
   }
 }
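The resolution order implemented by getDatasetProject() above is easy to lose in the diff, so here is a condensed, dependency-free restatement. The DatasetProjectResolution class, the resolve helper, and the sample values are illustrative only; the detectedProject parameter stands in for ServiceOptions.getDefaultProjectId():

    public class DatasetProjectResolution {

      // Resolution order: 'auto-detect' -> environment-detected project;
      // explicit value -> that value; null/empty -> the job project.
      static String resolve(String datasetProject, String jobProject, String detectedProject) {
        if ("auto-detect".equalsIgnoreCase(datasetProject)) {
          if (detectedProject == null) {
            throw new IllegalArgumentException("Could not detect Google Cloud project id from the environment.");
          }
          return detectedProject;
        }
        return (datasetProject == null || datasetProject.isEmpty()) ? jobProject : datasetProject;
      }

      public static void main(String[] args) {
        System.out.println(resolve(null, "job-project", "env-project"));           // job-project
        System.out.println(resolve("data-project", "job-project", "env-project")); // data-project
        System.out.println(resolve("auto-detect", "job-project", "env-project"));  // env-project
      }
    }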
diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
index 8b1a0c3..f56e32b 100644
--- a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
+++ b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
@@ -119,7 +119,7 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
                                     normalizeTableName(table.getTable()));
       if (existingTableIDs.contains(tableId)) {
         maxSequenceNumQueryPerTable.add(String.format("SELECT MAX(_sequence_num) as max_sequence_num FROM %s",
-          wrapInBackTick(tableId.getDataset(), tableId.getTable())));
+          wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable())));
       }
     }
 
@@ -149,7 +149,7 @@ static long getMaximumSequenceNumberForTable(BigQuery bigQuery, TableId tableId,
     }
 
     String query = String.format("SELECT MAX(_sequence_num) FROM %s",
-                                 wrapInBackTick(tableId.getDataset(), tableId.getTable()));
+                                 wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable()));
 
     return executeAggregateQuery(bigQuery, query, encryptionConfig);
   }
@@ -291,6 +291,11 @@ static String wrapInBackTick(String datasetName, String tableName) {
     return BACKTICK + datasetName + "." + tableName + BACKTICK;
   }
 
+  static String wrapInBackTick(String project, String datasetName, String tableName) {
+    return BACKTICK + project + "." + datasetName + "." + tableName + BACKTICK;
+  }
+
   /**
    * Tries to submit a BQ job. If there is an Already Exists Exception, it will fetch the existing job
    * @param bigquery
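The retry behavior this patch adds to BigQueryTarget is built on Failsafe's RetryPolicy, configured in createBaseRetryPolicy() with capped attempts, a two-minute overall ceiling, and exponential backoff with jitter. A self-contained sketch of the same policy shape follows; the handled exception type and the flaky task are stand-ins for the BigQuery and GCS calls the plugin actually wraps:

    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import net.jodah.failsafe.Failsafe;
    import net.jodah.failsafe.RetryPolicy;

    public class RetryExample {
      public static void main(String[] args) {
        // Same shape as createBaseRetryPolicy() in BigQueryTarget.
        RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
          .handle(IllegalStateException.class)  // stand-in for the handleIf predicates
          .withMaxAttempts(25)
          .withMaxDuration(Duration.of(2, ChronoUnit.MINUTES))
          .withBackoff(1, 30, ChronoUnit.SECONDS)
          .withJitter(0.1);

        // Retries the supplier until it succeeds or the policy is exhausted.
        String result = Failsafe.with(retryPolicy).get(() -> {
          if (Math.random() < 0.5) {
            throw new IllegalStateException("transient failure");
          }
          return "done";
        });
        System.out.println(result);
      }
    }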
diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java
index 352144d..c20ed2b 100644
--- a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java
+++ b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java
@@ -16,16 +16,151 @@
 
 package io.cdap.delta.bigquery;
 
+import com.google.api.gax.paging.Page;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.EncryptionConfiguration;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatus;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
 import io.cdap.delta.api.DeltaPipelineId;
+import io.cdap.delta.api.DeltaTargetContext;
+import io.cdap.delta.api.SourceTable;
+import net.jodah.failsafe.FailsafeException;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import static io.cdap.delta.bigquery.BigQueryTarget.STAGING_BUCKET_PREFIX;
 
 /**
  * Tests for BigQueryTarget.
  */
+@PrepareForTest({BigQueryUtils.class, BigQueryTarget.class, StorageOptions.class,
+  BucketInfo.class, BigQueryOptions.class})
+@RunWith(PowerMockRunner.class)
 public class BigQueryTargetTest {
+  private static final String GCP_CMEK_KEY_NAME = "gcp.cmek.key.name";
+  private static final String PROJECT = "project";
+  private static final String DATASET = "dataset";
+  private static final String TABLE = "table";
+  private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
+  private static final Set<Integer> RATE_LIMIT_EXCEEDED_CODES = new HashSet<>(Arrays.asList(400, 403));
+  private static final int BILLING_TIER_LIMIT_EXCEEDED_CODE = 400;
+  private static final String BILLING_TIER_LIMIT_EXCEEDED_REASON = "billingTierLimitExceeded";
+  private static final Integer NOT_IMPLEMENTED_CODE = 501;
+  private static final int BQ_JOB_TIME_BOUND = 2;
+  private static final int RETRY_COUNT = 2;
+  private final DeltaPipelineId pipelineId = new DeltaPipelineId("ns", "app", 1L);
+
+  @Rule
+  public final ExpectedException exceptionRule = ExpectedException.none();
+
+  @Mock
+  private Job job;
+  @Mock
+  private DeltaTargetContext deltaTargetContext;
+  @Mock
+  private BigQueryTarget.Conf conf;
+  @Mock
+  private StorageOptions.Builder storageBuilder;
+  @Mock
+  private BigQueryOptions.Builder bigqueryBuilder;
+  @Mock
+  private BucketInfo.Builder bucketInfoBuilder;
+  @Mock
+  private BigQuery bigQuery;
+  @Mock
+  private BucketInfo bucketInfo;
+  @Mock
+  private Storage storage;
+  @Mock
+  private StorageOptions storageOptions;
+  @Mock
+  private BigQueryOptions bigqueryOptions;
+  @Mock
+  private Credentials credentials;
+  @Mock
+  private SourceTable sourceTable;
+  @Mock
+  private Table table;
+
+  private BigQueryTarget bqTarget;
+
+  @Before
+  public void setUp() throws Exception {
+    Mockito.when(deltaTargetContext.getRuntimeArguments()).thenReturn(new HashMap<String, String>() {{
+      put(GCP_CMEK_KEY_NAME, "GCP_CMEK_KEY_NAME");
+    }});
+    Mockito.when(deltaTargetContext.getPipelineId()).thenReturn(pipelineId);
+    Mockito.when(deltaTargetContext.getAllTables()).thenReturn(new HashSet<SourceTable>() {{
+      add(sourceTable);
+    }});
+
+    PowerMockito.when(conf, "getProject").thenReturn(PROJECT);
+    PowerMockito.when(conf, "getCredentials").thenReturn(credentials);
+    PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT);
+
+    Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo);
+
+    Mockito.when(storageOptions.getService()).thenReturn(storage);
+    Mockito.when(storageBuilder.build()).thenReturn(storageOptions);
+    Mockito.when(storageBuilder.setCredentials(credentials)).thenReturn(storageBuilder);
+    Mockito.when(storageBuilder.setProjectId(PROJECT)).thenReturn(storageBuilder);
+    PowerMockito.whenNew(StorageOptions.Builder.class).withNoArguments().thenReturn(storageBuilder);
+
+    Mockito.when(bigqueryOptions.getService()).thenReturn(bigQuery);
+    Mockito.when(bigqueryBuilder.build()).thenReturn(bigqueryOptions);
+    Mockito.when(bigqueryBuilder.setCredentials(credentials)).thenReturn(bigqueryBuilder);
+    Mockito.when(bigqueryBuilder.setProjectId(PROJECT)).thenReturn(bigqueryBuilder);
+    PowerMockito.whenNew(BigQueryOptions.Builder.class).withNoArguments().thenReturn(bigqueryBuilder);
+
+    Mockito.when(sourceTable.getDatabase()).thenReturn(DATASET);
+    Mockito.when(sourceTable.getTable()).thenReturn(TABLE);
+    Mockito.when(table.getTableId()).thenReturn(TableId.of(PROJECT, DATASET, TABLE));
+
+    // Randomized execution time for the BigQuery job
+    Mockito.when(job.waitFor())
+      .thenAnswer((a) -> {
+        TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(BQ_JOB_TIME_BOUND));
+        return job;
+      });
+    Mockito.when(job.getStatus()).thenReturn(Mockito.mock(JobStatus.class));
+
+    Mockito.when(bigQuery.getTable(Mockito.any())).thenReturn(table);
+    Mockito.when(bigQuery.getDataset(Mockito.anyString())).thenReturn(Mockito.mock(Dataset.class));
+    Mockito.when(bigQuery.create(Mockito.any(JobInfo.class))).thenReturn(job);
+    Mockito.when(bigQuery.listTables(Mockito.anyString())).thenReturn(Mockito.mock(Page.class));
+    Mockito.when(bigQuery.listTables(Mockito.anyString()).iterateAll()).thenReturn(Collections.singletonList(table));
+
+    bqTarget = new BigQueryTarget(conf, RETRY_COUNT);
+  }
 
   @Test
   public void testStagingBucketName() {
@@ -41,4 +176,109 @@ public void testStagingBucketName() {
     Assert.assertEquals(expectedBucketName, BigQueryTarget.getStagingBucketName(" ", pipelineId));
     Assert.assertEquals(expectedBucketName, BigQueryTarget.getStagingBucketName(null, pipelineId));
   }
+
+  @Test
+  public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Exception {
+    PowerMockito.mockStatic(BigQueryUtils.class, Mockito.CALLS_REAL_METHODS);
+    List<Throwable> exceptions = new ArrayList<>();
+    exceptions.add(new BigQueryException(500, null));
+    exceptions.add(new BigQueryException(BILLING_TIER_LIMIT_EXCEEDED_CODE, null,
+      new BigQueryError(BILLING_TIER_LIMIT_EXCEEDED_REASON, null, null)));
+    exceptions.add(new BigQueryException(RATE_LIMIT_EXCEEDED_CODES.stream().findAny().get(), null,
+      new BigQueryError(RATE_LIMIT_EXCEEDED_REASON, null, null)));
+
+    for (Throwable exception : exceptions) {
+      Mockito.when(job.getQueryResults()).thenThrow(exception);
+      try {
+        exceptionRule.expect(RuntimeException.class);
+        bqTarget.initialize(deltaTargetContext);
+      } finally {
+        // Verify that at least one retry happens
+        PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
+        BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
+          Mockito.nullable(String.class), Mockito.any(BigQuery.class),
+          Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
+      }
+    }
+    // Verify the case where listTables() throws an exception
+    Mockito.when(bigQuery.listTables(Mockito.anyString())).thenThrow(
+      new BigQueryException(BILLING_TIER_LIMIT_EXCEEDED_CODE, null,
+        new BigQueryError(BILLING_TIER_LIMIT_EXCEEDED_REASON, null, null)));
+    try {
+      exceptionRule.expect(RuntimeException.class);
+      bqTarget.initialize(deltaTargetContext);
+    } finally {
+      // Verify that at least one retry happens
+      PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
+      BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
+        Mockito.nullable(String.class), Mockito.any(BigQuery.class),
+        Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
+    }
+  }
+
+  @Test
+  public void testGetMaximumExistingSequenceNumberForNonRetryableFailures() throws Exception {
+    PowerMockito.mockStatic(BigQueryUtils.class, Mockito.CALLS_REAL_METHODS);
+    List<Throwable> exceptions = new ArrayList<>();
+    exceptions.add(new BigQueryException(NOT_IMPLEMENTED_CODE, null));
+    exceptions.add(new RuntimeException());
+
+    for (Throwable exception : exceptions) {
+      Mockito.when(job.getQueryResults()).thenThrow(exception);
+      try {
+        exceptionRule.expect(RuntimeException.class);
+        bqTarget.initialize(deltaTargetContext);
+      } finally {
+        // Verify no retries
+        PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.times(1));
+        BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
+          Mockito.nullable(String.class), Mockito.any(BigQuery.class),
+          Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
+      }
+    }
+  }
+
+  @Test
+  public void testCreateConsumerRetryableFailureForGcsCreate() throws Exception {
+    PowerMockito.mockStatic(BucketInfo.class);
+    PowerMockito.doReturn(bucketInfoBuilder).when(BucketInfo.class);
+    BucketInfo.newBuilder(Mockito.nullable(String.class));
+
+    Throwable exception = new StorageException(408, null);
+    Mockito.when(storage.create(Mockito.any(BucketInfo.class))).thenThrow(exception);
+    try {
+      exceptionRule.expect(RuntimeException.class);
+      bqTarget.createConsumer(deltaTargetContext);
+    } finally {
+      // Verify at least one retry
+      Mockito.verify(storage, Mockito.atLeast(2)).create(Mockito.any(BucketInfo.class));
+    }
+  }
+
+  @Test
+  public void testCreateConsumerRetryableFailureForGcsGet() throws Exception {
+    Throwable exception = new StorageException(500, null);
+    Mockito.when(storage.get(Mockito.anyString())).thenThrow(exception);
+    try {
+      exceptionRule.expect(RuntimeException.class);
+      bqTarget.createConsumer(deltaTargetContext);
+    } finally {
+      // Verify at least one retry
+      Mockito.verify(storage, Mockito.atLeast(2)).get(Mockito.nullable(String.class));
+    }
+  }
+
+  @Test
+  public void testCreateConsumerNonRetryableFailure() throws Exception {
+    Throwable exception = new StorageException(501, null);
+    Mockito.when(storage.get(Mockito.anyString())).thenThrow(exception);
+    try {
+      exceptionRule.expect(RuntimeException.class);
+      bqTarget.createConsumer(deltaTargetContext);
+    } finally {
+      // Verify no retry
+      Mockito.verify(storage, Mockito.times(1)).get(Mockito.nullable(String.class));
+    }
+  }
 }

diff --git a/widgets/bigquery-cdcTarget.json b/widgets/bigquery-cdcTarget.json
index 4e18141..17edba1 100644
--- a/widgets/bigquery-cdcTarget.json
+++ b/widgets/bigquery-cdcTarget.json
@@ -14,6 +14,14 @@
         "widget-attributes": {
           "default": "auto-detect"
         }
+      },
+      {
+        "name": "datasetProject",
+        "label": "Dataset Project ID",
+        "widget-type": "textbox",
+        "widget-attributes": {
+          "placeholder": "Project the destination dataset belongs to, if different from the project ID."
+        }
       }
     ]
   },
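Finally, a self-contained sketch of the scoped-client construction that getBigQuery(String, Credentials) performs: the client is built against the job project, while queries reach the dataset project through the fully qualified names shown earlier. The key file path and project id are placeholders, and the explicit key load stands in for whatever credentials the plugin config supplies:

    import com.google.auth.oauth2.GoogleCredentials;
    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class ScopedClientExample {
      public static void main(String[] args) throws IOException {
        GoogleCredentials credentials;
        try (FileInputStream in = new FileInputStream("/path/to/service-account.json")) {
          // The same two scopes BigQueryTarget.BIGQUERY_SCOPES adds before building the client.
          credentials = GoogleCredentials.fromStream(in)
            .createScoped(Arrays.asList(
              "https://www.googleapis.com/auth/bigquery",
              "https://www.googleapis.com/auth/drive"));
        }
        BigQuery bigQuery = BigQueryOptions.newBuilder()
          .setProjectId("job-project")  // where jobs run and are billed
          .setCredentials(credentials)
          .build()
          .getService();
        System.out.println(bigQuery.getOptions().getProjectId());
      }
    }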