Skip to content

Commit

Permalink
Fixed instances of getting authenticator credentials through a helper
Browse files Browse the repository at this point in the history
method that assumed credentials were of the form username/password/etc.
This failed for pipelines utilizing the OAuth flow support.

Removed the aforementioned helper method, as all instances of it being
used could instead get the credentials through the base config class.
This config class correctly handles the OAuth flow.
  • Loading branch information
ssmith-google committed Feb 4, 2022
1 parent ec0fb59 commit 95ce349
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,4 @@ public static AuthenticatorCredentials getAuthenticatorCredentials(Configuration
conf.get(SalesforceConstants.CONFIG_CONSUMER_SECRET),
conf.get(SalesforceConstants.CONFIG_LOGIN_URL));
}
/**
* Creates {@link AuthenticatorCredentials} instance based on given parameters.
*
* @param username Salesforce username
* @param password Salesforce password
* @param consumerKey Salesforce consumer key
* @param consumerSecret Salesforce consumer secret
* @param loginUrl Salesforce authentication url
* @return authenticator credentials
*/
public static AuthenticatorCredentials getAuthenticatorCredentials(String username, String password,
String consumerKey, String consumerSecret,
String loginUrl) {
return new AuthenticatorCredentials(username, password, consumerKey, consumerSecret, loginUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.sforce.async.JobInfo;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
Expand Down Expand Up @@ -71,10 +70,7 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
configBuilder.put(SalesforceSinkConstants.CONFIG_EXTERNAL_ID_FIELD, config.getExternalIdField());
}

AuthenticatorCredentials credentials =
SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(), config.getPassword(),
config.getConsumerKey(), config.getConsumerSecret(),
config.getLoginUrl());
AuthenticatorCredentials credentials = config.getAuthenticatorCredentials();

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

Expand Down Expand Up @@ -97,11 +96,7 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
(sObjectName, sObjectSchema) -> arguments.set(MULTI_SINK_PREFIX + sObjectName, sObjectSchema.toString()));

String sObjectNameField = config.getSObjectNameField();
authenticatorCredentials = SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(),
config.getPassword(),
config.getConsumerKey(),
config.getConsumerSecret(),
config.getLoginUrl());
authenticatorCredentials = config.getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
Expand Down Expand Up @@ -112,11 +111,7 @@ public void prepareRun(BatchSourceContext context) {

String query = config.getQuery(context.getLogicalStartTime());
String sObjectName = SObjectDescriptor.fromQuery(query).getName();
authenticatorCredentials = SalesforceConnectionUtil.getAuthenticatorCredentials(config.getUsername(),
config.getPassword(),
config.getConsumerKey(),
config.getConsumerSecret(),
config.getLoginUrl());
authenticatorCredentials = config.getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
boolean enablePKChunk = config.getEnablePKChunk();
if (enablePKChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceBulkUtil;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
Expand Down Expand Up @@ -124,22 +123,6 @@ private static BatchInfo[] runBulkQuery(BulkConnection bulkConnection, String qu
}


/**
* Initializes bulk connection based on given Hadoop credentials configuration.
*
* @return bulk connection instance
*/
public static BulkConnection getBulkConnection(String username, String password,
String consumerKey, String consumerSecret, String loginUrl) {
AuthenticatorCredentials authenticatorCredentials = SalesforceConnectionUtil
.getAuthenticatorCredentials(username, password, consumerKey, consumerSecret, loginUrl);
try {
return new BulkConnection(Authenticator.createConnectorConfig(authenticatorCredentials));
} catch (AsyncApiException e) {
throw new RuntimeException("There was issue communicating with Salesforce", e);
}
}

/**
* Initializes bulk connection based on given Hadoop credentials configuration.
*
Expand Down

0 comments on commit 95ce349

Please sign in to comment.