Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport-2.x] Added user level access control based on backend roles (#838) #847

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Adds reprovision API to support updating search pipelines, ingest pipelines index settings ([#804](https://github.com/opensearch-project/flow-framework/pull/804))
- Adds user level access control based on backend roles ([#838](https://github.com/opensearch-project/flow-framework/pull/838))

### Enhancements
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.transport.handler.SearchHandler;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
Expand Down Expand Up @@ -84,6 +85,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
Expand Down Expand Up @@ -135,7 +137,16 @@ public Collection<Object> createComponents(
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);

return List.of(
workflowStepFactory,
workflowProcessSorter,
encryptorUtils,
flowFrameworkIndicesHandler,
searchHandler,
flowFrameworkSettings
);
}

@Override
Expand Down Expand Up @@ -179,7 +190,14 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
return List.of(
FLOW_FRAMEWORK_ENABLED,
MAX_WORKFLOWS,
MAX_WORKFLOW_STEPS,
WORKFLOW_REQUEST_TIMEOUT,
TASK_REQUEST_RETRY_DURATION,
FILTER_BY_BACKEND_ROLES
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the backend role filtering */
public static final Setting<Boolean> FILTER_BY_BACKEND_ROLES = Setting.boolSetting(
"plugins.flow_framework.filter_by_backend_roles",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Instantiate this class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand Down Expand Up @@ -49,7 +52,10 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.getWorkflow;

/**
* Transport Action to index or update a use case template within the Global Context
Expand All @@ -63,6 +69,9 @@
private final Client client;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;

/**
* Instantiates a new CreateWorkflowTransportAction
Expand All @@ -73,6 +82,9 @@
* @param flowFrameworkSettings Plugin settings
* @param client The client used to make the request to OS
* @param pluginsService The plugin service
* @param clusterService the cluster service
* @param xContentRegistry the named content registry
* @param settings the plugin settings
*/
@Inject
public CreateWorkflowTransportAction(
Expand All @@ -82,20 +94,93 @@
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
PluginsService pluginsService
PluginsService pluginsService,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Settings settings
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.pluginsService = pluginsService;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
this.xContentRegistry = xContentRegistry;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {

User user = getUserContext(client);
String workflowId = request.getWorkflowId();
try {
resolveUserAndExecute(user, workflowId, listener, () -> createExecute(request, user, listener));
} catch (Exception e) {
logger.error("Failed to create workflow", e);
listener.onFailure(e);

Check warning on line 122 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L120-L122

Added lines #L120 - L122 were not covered by tests
}
}

/**
* Resolve user and execute the workflow function
* @param requestedUser the user making the request
* @param workflowId the workflow id
* @param listener the action listener
* @param function the workflow function to execute
*/
private void resolveUserAndExecute(
User requestedUser,
String workflowId,
ActionListener<WorkflowResponse> listener,
Runnable function
) {
try {
// Check if user has backend roles
// When filter by is enabled, block users creating/updating workflows who do not have backend roles.
if (filterByEnabled == Boolean.TRUE) {
try {
checkFilterByBackendRoles(requestedUser);
} catch (FlowFrameworkException e) {
logger.error(e.getMessage(), e);
listener.onFailure(e);
return;
}
}
if (workflowId != null) {
// requestedUser == null means security is disabled or user is superadmin. In this case we don't need to
// check if request user have access to the workflow or not. But we still need to get current workflow for
// this case, so we can keep current workflow's user data.
boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled;
// Update workflow request, check if user has permissions to update the workflow
// Get workflow and verify backend roles
getWorkflow(requestedUser, workflowId, filterByBackendRole, listener, function, client, clusterService, xContentRegistry);
} else {
// Create Workflow. No need to get current workflow.
function.run();
}
} catch (Exception e) {
String errorMessage = "Failed to create or update workflow";

Check warning on line 164 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L163-L164

Added lines #L163 - L164 were not covered by tests
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);

Check warning on line 166 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L166

Added line #L166 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 168 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L168

Added line #L168 was not covered by tests
}
}
}

/**
* Execute the create or update request
* 1. Validate workflows if requested
* 2. Create or update global context index
* 3. Create or update state index
* 4. Create or update provisioning progress index
* @param request the workflow request
* @param user the user making the request
* @param listener the action listener
*/
private void createExecute(WorkflowRequest request, User user, ActionListener<WorkflowResponse> listener) {
Instant creationTime = Instant.now();
Template templateWithUser = new Template(
request.getTemplate().name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;

/**
* Transport action to retrieve a use case template within the Global Context
Expand All @@ -37,50 +44,90 @@

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;
private volatile Boolean filterByEnabled;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;

/**
* Instantiates a new DeleteWorkflowTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the OpenSearch Client
* @param clusterService the cluster service
* @param xContentRegistry contentRegister to parse get response
* @param settings the plugin settings
*/
@Inject
public DeleteWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
Settings settings
) {
super(DeleteWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
filterByEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
this.xContentRegistry = xContentRegistry;
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<DeleteResponse> listener) {
if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);
User user = getUserContext(client);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
}, stateListener);
resolveUserAndExecute(
user,
workflowId,
filterByEnabled,
listener,
() -> executeDeleteRequest(request, listener, context),
client,
clusterService,
xContentRegistry
);

} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
}
}

/**
* Executes the delete request
* @param request the workflow request
* @param listener the action listener
* @param context the thread context
*/
private void executeDeleteRequest(
WorkflowRequest request,
ActionListener<DeleteResponse> listener,
ThreadContext.StoredContext context
) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });

Check warning on line 126 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L125-L126

Added lines #L125 - L126 were not covered by tests
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);

Check warning on line 129 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L129

Added line #L129 was not covered by tests
}
}, stateListener);

Check warning on line 131 in src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java#L131

Added line #L131 was not covered by tests
}
}
Loading
Loading