Skip to content

Commit

Permalink
Commented tests and added javadoc
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Nov 8, 2023
1 parent 99a362e commit c4b9a68
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ public class FlowFrameworkSettings {
private FlowFrameworkSettings() {}

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 1000;
public static final int MAX_WORKFLOWS_LIMIT = 10000;

/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
100,
1000,
0,
MAX_WORKFLOWS_LIMIT,
Setting.Property.NodeScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@
*/
public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> extends BaseRestHandler {

/** Url Paths of the routes*/
protected final List<String> urlPaths;
/** Index on search operation needs to be performed*/
protected final String index;
/** Search class name*/
protected final Class<T> clazz;
/** Search action type*/
protected final ActionType<SearchResponse> actionType;
/** Settings to enable FlowFramework API*/
protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
* Abstract action for the rest actions
*/
public abstract class AbstractWorkflowAction extends BaseRestHandler {
/** Timeout for the request*/
protected volatile TimeValue requestTimeout;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,41 +118,41 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
listener.onFailure(ffe);
return;

Check warning on line 120 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L116-L120

Added lines #L116 - L120 were not covered by tests
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}));

// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
globalContextResponse.getId(),

Check warning on line 125 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L123-L125

Added lines #L123 - L125 were not covered by tests
user,
ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(globalContextResponse.getId()));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());

Check warning on line 131 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L127-L131

Added lines #L127 - L131 were not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 133 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L133

Added line #L133 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));

Check warning on line 135 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L135

Added line #L135 was not covered by tests
}
})

Check warning on line 137 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L137

Added line #L137 was not covered by tests
);
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
logger.error("Failed to save use case template : {}", exception.getMessage());

Check warning on line 140 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L140

Added line #L140 was not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));

Check warning on line 144 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L144

Added line #L144 was not covered by tests
}
})
);
}, exception -> {
logger.error("Failed to save use case template : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

}));

Check warning on line 147 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L147

Added line #L147 was not covered by tests
}
}, e -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage());

Check warning on line 150 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L149-L150

Added lines #L149 - L150 were not covered by tests
if (e instanceof FlowFrameworkException) {
listener.onFailure(e);

Check warning on line 152 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L152

Added line #L152 was not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 154 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L154

Added line #L154 was not covered by tests
}

}));
} else {
// Update existing entry, full document replacement
Expand Down Expand Up @@ -192,8 +192,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
/**
* Checks if the max workflows limit has been reachesd
* @param requestTimeOut request time out
* @param maxWorkflow max workflows
* @return
* @param maxWorkflow max workflows
* @param internalListener listener for search request
*/
protected void onSearchGlobalContext(TimeValue requestTimeOut, Integer maxWorkflow, ActionListener<Boolean> internalListener) {
QueryBuilder query = QueryBuilders.matchAllQuery();
Expand All @@ -202,7 +202,6 @@ protected void onSearchGlobalContext(TimeValue requestTimeOut, Integer maxWorkfl
SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder);

Check warning on line 202 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L202

Added line #L202 was not covered by tests

client.search(searchRequest, ActionListener.wrap(searchResponse -> {

Check warning on line 204 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L204

Added line #L204 was not covered by tests
logger.info("SEARCH RESPONSE SIZE {}", searchResponse.getHits().getTotalHits().value);
if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) {
internalListener.onResponse(false);

Check warning on line 206 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L206

Added line #L206 was not covered by tests
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
public class RestHandlerUtils {

/** Fields that need to be excluded from the Search Response*/
public static final String[] USER_EXCLUDE = new String[] { CommonValue.USER_FIELD };

private RestHandlerUtils() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,46 @@ public void testFailedDryRunValidation() {
assertEquals("No start node detected: all nodes have a predecessor.", exceptionCaptor.getValue().getMessage());
}

/* public void testMaxWorkflow() {
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
ArgumentCaptor<ActionListener<Boolean>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
WorkflowRequest createNewWorkflow = new WorkflowRequest(null, template, null, 1);
doAnswer(invocation -> {
ActionListener<Boolean> responseListener = invocation.getArgument(1);
responseListener.onResponse(true);
return null;
}).when(mock(CreateWorkflowTransportAction.class)).onSearchGlobalContext(any(), any(), actionListenerCaptor.capture());
createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Unable to fetch the workflows", exceptionCaptor.getValue().getMessage());
}
public void testFailedToCreateNewWorkflow() {
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
WorkflowRequest createNewWorkflow = new WorkflowRequest(null, template, null, null);
ArgumentCaptor<ActionListener<Boolean>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
WorkflowRequest createNewWorkflow = new WorkflowRequest(null, template, null, 1);
doAnswer(invocation -> {
ActionListener<IndexResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to create global_context index"));
return null;
}).when(flowFrameworkIndicesHandler).putTemplateToGlobalContext(any(Template.class), any());
doAnswer(invocation -> {
ActionListener<SearchResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new SearchResponse(null));
return null;
}).when(mock(CreateWorkflowTransportAction.class)).onSearchGlobalContext(any(), any(), actionListenerCaptor.capture());
createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to create global_context index", exceptionCaptor.getValue().getMessage());
}
}*/

public void testFailedToUpdateWorkflow() {
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit c4b9a68

Please sign in to comment.