Commit

remove extra new line.
add queryAll operation

add propertyName to the validation methods.

added validation check for the query operation

added more verbose message

changes for docs and widgets
jster1357 authored and snehalakshmisha committed Feb 2, 2022
1 parent 1b4f75a commit ec0fb59
Showing 13 changed files with 119 additions and 17 deletions.
6 changes: 5 additions & 1 deletion docs/Salesforce-batchsource.md
@@ -256,4 +256,8 @@ PK chunking only works with the following objects:

Support also includes custom objects, and any Sharing and History tables that support standard objects.

**Chunk Size:** Specify the size of each chunk. The maximum size is 250,000; the default is 100,000.

**Query Operation:**
Specify the query operation to run on the table. If `query` is selected, only current records are returned.
If `queryAll` is selected, all records, including deleted records, are returned. The default operation is `query`.
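
The two operations map directly onto the Bulk API's `OperationEnum`. A minimal sketch of the difference, not part of this commit, assuming the standard `com.sforce.async` client (the `Account` object and the method itself are placeholders):

```java
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;

public class QueryOperationSketch {
  // Creates a bulk query job; queryAll also returns soft-deleted records,
  // while query returns only current ones.
  static JobInfo createQueryJob(BulkConnection connection, boolean includeDeleted)
      throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setObject("Account"); // hypothetical sObject
    job.setOperation(includeDeleted ? OperationEnum.queryAll : OperationEnum.query);
    job.setContentType(ContentType.CSV);
    return connection.createJob(job);
  }
}
```
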
4 changes: 4 additions & 0 deletions docs/SalesforceMultiObjects-batchsource.md
@@ -75,6 +75,10 @@ from 3am (inclusive) to 9am (exclusive). The duration is specified using numbers
Several units can be specified, but each unit can only be used once. For example, `2 days, 1 hours, 30 minutes`.
The duration is ignored if a value is already specified for `Last Modified After` or `Last Modified Before`.

**Query Operation:**
Specify the query operation to run on the table. If `query` is selected, only current records are returned.
If `queryAll` is selected, all records, including deleted records, are returned. The default operation is `query`.

**Offset:** Filter data to only read records where the system field `LastModifiedDate` is less than the logical start time
of the pipeline minus the given offset. For example, if duration is '6 hours' and the offset is '1 hours', and the pipeline
runs at 9am, data last modified between 2am (inclusive) and 8am (exclusive) will be read.
@@ -15,6 +15,7 @@
*/
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.sforce.async.OperationEnum;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
@@ -76,6 +77,14 @@ public abstract class SalesforceBaseSourceConfig extends BaseSalesforceConfig {
@Macro
private String offset;

@Name(SalesforceSourceConstants.PROPERTY_OPERATION)
@Description("If set to query, the query result will only return current rows. If set to queryAll, " +
"all records, including deletes will be sourced")
@Nullable
private String operation;

private static final String DEFAULT_OPERATION = "query";

protected SalesforceBaseSourceConfig(String referenceName,
@Nullable String consumerKey,
@Nullable String consumerSecret,
@@ -87,12 +96,14 @@ protected SalesforceBaseSourceConfig(String referenceName,
@Nullable String duration,
@Nullable String offset,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo) {
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken, oAuthInfo);
this.datetimeAfter = datetimeAfter;
this.datetimeBefore = datetimeBefore;
this.duration = duration;
this.offset = offset;
this.operation = operation;
}

public Map<ChronoUnit, Integer> getDuration() {
@@ -134,6 +145,11 @@ protected void validateFilters(FailureCollector collector) {
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
}
try {
validateOperationProperty(SalesforceSourceConstants.PROPERTY_OPERATION, getOperation());
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
}
}

/**
@@ -191,7 +207,6 @@ private SObjectFilterDescriptor getSObjectFilterDescriptor(long logicalStartTime
return filterDescriptor;
}

@Nullable
private void validateIntervalFilterProperty(String propertyName, String datetime) {
if (containsMacro(propertyName)) {
return;
@@ -205,6 +220,16 @@ private void validateIntervalFilterProperty(String propertyName, String datetime
}
}

private void validateOperationProperty(String propertyName, String operation) {
try {
OperationEnum.valueOf(operation);
} catch (IllegalArgumentException e) {
throw new InvalidConfigException(
String.format("Invalid Query Operation: '%s'. Valid operation values are query and queryAll.",
operation), propertyName);
}
}

private void validateRangeFilterProperty(String propertyName, Map<ChronoUnit, Integer> rangeValue) {
if (containsMacro(propertyName) || rangeValue.isEmpty()) {
return;
@@ -271,4 +296,8 @@ private int parseUnitValue(String propertyName, String value) {
private ZonedDateTime parseDatetime(String datetime) throws DateTimeParseException {
return StringUtils.isBlank(datetime) ? null : ZonedDateTime.parse(datetime, DateTimeFormatter.ISO_DATE_TIME);
}

public String getOperation() {
return operation == null ? DEFAULT_OPERATION : operation;
}
}
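
Taken together, the new property is lenient when unset but strict when set: `getOperation()` falls back to `query`, while any explicit value must match an `OperationEnum` constant exactly. A hedged sketch of the expected behavior, assuming standard enum `valueOf` semantics:

```java
import com.sforce.async.OperationEnum;

public class OperationDefaultSketch {
  public static void main(String[] args) {
    String operation = null;                                     // property left unset
    String effective = operation == null ? "query" : operation;  // mirrors getOperation()
    System.out.println(OperationEnum.valueOf(effective));        // prints: query
    System.out.println(OperationEnum.valueOf("queryAll"));       // prints: queryAll
    // OperationEnum.valueOf("QueryAll") would throw IllegalArgumentException,
    // which validateOperationProperty reports as an InvalidConfigException
    // against the "operation" config property.
  }
}
```
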
@@ -104,7 +104,7 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
config.getLoginUrl());
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false))
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation()))
.flatMap(Collection::stream).collect(Collectors.toList());
// store the jobIds so be used in onRunFinish() to close the connections
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
@@ -129,7 +129,8 @@ public void prepareRun(BatchSourceContext context) {
}
bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK, String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection, enablePKChunk);
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
enablePKChunk, config.getOperation());
querySplits.parallelStream().forEach(salesforceSplit -> jobIds.add(salesforceSplit.getJobId()));
context.setInput(Input.of(config.referenceName, new SalesforceInputFormatProvider(
config, ImmutableMap.of(sObjectName, schema.toString()), querySplits, null)));
@@ -83,9 +83,10 @@ public SalesforceMultiSourceConfig(String referenceName,
@Nullable String blackList,
@Nullable String sObjectNameField,
@Nullable String securityToken,
@Nullable OAuthInfo oAuthInfo) {
@Nullable OAuthInfo oAuthInfo,
@Nullable String operation) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo);
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation);
this.whiteList = whiteList;
this.blackList = blackList;
this.sObjectNameField = sObjectNameField;
@@ -103,12 +103,13 @@ public class SalesforceSourceConfig extends SalesforceBaseSourceConfig {
@Nullable String offset,
@Nullable String schema,
@Nullable String securityToken,
@Nullable String operation,
@Nullable OAuthInfo oAuthInfo,
@Nullable Boolean enablePKChunk,
@Nullable Integer chunkSize,
@Nullable String parent) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo);
datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo, operation);
this.query = query;
this.sObjectName = sObjectName;
this.schema = schema;
@@ -31,6 +31,7 @@ public class SalesforceSourceConstants {

public static final String PROPERTY_QUERY = "query";
public static final String PROPERTY_SOBJECT_NAME = "sObjectName";
public static final String PROPERTY_OPERATION = "operation";

public static final String PROPERTY_PK_CHUNK_ENABLE_NAME = "enablePKChunk";
public static final String PROPERTY_CHUNK_SIZE_NAME = "chunkSize";
@@ -23,6 +23,7 @@
import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
@@ -56,8 +57,8 @@ public final class SalesforceSplitUtil {
* @return list of salesforce splits
*/
public static List<SalesforceSplit> getQuerySplits(String query, BulkConnection bulkConnection,
boolean enablePKChunk) {
return Stream.of(getBatches(query, bulkConnection, enablePKChunk))
boolean enablePKChunk, String operation) {
return Stream.of(getBatches(query, bulkConnection, enablePKChunk, operation))
.map(batch -> new SalesforceSplit(batch.getJobId(), batch.getId(), query))
.collect(Collectors.toList());
}
@@ -72,13 +73,14 @@ public static List<SalesforceSplit> getQuerySplits(String query, BulkConnection
* @param enablePKChunk enable PK Chunking
* @return array of batch info
*/
private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection, boolean enablePKChunk) {
private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection,
boolean enablePKChunk, String operation) {
try {
if (!SalesforceQueryUtil.isQueryUnderLengthLimit(query)) {
LOG.debug("Wide object query detected. Query length '{}'", query.length());
query = SalesforceQueryUtil.createSObjectIdQuery(query);
}
BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk);
BatchInfo[] batches = runBulkQuery(bulkConnection, query, enablePKChunk, operation);
LOG.debug("Number of batches received from Salesforce: '{}'", batches.length);
return batches;
} catch (AsyncApiException | IOException e) {
@@ -96,11 +98,13 @@ private static BatchInfo[] getBatches(String query, BulkConnection bulkConnection
* @throws AsyncApiException if there is an issue creating the job
* @throws IOException failed to close the query
*/
private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query, boolean enablePKChunk)
private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String query,
boolean enablePKChunk, String operation)
throws AsyncApiException, IOException {

SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(), OperationEnum.query, null);
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, sObjectDescriptor.getName(),
getOperationEnum(operation), null);
BatchInfo batchInfo;
try (ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes())) {
batchInfo = bulkConnection.createBatchFromStream(job, bout);
@@ -212,4 +216,13 @@ public static void closeJobs(Set<String> jobIds, AuthenticatorCredentials authenticatorCredentials
throw runtimeException;
}
}

private static OperationEnum getOperationEnum(String operation) {
try {
return OperationEnum.valueOf(operation);
} catch (IllegalArgumentException ex) {
throw new InvalidConfigException("Unsupported value for operation: " + operation,
SalesforceSourceConstants.PROPERTY_OPERATION);
}
}
}
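
With the operation threaded through `getQuerySplits`, the single- and multi-object sources share one call pattern. A sketch of the call site, as a fragment: `bulkConnection`, `config`, and `jobIds` come from the surrounding plugin code shown above, and the SOQL text is hypothetical:

```java
String soql = "SELECT Id, Name FROM Account"; // hypothetical query
List<SalesforceSplit> querySplits =
    SalesforceSplitUtil.getQuerySplits(soql, bulkConnection, false, config.getOperation());
// Each split carries its Bulk API job id so onRunFinish() can close the job.
querySplits.forEach(split -> jobIds.add(split.getJobId()));
```
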
@@ -34,6 +34,7 @@ public class SalesforceMultiSourceConfigBuilder {
private String sObjectNameField;
private String securityToken;
private OAuthInfo oAuthInfo;
private String operation;

public SalesforceMultiSourceConfigBuilder setReferenceName(String referenceName) {
this.referenceName = referenceName;
@@ -110,11 +111,14 @@ public SalesforceMultiSourceConfigBuilder setoAuthInfo(OAuthInfo oAuthInfo) {
return this;
}

public SalesforceMultiSourceConfigBuilder setOperation(String operation) {
this.operation = operation;
return this;
}

public SalesforceMultiSourceConfig build() {
return new SalesforceMultiSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
datetimeAfter, datetimeBefore, duration, offset, whiteList,
blackList, sObjectNameField, securityToken, oAuthInfo
);
blackList, sObjectNameField, securityToken, oAuthInfo, operation);
}
}
@@ -37,6 +37,7 @@ public class SalesforceSourceConfigBuilder {
private Boolean enablePKChunk;
private Integer chunkSize;
private String parent;
private String operation;

public SalesforceSourceConfigBuilder setReferenceName(String referenceName) {
this.referenceName = referenceName;
@@ -123,9 +124,14 @@ public SalesforceSourceConfigBuilder setParent(String parent) {
return this;
}

public SalesforceSourceConfigBuilder setOperation(String operation) {
this.operation = operation;
return this;
}

public SalesforceSourceConfig build() {
return new SalesforceSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
query, sObjectName, datetimeAfter, datetimeBefore, duration, offset, schema,
securityToken, null, enablePKChunk, chunkSize, parent);
securityToken, operation, null, enablePKChunk, chunkSize, parent);
}
}
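
With the builders updated, a test can opt into sourcing deleted records. A minimal sketch, assuming the builder's remaining setters follow the pattern above and with hypothetical values throughout:

```java
SalesforceSourceConfig config = new SalesforceSourceConfigBuilder()
    .setReferenceName("salesforceSource") // hypothetical values
    .setSObjectName("Opportunity")        // assumed setter, matching the sObjectName field
    .setOperation("queryAll")             // include deleted records
    .build();
// config.getOperation() returns "queryAll"; without setOperation it would
// fall back to the "query" default.
```
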
19 changes: 19 additions & 0 deletions widgets/Salesforce-batchsource.json
@@ -145,6 +145,25 @@
{
"label": "Advanced",
"properties": [
{
"widget-type": "radio-group",
"label": "SOQL Operation Type",
"name": "operation",
"widget-attributes": {
"layout": "inline",
"default": "query",
"options": [
{
"id": "query",
"label": "query"
},
{
"id": "queryAll",
"label": "queryAll"
}
]
}
},
{
"widget-type": "toggle",
"name": "enablePKChunk",
19 changes: 19 additions & 0 deletions widgets/SalesforceMultiObjects-batchsource.json
@@ -141,6 +141,25 @@
{
"label": "Advanced",
"properties": [
{
"widget-type": "radio-group",
"label": "SOQL Operation Type",
"name": "operation",
"widget-attributes": {
"layout": "inline",
"default": "query",
"options": [
{
"id": "query",
"label": "query"
},
{
"id": "queryAll",
"label": "queryAll"
}
]
}
},
{
"widget-type": "textbox",
"label": "SObject Name Field",
