From 95ce349c8223997fdd6d70a6ab058b08e457bafc Mon Sep 17 00:00:00 2001 From: Steven Smith Date: Fri, 4 Feb 2022 10:46:29 -0800 Subject: [PATCH] Fixed instances of getting authenticator credentials through a helper method that assumed credentials were of the form username/password/etc. This failed for pipelines utilizing the OAuth flow support. Removed the aforementioned helper method, as all instances of it being used could instead get the credentials through the base config class. This config class correctly handles the OAuth flow. --- .../salesforce/SalesforceConnectionUtil.java | 15 --------------- .../batch/SalesforceOutputFormatProvider.java | 6 +----- .../batch/SalesforceBatchMultiSource.java | 7 +------ .../source/batch/SalesforceBatchSource.java | 7 +------ .../source/batch/util/SalesforceSplitUtil.java | 17 ----------------- 5 files changed, 3 insertions(+), 49 deletions(-) diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java index 37892054..d6e10b3b 100644 --- a/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java @@ -61,19 +61,4 @@ public static AuthenticatorCredentials getAuthenticatorCredentials(Configuration conf.get(SalesforceConstants.CONFIG_CONSUMER_SECRET), conf.get(SalesforceConstants.CONFIG_LOGIN_URL)); } - /** - * Creates {@link AuthenticatorCredentials} instance based on given parameters. - * - * @param username Salesforce username - * @param password Salesforce password - * @param consumerKey Salesforce consumer key - * @param consumerSecret Salesforce consumer secret - * @param loginUrl Salesforce authentication url - * @return authenticator credentials - */ - public static AuthenticatorCredentials getAuthenticatorCredentials(String username, String password, - String consumerKey, String consumerSecret, - String loginUrl) { - return new AuthenticatorCredentials(username, password, consumerKey, consumerSecret, loginUrl); - } } diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java index c2819459..3316f4b6 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java @@ -21,7 +21,6 @@ import com.sforce.async.JobInfo; import io.cdap.cdap.api.data.batch.OutputFormatProvider; import io.cdap.plugin.salesforce.SalesforceBulkUtil; -import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.SalesforceConstants; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; @@ -71,10 +70,7 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) { configBuilder.put(SalesforceSinkConstants.CONFIG_EXTERNAL_ID_FIELD, config.getExternalIdField()); } - AuthenticatorCredentials credentials = - SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(), config.getPassword(), - config.getConsumerKey(), config.getConsumerSecret(), - config.getLoginUrl()); + AuthenticatorCredentials credentials = config.getAuthenticatorCredentials(); try { BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials)); diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java index 73f545a9..de5d2601 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java @@ -32,7 +32,6 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; -import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil; @@ -97,11 +96,7 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException { (sObjectName, sObjectSchema) -> arguments.set(MULTI_SINK_PREFIX + sObjectName, sObjectSchema.toString())); String sObjectNameField = config.getSObjectNameField(); - authenticatorCredentials = SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(), - config.getPassword(), - config.getConsumerKey(), - config.getConsumerSecret(), - config.getLoginUrl()); + authenticatorCredentials = config.getAuthenticatorCredentials(); BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); List querySplits = queries.parallelStream() .map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation())) diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java index 1c035ac9..af8dbb7f 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java @@ -35,7 +35,6 @@ import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.plugin.common.LineageRecorder; import io.cdap.plugin.salesforce.SObjectDescriptor; -import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.SalesforceSchemaUtil; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants; @@ -112,11 +111,7 @@ public void prepareRun(BatchSourceContext context) { String query = config.getQuery(context.getLogicalStartTime()); String sObjectName = SObjectDescriptor.fromQuery(query).getName(); - authenticatorCredentials = SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(), - config.getPassword(), - config.getConsumerKey(), - config.getConsumerSecret(), - config.getLoginUrl()); + authenticatorCredentials = config.getAuthenticatorCredentials(); BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials); boolean enablePKChunk = config.getEnablePKChunk(); if (enablePKChunk) { diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java index e8741250..3990a4ef 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSplitUtil.java @@ -26,7 +26,6 @@ import io.cdap.plugin.salesforce.InvalidConfigException; import io.cdap.plugin.salesforce.SObjectDescriptor; import io.cdap.plugin.salesforce.SalesforceBulkUtil; -import io.cdap.plugin.salesforce.SalesforceConnectionUtil; import io.cdap.plugin.salesforce.SalesforceQueryUtil; import io.cdap.plugin.salesforce.authenticator.Authenticator; import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials; @@ -124,22 +123,6 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu } - /** - * Initializes bulk connection based on given Hadoop credentials configuration. - * - * @return bulk connection instance - */ - public static BulkConnection getBulkConnection(String username, String password, - String consumerKey, String consumerSecret, String loginUrl) { - AuthenticatorCredentials authenticatorCredentials = SalesforceConnectionUtil - .getAuthenticatorCredentials(username, password, consumerKey, consumerSecret, loginUrl); - try { - return new BulkConnection(Authenticator.createConnectorConfig(authenticatorCredentials)); - } catch (AsyncApiException e) { - throw new RuntimeException("There was issue communicating with Salesforce", e); - } - } - /** * Initializes bulk connection based on given Hadoop credentials configuration. *