Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encrypt/Decrypt template credentials #197

Merged
merged 24 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9bb74eb
added RegisterRemoteModelStep and tests
joshpalis Nov 7, 2023
d93a581
Adding RegisterLocalModelStep, fixing tests, adding input/ouput defin…
joshpalis Nov 7, 2023
a5997b7
Merge branch 'main' into registerModel
joshpalis Nov 7, 2023
366f05e
Fixing javadoc warnings, fixing log message
joshpalis Nov 8, 2023
5f0a357
Addressing PR comments,making description field optional for Register…
joshpalis Nov 8, 2023
285eb6f
moving modelConfig builder before adding allConfig
joshpalis Nov 8, 2023
da2e4b3
initial implementation
joshpalis Nov 21, 2023
541d3f7
Merge main
joshpalis Nov 21, 2023
005b6e1
Fixing create workflow transport action
joshpalis Nov 21, 2023
b9a50a0
Removing duplicate register_remote_model validator
joshpalis Nov 21, 2023
4994a2d
Adding bouncy castle dependency to resolve encryption issue
joshpalis Nov 22, 2023
50d4b7c
Merge branch 'main' into encrypt
joshpalis Nov 24, 2023
b8e8ad7
Fixing CreateWorkflowTransportActionTests
joshpalis Nov 24, 2023
fb3ad2e
Adding initial unit tests for encryptor utils
joshpalis Nov 24, 2023
38ff53c
Implemented encryption/decryption for workflow node user inputs with …
joshpalis Nov 25, 2023
f345965
Addressing PR comments
joshpalis Nov 27, 2023
8fdb93e
Suppressing unchecked warning, making credential strings constants
joshpalis Nov 27, 2023
e320b7d
Removing setMasterKey from initializeMasterKey method
joshpalis Nov 27, 2023
5a72c4d
Adding final template encryption decryption test
joshpalis Nov 27, 2023
93df639
Merge branch 'main' into encrypt
joshpalis Nov 29, 2023
d8aa0b7
Addressing PR comments, changing master key index name to config, fix…
joshpalis Nov 29, 2023
eacdebd
Added TODO
joshpalis Nov 29, 2023
4775c44
changing getMasterKeyIndexMapping method name
joshpalis Nov 29, 2023
4b57f32
Removing unnecessary aws sdk dependency
joshpalis Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ dependencies {
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0'
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation platform('software.amazon.awssdk:bom:2.21.15')
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.77'

configurations.all {
resolutionStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -96,7 +97,8 @@ public Collection<Object> createComponents(
this.clusterService = clusterService;
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService, encryptorUtils);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(
settings,
clusterService,
Expand All @@ -106,7 +108,7 @@ public Collection<Object> createComponents(
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
return ImmutableList.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ private CommonValue() {}
public static final String WORKFLOW_STATE_INDEX_MAPPING = "mappings/workflow-state.json";
/** Workflow State index mapping version */
public static final Integer WORKFLOW_STATE_INDEX_VERSION = 1;
/** Master Key Index Name */
public static final String MASTER_KEY_INDEX = ".plugins-master-key";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
/** Master Key index mapping file path */
public static final String MASTER_KEY_INDEX_MAPPING = "mappings/master-key.json";
/** Master key index mapping version */
public static final Integer MASTER_KEY_INDEX_VERSION = 1;
/** Master key field name */
public static final String MASTER_KEY = "master_key";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved

/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
Expand Down Expand Up @@ -119,8 +127,8 @@ private CommonValue() {}
public static final String PROTOCOL_FIELD = "protocol";
/** Connector parameters field */
public static final String PARAMETERS_FIELD = "parameters";
/** Connector credentials field */
public static final String CREDENTIALS_FIELD = "credentials";
/** Connector credential field */
public static final String CREDENTIAL_FIELD = "credential";
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
/** Connector actions field */
public static final String ACTIONS_FIELD = "actions";
/** Backend roles for the model */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY_INDEX;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY_INDEX_VERSION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_VERSION;

Expand All @@ -36,6 +38,14 @@ public enum FlowFrameworkIndex {
WORKFLOW_STATE_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getWorkflowStateMappings),
WORKFLOW_STATE_INDEX_VERSION
),
/**
* Master Key Index
*/
MASTER_KEY(
MASTER_KEY_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getMasterKeyMappings),
MASTER_KEY_INDEX_VERSION
);

private final String indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.flowframework.indices;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -17,6 +18,7 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
Expand All @@ -38,6 +40,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.script.Script;

import java.io.IOException;
Expand All @@ -50,6 +53,9 @@
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY_INDEX;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.META;
import static org.opensearch.flowframework.common.CommonValue.NO_SCHEMA_VERSION;
import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD;
Expand All @@ -64,17 +70,20 @@ public class FlowFrameworkIndicesHandler {
private static final Logger logger = LogManager.getLogger(FlowFrameworkIndicesHandler.class);
private final Client client;
private final ClusterService clusterService;
private final EncryptorUtils encryptorUtils;
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");

/**
* constructor
* @param client the open search client
* @param clusterService ClusterService
* @param encryptorUtils encryption utility
*/
public FlowFrameworkIndicesHandler(Client client, ClusterService clusterService) {
public FlowFrameworkIndicesHandler(Client client, ClusterService clusterService, EncryptorUtils encryptorUtils) {
this.client = client;
this.clusterService = clusterService;
this.encryptorUtils = encryptorUtils;
for (FlowFrameworkIndex mlIndex : FlowFrameworkIndex.values()) {
indexMappingUpdated.put(mlIndex.getIndexName(), new AtomicBoolean(false));
}
Expand Down Expand Up @@ -104,6 +113,15 @@ public static String getWorkflowStateMappings() throws IOException {
return getIndexMappings(WORKFLOW_STATE_INDEX_MAPPING);
}

/**
* Get master-key index mapping
* @return master-key index mapping
* @throws IOException if mapping file cannot be read correctly
*/
public static String getMasterKeyMappings() throws IOException {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
return getIndexMappings(MASTER_KEY_INDEX_MAPPING);
}

/**
* Create global context index if it's absent
* @param listener The action listener
Expand All @@ -120,6 +138,14 @@ public void initWorkflowStateIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.WORKFLOW_STATE, listener);
}

/**
* Create master key index if it's absent
* @param listener The action listener
*/
public void initMasterKeyIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.MASTER_KEY, listener);
}

/**
* Checks if the given index exists
* @param indexName the name of the index
Expand Down Expand Up @@ -287,7 +313,8 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
Template templateWithEncryptedCredentials = encryptorUtils.encryptTemplateCredentials(template);
request.source(templateWithEncryptedCredentials.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand All @@ -301,6 +328,67 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
}));
}

/**
* Initializes master key index and EncryptorUtls
* @param listener action listener
*/
public void initializeMasterKeyIndex(ActionListener<Boolean> listener) {
initMasterKeyIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {
listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR));
return;
}

// Index has either been created or it already exists, need to check if master key has been initalized already, if not then
// generate
// This is necessary in case of global context index restoration from snapshot, will need to use the same master key to decrypt
// stored credentials
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

GetRequest getRequest = new GetRequest(MASTER_KEY_INDEX).id(MASTER_KEY);
client.get(getRequest, ActionListener.wrap(getResponse -> {

if (!getResponse.isExists()) {

// Generate new key and index
final String masterKey = encryptorUtils.generateMasterKey();
IndexRequest masterKeyIndexRequest = new IndexRequest(MASTER_KEY_INDEX).id(MASTER_KEY)
.source(ImmutableMap.of(MASTER_KEY, masterKey))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

client.index(masterKeyIndexRequest, ActionListener.wrap(indexResponse -> {
// Set generated key to master
logger.info("Master key has been initialized successfully");
encryptorUtils.setMasterKey(masterKey);
listener.onResponse(true);
}, indexException -> {
logger.error("Failed to index master key", indexException);
listener.onFailure(indexException);
}));

} else {
// Set existing key to master
logger.info("Master key has already been initialized");
final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
this.encryptorUtils.setMasterKey(masterKey);
listener.onResponse(true);
}
}, getRequestException -> {
logger.error("Failed to search for master key from master_key index", getRequestException);
listener.onFailure(getRequestException);
}));

} catch (Exception e) {
logger.error("Failed to retrieve master key from master_key index", e);
listener.onFailure(e);
}

}, createIndexException -> {
logger.error("Failed to create master_key index", createIndexException);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
listener.onFailure(createIndexException);
}));
}

/**
* add document insert into global context index
* @param workflowId the workflowId, corresponds to document ID of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,54 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
listener.onFailure(ffe);
return;
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));
}
})
);
// Initialize master key index and create new global context and state index entries
flowFrameworkIndicesHandler.initializeMasterKeyIndex(ActionListener.wrap(isInitialized -> {
if (!isInitialized) {
listener.onFailure(
new FlowFrameworkException("Failed to initalize master key index", RestStatus.INTERNAL_SERVER_ERROR)
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
);
} else {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(
templateWithUser,
ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)
);
}
})
);
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}

})
);
}
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
logger.error("Failed to initialize master key index : {}", exception.getMessage());
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));

joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}

}));
Expand Down
Loading
Loading