From ec0fb5990a6af78725ccc9ea18e1dd23ab884195 Mon Sep 17 00:00:00 2001 From: Justin Taras Date: Thu, 27 Jan 2022 17:07:24 -0500 Subject: [PATCH] remove extra new line. add queryAdd operation add propertyName to the validation methods. added validation check for the query operation added more verbose message added more verbose message changes for docs and widgets --- docs/Salesforce-batchsource.md | 6 +++- docs/SalesforceMultiObjects-batchsource.md | 4 +++ .../batch/SalesforceBaseSourceConfig.java | 33 +++++++++++++++++-- .../batch/SalesforceBatchMultiSource.java | 2 +- .../source/batch/SalesforceBatchSource.java | 3 +- .../batch/SalesforceMultiSourceConfig.java | 5 +-- .../source/batch/SalesforceSourceConfig.java | 3 +- .../batch/util/SalesforceSourceConstants.java | 1 + .../batch/util/SalesforceSplitUtil.java | 25 ++++++++++---- .../SalesforceMultiSourceConfigBuilder.java | 8 +++-- .../batch/SalesforceSourceConfigBuilder.java | 8 ++++- widgets/Salesforce-batchsource.json | 19 +++++++++++ .../SalesforceMultiObjects-batchsource.json | 19 +++++++++++ 13 files changed, 119 insertions(+), 17 deletions(-) diff --git a/docs/Salesforce-batchsource.md b/docs/Salesforce-batchsource.md index d4cabf94..6af33081 100644 --- a/docs/Salesforce-batchsource.md +++ b/docs/Salesforce-batchsource.md @@ -256,4 +256,8 @@ PK chunking only works with the following objects: Support also includes custom objects, and any Sharing and History tables that support standard objects. -**Chunk Size:** Specify size of chunk. Maximum Size is 250,000. Default Size is 100,000. \ No newline at end of file +**Chunk Size:** Specify size of chunk. Maximum Size is 250,000. Default Size is 100,000. + +**Query Operation:** +Specify the query operation to run on the table. If query is selected, only current records will be returned. +If queryAll is selected, all current and deleted records will be returned. Default operation is query. 
\ No newline at end of file diff --git a/docs/SalesforceMultiObjects-batchsource.md b/docs/SalesforceMultiObjects-batchsource.md index 77220dd8..c1d1f9dc 100644 --- a/docs/SalesforceMultiObjects-batchsource.md +++ b/docs/SalesforceMultiObjects-batchsource.md @@ -75,6 +75,10 @@ from 3am (inclusive) to 9am (exclusive). The duration is specified using numbers Several units can be specified, but each unit can only be used once. For example, `2 days, 1 hours, 30 minutes`. The duration is ignored if a value is already specified for `Last Modified After` or `Last Modified Before`. +**Query Operation:** +Specify the query operation to run on the table. If query is selected, only current records will be returned. +If queryAll is selected, all current and deleted records will be returned. Default operation is query. + **Offset:** Filter data to only read records where the system field `LastModifiedDate` is less than the logical start time of the pipeline minus the given offset. For example, if duration is '6 hours' and the offset is '1 hours', and the pipeline runs at 9am, data last modified between 2am (inclusive) and 8am (exclusive) will be read. 
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java index e875709d..50fe4648 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java @@ -15,6 +15,7 @@ */ package io.cdap.plugin.salesforce.plugin.source.batch; +import com.sforce.async.OperationEnum; import com.sforce.ws.ConnectionException; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; @@ -76,6 +77,14 @@ public abstract class SalesforceBaseSourceConfig extends BaseSalesforceConfig { @Macro private String offset; + @Name(SalesforceSourceConstants.PROPERTY_OPERATION) + @Description("If set to query, the query result will only return current rows. If set to queryAll, " + + "all records, including deletes will be sourced") + @Nullable + private String operation; + + private static final String DEFAULT_OPERATION = "query"; + protected SalesforceBaseSourceConfig(String referenceName, @Nullable String consumerKey, @Nullable String consumerSecret, @@ -87,12 +96,14 @@ protected SalesforceBaseSourceConfig(String referenceName, @Nullable String duration, @Nullable String offset, @Nullable String securityToken, - @Nullable OAuthInfo oAuthInfo) { + @Nullable OAuthInfo oAuthInfo, + @Nullable String operation) { super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken, oAuthInfo); this.datetimeAfter = datetimeAfter; this.datetimeBefore = datetimeBefore; this.duration = duration; this.offset = offset; + this.operation = operation; } public Map getDuration() { @@ -134,6 +145,11 @@ protected void validateFilters(FailureCollector collector) { } catch (InvalidConfigException e) { collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty()); } + try { 
+ validateOperationProperty(SalesforceSourceConstants.PROPERTY_OPERATION, getOperation()); + } catch (InvalidConfigException e) { + collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty()); + } } /** @@ -191,7 +207,6 @@ private SObjectFilterDescriptor getSObjectFilterDescriptor(long logicalStartTime return filterDescriptor; } - @Nullable private void validateIntervalFilterProperty(String propertyName, String datetime) { if (containsMacro(propertyName)) { return; @@ -205,6 +220,16 @@ private void validateIntervalFilterProperty(String propertyName, String datetime } } + private void validateOperationProperty(String propertyName, String operation) { + try { + OperationEnum.valueOf(operation); + } catch (IllegalArgumentException e) { + throw new InvalidConfigException( + String.format("Invalid Query Operation: '%s'. Valid operation values are query and queryAll.", + operation), propertyName); + } + } + private void validateRangeFilterProperty(String propertyName, Map rangeValue) { if (containsMacro(propertyName) || rangeValue.isEmpty()) { return; @@ -271,4 +296,8 @@ private int parseUnitValue(String propertyName, String value) { private ZonedDateTime parseDatetime(String datetime) throws DateTimeParseException { return StringUtils.isBlank(datetime) ? null : ZonedDateTime.parse(datetime, DateTimeFormatter.ISO_DATE_TIME); } + + public String getOperation() { + return operation == null ? 
DEFAULT_OPERATION : operation; + } } diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java index 9b72e997..73f545a9 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java @@ -104,7 +104,7 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException { config.getLoginUrl()); BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); List querySplits = queries.parallelStream() - .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false)) + .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation())) .flatMap(Collection::stream).collect(Collectors.toList()); // store the jobIds so be used in onRunFinish() to close the connections querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId())); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java index 4244aa29..1c035ac9 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java @@ -129,7 +129,8 @@ public void prepareRun(BatchSourceContext context) { } bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK, String.join(";", chunkHeaderValues)); } - List querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection, enablePKChunk); + List querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection, + enablePKChunk, config.getOperation()); 
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId())); context.setInput(Input.of(config.referenceName, new SalesforceInputFormatProvider( config, ImmutableMap.of(sObjectName, schema.toString()), querySplits, null))); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java index 4d0df1b7..4f96fb8e 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java @@ -83,9 +83,10 @@ public SalesforceMultiSourceConfig(String referenceName, @Nullable String blackList, @Nullable String sObjectNameField, @Nullable String securityToken, - @Nullable OAuthInfo oAuthInfo) { + @Nullable OAuthInfo oAuthInfo, + @Nullable String operation) { super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, - datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo); + datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation); this.whiteList = whiteList; this.blackList = blackList; this.sObjectNameField = sObjectNameField; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java index 2a848e8a..cee3e9f7 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java @@ -103,12 +103,13 @@ public class SalesforceSourceConfig extends SalesforceBaseSourceConfig { @Nullable String offset, @Nullable String schema, @Nullable String securityToken, + @Nullable String operation, @Nullable OAuthInfo oAuthInfo, @Nullable Boolean enablePKChunk, @Nullable 
Integer chunkSize, @Nullable String parent) { super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, - datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo); + datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation); this.query = query; this.sObjectName = sObjectName; this.schema = schema; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java index 00c10ed2..102eb898 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java @@ -31,6 +31,7 @@ public class SalesforceSourceConstants { public static final String PROPERTY_QUERY = "query"; public static final String PROPERTY_SOBJECT_NAME = "sObjectName"; + public static final String PROPERTY_OPERATION = "operation"; public static final String PROPERTY_PK_CHUNK_ENABLE_NAME = "enablePKChunk"; public static final String PROPERTY_CHUNK_SIZE_NAME = "chunkSize"; diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index 5d9acd5f..e8741250 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -23,6 +23,7 @@ import com.sforce.async.JobStateEnum; import com.sforce.async.OperationEnum; import io.cdap.plugin.salesforce.BulkAPIBatchException; +import io.cdap.plugin.salesforce.InvalidConfigException; import io.cdap.plugin.salesforce.SObjectDescriptor; import io.cdap.plugin.salesforce.SalesforceBulkUtil; import io.cdap.plugin.salesforce.SalesforceConnectionUtil; @@ -56,8 
+57,8 @@ public final class SalesforceSplitUtil { * @return list of salesforce splits */ public static List getQuerySplits(String query, BulkConnection bulkConnection, - boolean enablePKChunk) { - return Stream.of(getBatches(query, bulkConnection, enablePKChunk)) + boolean enablePKChunk, String operation) { + return Stream.of(getBatches(query, bulkConnection, enablePKChunk, operation)) .map(batch -> new SalesforceSplit(batch.getJobId(), batch.getId(), query)) .collect(Collectors.toList()); } @@ -72,13 +73,14 @@ public static List getQuerySplits(String query, BulkConnection * @param enablePKChunk enable PK Chunking * @return array of batch info */ - private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, boolean enablePKChunk) { + private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, + boolean enablePKChunk, String operation) { try { if (!SalesforceQueryUtil.isQueryUnderLengthLimit(query)) { LOG.debug("Wide object query detected. Query length '{}'", query.length()); query = SalesforceQueryUtil.createSObjectIdQuery(query); } - BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk); + BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk, operation); LOG.debug("Number of batches received from Salesforce: '{}'", batches.length); return batches; } catch (AsyncApiException | IOException e) { @@ -96,11 +98,13 @@ private static BatchInfo[] getBatches(String query, BulkConnection bulkConnectio * @throws AsyncApiException if there is an issue creating the job * @throws IOException failed to close the query */ - private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, boolean enablePKChunk) + private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, + boolean enablePKChunk, String operation) throws AsyncApiException, IOException { SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query); - JobInfo job = 
SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), OperationEnum.query, null); + JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), + getOperationEnum(operation), null); BatchInfo batchInfo; try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) { batchInfo = bulkConnection.createBatchFromStream(job, bout); @@ -212,4 +216,13 @@ public static void closeJobs(Set jobIds, AuthenticatorCredentials authen throw runtimeException; } } + + private static OperationEnum getOperationEnum(String operation) { + try { + return OperationEnum.valueOf(operation); + } catch (IllegalArgumentException ex) { + throw new InvalidConfigException("Unsupported value for operation: " + operation, + SalesforceSourceConstants.PROPERTY_OPERATION); + } + } } diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfigBuilder.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfigBuilder.java index b7e146dd..b6c7f9f6 100644 --- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfigBuilder.java +++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfigBuilder.java @@ -34,6 +34,7 @@ public class SalesforceMultiSourceConfigBuilder { private String sObjectNameField; private String securityToken; private OAuthInfo oAuthInfo; + private String operation; public SalesforceMultiSourceConfigBuilder setReferenceName(String referenceName) { this.referenceName = referenceName; @@ -110,11 +111,14 @@ public SalesforceMultiSourceConfigBuilder setoAuthInfo(OAuthInfo oAuthInfo) { return this; } + public SalesforceMultiSourceConfigBuilder setOperation(String operation) { + this.operation = operation; + return this; + } public SalesforceMultiSourceConfig build() { return new SalesforceMultiSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl, datetimeAfter, 
datetimeBefore, duration, offset, whiteList, - blackList, sObjectNameField, securityToken, oAuthInfo - ); + blackList, sObjectNameField, securityToken, oAuthInfo, operation); } } diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java index ca3615b0..03625936 100644 --- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java +++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java @@ -37,6 +37,7 @@ public class SalesforceSourceConfigBuilder { private Boolean enablePKChunk; private Integer chunkSize; private String parent; + private String operation; public SalesforceSourceConfigBuilder setReferenceName(String referenceName) { this.referenceName = referenceName; @@ -123,9 +124,14 @@ public SalesforceSourceConfigBuilder setParent(String parent) { return this; } + public SalesforceSourceConfigBuilder setOperation(String operation) { + this.operation = operation; + return this; + } + public SalesforceSourceConfig build() { return new SalesforceSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl, query, sObjectName, datetimeAfter, datetimeBefore, duration, offset, schema, - securityToken, null, enablePKChunk, chunkSize, parent); + securityToken, operation, null, enablePKChunk, chunkSize, parent); } } diff --git a/widgets/Salesforce-batchsource.json b/widgets/Salesforce-batchsource.json index d7cd44f3..701b1c6e 100644 --- a/widgets/Salesforce-batchsource.json +++ b/widgets/Salesforce-batchsource.json @@ -145,6 +145,25 @@ { "label": "Advanced", "properties": [ + { + "widget-type": "radio-group", + "label": "SOQL Operation Type", + "name": "operation", + "widget-attributes": { + "layout": "inline", + "default": "query", + "options": [ + { + "id": "query", + "label": "query" + }, + { + "id": "queryAll", + 
"label": "queryAll" + } + ] + } + }, { "widget-type": "toggle", "name": "enablePKChunk", diff --git a/widgets/SalesforceMultiObjects-batchsource.json b/widgets/SalesforceMultiObjects-batchsource.json index aafa0185..d27fce4a 100644 --- a/widgets/SalesforceMultiObjects-batchsource.json +++ b/widgets/SalesforceMultiObjects-batchsource.json @@ -141,6 +141,25 @@ { "label": "Advanced", "properties": [ + { + "widget-type": "radio-group", + "label": "SOQL Operation Type", + "name": "operation", + "widget-attributes": { + "layout": "inline", + "default": "query", + "options": [ + { + "id": "query", + "label": "query" + }, + { + "id": "queryAll", + "label": "queryAll" + } + ] + } + }, { "widget-type": "textbox", "label": "SObject Name Field",