Skip to content

Commit

Permalink
Added javadocs and fixed checkstyle
Browse files Browse the repository at this point in the history
Signed-off-by: owaiskazi19 <[email protected]>
  • Loading branch information
owaiskazi19 committed Aug 16, 2024
1 parent b67041b commit 83f6eb8
Show file tree
Hide file tree
Showing 18 changed files with 211 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.*;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.getWorkflow;

/**
* Transport Action to index or update a use case template within the Global Context
Expand Down Expand Up @@ -82,6 +84,9 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
* @param flowFrameworkSettings Plugin settings
* @param client The client used to make the request to OS
* @param pluginsService The plugin service
* @param clusterService the cluster service
* @param xContentRegistry the named content registry
* @param settings the plugin settings
*/
@Inject
public CreateWorkflowTransportAction(
Expand Down Expand Up @@ -116,11 +121,18 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
try {
resolveUserAndExecute(user, workflowId, listener, () -> createExecute(request, user, listener));
} catch (Exception e) {
logger.error(e);
logger.error("Failed to create workflow", e);
listener.onFailure(e);
}
}

/**
* Resolve user and execute the workflow function
* @param requestedUser the user making the request
* @param workflowId the workflow id
* @param listener the action listener
* @param function the workflow function to execute
*/
private void resolveUserAndExecute(
User requestedUser,
String workflowId,
Expand Down Expand Up @@ -154,6 +166,16 @@ private void resolveUserAndExecute(
}
}

/**
* Execute the create or update request
* 1. Validate workflows if requested
* 2. Create or update global context index
* 3. Create or update state index
* 4. Create or update provisioning progress index
* @param request the workflow request
* @param user the user making the request
* @param listener the action listener
*/
protected void createExecute(WorkflowRequest request, User user, ActionListener<WorkflowResponse> listener) {
Instant creationTime = Instant.now();
Template templateWithUser = new Template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class DeleteWorkflowTransportAction extends HandledTransportAction<Workfl
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the OpenSearch Client
* @param clusterService the cluster service
* @param xContentRegistry contentRegister to parse get response
* @param settings the plugin settings
*/
@Inject
public DeleteWorkflowTransportAction(
Expand Down Expand Up @@ -100,6 +103,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
}
}

/**
* Executes the delete request
* @param request the workflow request
* @param listener the action listener
* @param context the thread context
*/
protected void executeDeleteRequest(
WorkflowRequest request,
ActionListener<DeleteResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class DeprovisionWorkflowTransportAction extends HandledTransportAction<W
* @param workflowStepFactory The factory instantiating workflow steps
* @param flowFrameworkIndicesHandler Class to handle all internal system indices actions
* @param flowFrameworkSettings The plugin settings
* @param clusterService the cluster service
* @param xContentRegistry contentRegister to parse get response
* @param settings the plugin settings
*/
@Inject
public DeprovisionWorkflowTransportAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class GetWorkflowStateTransportAction extends HandledTransportAction<GetW
* @param actionFilters action filters
* @param client The client used to make the request to OS
* @param xContentRegistry contentRegister to parse get response
* @param clusterService the cluster service
* @param settings the plugin settings
*/
@Inject
public GetWorkflowStateTransportAction(
Expand Down Expand Up @@ -100,6 +102,12 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
}
}

/**
* Execute the get workflow state request
* @param request the get workflow state request
* @param listener the action listener
* @param context the thread context
*/
private void executeGetWorkflowStateRequest(
GetWorkflowStateRequest request,
ActionListener<GetWorkflowStateResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class GetWorkflowTransportAction extends HandledTransportAction<WorkflowR
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param encryptorUtils Encryptor utils
* @param client the Opensearch Client
* @param xContentRegistry contentRegister to parse get response
* @param clusterService the cluster service
* @param settings the plugin settings
*/
@Inject
public GetWorkflowTransportAction(
Expand Down Expand Up @@ -113,6 +116,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW

}

/**
* Execute the get request against the global context index
* @param request the workflow request
* @param listener the action listener
* @param context the thread context
*/
private void executeGetRequest(
WorkflowRequest request,
ActionListener<GetWorkflowResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class ProvisionWorkflowTransportAction extends HandledTransportAction<Wor
* @param flowFrameworkIndicesHandler Class to handle all internal system indices actions
* @param encryptorUtils Utility class to handle encryption/decryption
* @param pluginsService The Plugins Service
* @param clusterService the cluster service
* @param xContentRegistry the named content registry
* @param settings the plugin settings
*/
@Inject
public ProvisionWorkflowTransportAction(
Expand Down Expand Up @@ -141,6 +144,19 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}
}

/**
* Execute the provision request
* 1. Retrieve template from global context
* 2. Decrypt template
* 3. Sort and validate graph
* 4. Update state index
* 5. Execute workflow asynchronously
* 6. Update last provisioned field in template
* 7. Return response
* @param request the workflow request
* @param listener the action listener
* @param context the thread context
*/
private void executeProvisionRequest(
WorkflowRequest request,
ActionListener<WorkflowResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,32 @@
import static org.opensearch.flowframework.util.ParseUtils.isAdmin;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

/**
* Handle general search request, check user role and return search response.
*/
public class SearchHandler {
private final Logger logger = LogManager.getLogger(SearchHandler.class);
private final Client client;
private volatile Boolean filterEnabled;

/**
* Instantiates a new SearchHandler
* @param settings settings
* @param clusterService cluster service
* @param client The node client to retrieve a stored use case template
* @param filterByBackendRoleSetting filter role backend settings
*/
public SearchHandler(Settings settings, ClusterService clusterService, Client client, Setting<Boolean> filterByBackendRoleSetting) {
this.client = client;
filterEnabled = filterByBackendRoleSetting.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(filterByBackendRoleSetting, it -> filterEnabled = it);
}

/**
* Search workflows in global context
* @param request SearchRequest
* @param actionListener ActionListener
*/
public void search(SearchRequest request, ActionListener<SearchResponse> actionListener) {
// AccessController should take care of letting the user with right permission to view the workflow
User user = ParseUtils.getUserContext(client);
Expand All @@ -50,6 +65,13 @@ public void search(SearchRequest request, ActionListener<SearchResponse> actionL
}
}

/**
* Validate user role and call search
* @param request SearchRequest
* @param user User
* @param listener ActionListener
* @param context ThreadContext
*/
public void validateRole(
SearchRequest request,
User user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
*/
package org.opensearch.flowframework.transport.handler;

/**
* Interface for a workflow function.
*
*/
public interface WorkflowFunction {
/**
* Performs this operation.
Expand Down
66 changes: 64 additions & 2 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,26 @@
import org.opensearch.flowframework.transport.handler.WorkflowFunction;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.*;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ml.repackage.com.google.common.collect.ImmutableList;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.*;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Locale;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -238,6 +249,12 @@ public static User getUserContext(Client client) {
return User.parse(userStr);
}

/**
* Add user backend roles filter to search source builder=
* @param user the user
* @param searchSourceBuilder search builder
* @return search builder with filter added
*/
public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSourceBuilder searchSourceBuilder) {
if (user == null) {
return searchSourceBuilder;
Expand All @@ -262,6 +279,17 @@ public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSou
return searchSourceBuilder;
}

/**
* Resolve user and execute the function
* @param requestedUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param listener action listener
* @param function workflow function
* @param client node client
* @param clusterService cluster service
* @param xContentRegistry contentRegister to parse get response
*/
public static void resolveUserAndExecute(
User requestedUser,
String workflowId,
Expand All @@ -286,6 +314,14 @@ public static void resolveUserAndExecute(
}
}

/**
* Check if requested user has backend role required to access the resource
* @param requestedUser the user to execute the request
* @param resourceUser user of the resource
* @param workflowId workflow id
* @return boolean if the requested user has backend role required to access the resource
* @throws Exception exception
*/
private static boolean checkUserPermissions(User requestedUser, User resourceUser, String workflowId) throws Exception {
if (resourceUser.getBackendRoles() == null || requestedUser.getBackendRoles() == null) {
return false;
Expand All @@ -307,6 +343,11 @@ private static boolean checkUserPermissions(User requestedUser, User resourceUse
return false;
}

/**
* Check if filter by backend roles is enabled and validate the requested user
* @param requestedUser the user to execute the request
* @return error message if validation fails, null otherwise
*/
public static String checkFilterByBackendRoles(User requestedUser) {
if (requestedUser == null) {
return "Filter by backend roles is enabled and User is null";
Expand All @@ -321,6 +362,17 @@ public static String checkFilterByBackendRoles(User requestedUser) {
return null;
}

/**
* Get workflow
* @param requestUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param listener action listener
* @param function workflow function
* @param client node client
* @param clusterService cluster service
* @param xContentRegistry contentRegister to parse get response
*/
public static void getWorkflow(
User requestUser,
String workflowId,
Expand Down Expand Up @@ -358,6 +410,16 @@ public static void getWorkflow(
}
}

/**
* Execute the function if user has permissions to access the resource
* @param requestUser the user to execute the request
* @param response get response
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param listener action listener
* @param function workflow function
* @param xContentRegistry contentRegister to parse get response
*/
public static void onGetWorkflowResponse(
GetResponse response,
User requestUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public static FetchSourceContext getSourceContext(User user, SearchSourceBuilder
}
}

/**
* Create an XContentParser from the provided NamedXContentRegistry and BytesReference
* @param xContentRegistry content registry
* @param bytesReference bytes reference
* @return XContentParser
* @throws IOException if error occurs while creating parser
*/
public static XContentParser createXContentParserFromRegistry(NamedXContentRegistry xContentRegistry, BytesReference bytesReference)
throws IOException {
return XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.FlowFrameworkSettings.*;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@
import org.junit.Before;

import java.io.IOException;
import java.util.*;
import java.util.Map;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down
Loading

0 comments on commit 83f6eb8

Please sign in to comment.