Skip to content

Commit

Permalink
Added user level access control based on backend roles (#838)
Browse files Browse the repository at this point in the history
* Implemented backend role filtering for Flow Framework

Signed-off-by: owaiskazi19 <[email protected]>

* Spotless Fixes

Signed-off-by: owaiskazi19 <[email protected]>

* Added secured integ tests

Signed-off-by: owaiskazi19 <[email protected]>

* Fixed threadcontext and an integ test

Signed-off-by: owaiskazi19 <[email protected]>

* Added javadocs and fixed checkstyle

Signed-off-by: owaiskazi19 <[email protected]>

* Added backend role filtering for reprovisioning API

Signed-off-by: owaiskazi19 <[email protected]>

* Fixed exceptions

Signed-off-by: owaiskazi19 <[email protected]>

* Updated CHANGELOG

Signed-off-by: owaiskazi19 <[email protected]>

* Fixed forbidden APIs in tests

Signed-off-by: owaiskazi19 <[email protected]>

* Added secured integ tests for reprovision workflow

Signed-off-by: owaiskazi19 <[email protected]>

* Fixed checkstyle violation

Signed-off-by: owaiskazi19 <[email protected]>

* Added more tests and resolved PR comments

Signed-off-by: Owais <[email protected]>

* Addressed additional PR Comments

Signed-off-by: Owais <[email protected]>

* Updated the javadoc

Signed-off-by: Owais <[email protected]>

---------

Signed-off-by: owaiskazi19 <[email protected]>
Signed-off-by: Owais <[email protected]>
(cherry picked from commit 60458a6)
  • Loading branch information
owaiskazi19 committed Aug 26, 2024
1 parent f032867 commit 94bde98
Show file tree
Hide file tree
Showing 31 changed files with 2,362 additions and 457 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Adds reprovision API to support updating search pipelines, ingest pipelines index settings ([#804](https://github.com/opensearch-project/flow-framework/pull/804))
- Adds user level access control based on backend roles ([#838](https://github.com/opensearch-project/flow-framework/pull/838))

### Enhancements
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.transport.handler.SearchHandler;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
Expand Down Expand Up @@ -84,6 +85,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
Expand Down Expand Up @@ -135,7 +137,16 @@ public Collection<Object> createComponents(
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);

return List.of(
workflowStepFactory,
workflowProcessSorter,
encryptorUtils,
flowFrameworkIndicesHandler,
searchHandler,
flowFrameworkSettings
);
}

@Override
Expand Down Expand Up @@ -179,7 +190,14 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
return List.of(
FLOW_FRAMEWORK_ENABLED,
MAX_WORKFLOWS,
MAX_WORKFLOW_STEPS,
WORKFLOW_REQUEST_TIMEOUT,
TASK_REQUEST_RETRY_DURATION,
FILTER_BY_BACKEND_ROLES
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the backend role filtering */
public static final Setting<Boolean> FILTER_BY_BACKEND_ROLES = Setting.boolSetting(
"plugins.flow_framework.filter_by_backend_roles",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Instantiate this class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand Down Expand Up @@ -49,7 +52,10 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.getWorkflow;

/**
* Transport Action to index or update a use case template within the Global Context
Expand All @@ -63,6 +69,9 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
private final Client client;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;

/**
* Instantiates a new CreateWorkflowTransportAction
Expand All @@ -73,6 +82,9 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
* @param flowFrameworkSettings Plugin settings
* @param client The client used to make the request to OS
* @param pluginsService The plugin service
* @param clusterService the cluster service
* @param xContentRegistry the named content registry
* @param settings the plugin settings
*/
@Inject
public CreateWorkflowTransportAction(
Expand All @@ -82,20 +94,93 @@ public CreateWorkflowTransportAction(
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
PluginsService pluginsService
PluginsService pluginsService,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Settings settings
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.pluginsService = pluginsService;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
this.xContentRegistry = xContentRegistry;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {

User user = getUserContext(client);
String workflowId = request.getWorkflowId();
try {
resolveUserAndExecute(user, workflowId, listener, () -> createExecute(request, user, listener));
} catch (Exception e) {
logger.error("Failed to create workflow", e);
listener.onFailure(e);
}
}

/**
* Resolve user and execute the workflow function
* @param requestedUser the user making the request
* @param workflowId the workflow id
* @param listener the action listener
* @param function the workflow function to execute
*/
private void resolveUserAndExecute(
User requestedUser,
String workflowId,
ActionListener<WorkflowResponse> listener,
Runnable function
) {
try {
// Check if user has backend roles
// When filter by is enabled, block users creating/updating workflows who do not have backend roles.
if (filterByEnabled == Boolean.TRUE) {
try {
checkFilterByBackendRoles(requestedUser);
} catch (FlowFrameworkException e) {
logger.error(e.getMessage(), e);
listener.onFailure(e);
return;
}
}
if (workflowId != null) {
// requestedUser == null means security is disabled or user is superadmin. In this case we don't need to
// check if request user have access to the workflow or not. But we still need to get current workflow for
// this case, so we can keep current workflow's user data.
boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled;
// Update workflow request, check if user has permissions to update the workflow
// Get workflow and verify backend roles
getWorkflow(requestedUser, workflowId, filterByBackendRole, listener, function, client, clusterService, xContentRegistry);
} else {
// Create Workflow. No need to get current workflow.
function.run();
}
} catch (Exception e) {
String errorMessage = "Failed to create or update workflow";
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Execute the create or update request
* 1. Validate workflows if requested
* 2. Create or update global context index
* 3. Create or update state index
* 4. Create or update provisioning progress index
* @param request the workflow request
* @param user the user making the request
* @param listener the action listener
*/
private void createExecute(WorkflowRequest request, User user, ActionListener<WorkflowResponse> listener) {
Instant creationTime = Instant.now();
Template templateWithUser = new Template(
request.getTemplate().name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;

/**
* Transport action to retrieve a use case template within the Global Context
Expand All @@ -37,50 +44,90 @@ public class DeleteWorkflowTransportAction extends HandledTransportAction<Workfl

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;

/**
* Instantiates a new DeleteWorkflowTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the OpenSearch Client
* @param clusterService the cluster service
* @param xContentRegistry contentRegister to parse get response
* @param settings the plugin settings
*/
@Inject
public DeleteWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Settings settings
) {
super(DeleteWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.xContentRegistry = xContentRegistry;
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<DeleteResponse> listener) {
if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);
User user = getUserContext(client);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
}, stateListener);
resolveUserAndExecute(
user,
workflowId,
filterByEnabled,
listener,
() -> executeDeleteRequest(request, listener, context),
client,
clusterService,
xContentRegistry
);

} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
}
}

/**
* Executes the delete request
* @param request the workflow request
* @param listener the action listener
* @param context the thread context
*/
private void executeDeleteRequest(
WorkflowRequest request,
ActionListener<DeleteResponse> listener,
ThreadContext.StoredContext context
) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
}, stateListener);
}
}
Loading

0 comments on commit 94bde98

Please sign in to comment.