Skip to content

Commit

Permalink
Merge branch 'main' into build-script
Browse files Browse the repository at this point in the history
  • Loading branch information
owaiskazi19 committed Nov 20, 2023
2 parents 709881b + d3ab0ef commit aa7f508
Show file tree
Hide file tree
Showing 52 changed files with 1,210 additions and 131 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# Spotless requires JDK 17+
- uses: actions/setup-java@v3
with:
java-version: 17
distribution: temurin
- name: Spotless Check
run: ./gradlew spotlessCheck
javadoc:
Expand Down Expand Up @@ -46,7 +51,7 @@ jobs:
distribution: temurin
- name: Build and Run Tests
run: |
./gradlew check -x integTest -x yamlRestTest
./gradlew check -x integTest -x yamlRestTest -x spotlessJava
- name: Upload Coverage Report
if: ${{ matrix.codecov }}
uses: codecov/codecov-action@v3
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ dependencies {
configurations.all {
resolutionStrategy {
force("com.google.guava:guava:32.1.3-jre") // CVE for 31.1
force("org.eclipse.platform:org.eclipse.core.runtime:3.29.0") // CVE for 3.26.100
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
Expand Down Expand Up @@ -92,10 +95,9 @@ public Collection<Object> createComponents(
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
}
Expand All @@ -113,7 +115,8 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

Expand All @@ -122,7 +125,8 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class)
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private CommonValue() {}
public static final String USER_FIELD = "user";

/** The transport action name prefix */
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
public static final String TRANSPORT_ACTION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
/** The base URI for this plugin's rest actions */
public static final String FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/** The URI for this plugin's workflow rest actions */
Expand Down Expand Up @@ -148,4 +148,9 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourceCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourceCreated's resource name */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.script.Script;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -311,7 +312,7 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
.resourcesCreated(Collections.emptyMap())
.resourcesCreated(Collections.emptyList())
.userOutputs(Collections.emptyMap())
.build();
initWorkflowStateIndexIfAbsent(ActionListener.wrap(indexCreated -> {
Expand Down Expand Up @@ -402,4 +403,38 @@ public void updateFlowFrameworkSystemIndexDoc(
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* @param documentId the document ID
* @param script the given script to update doc
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDocWithScript(
String indexName,
String documentId,
Script script,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
listener.onFailure(
new FlowFrameworkException("Failed to update " + indexName + "entry: " + documentId, ExceptionsHelper.status(e))
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ public enum ProvisioningProgress {
/** In Progress State */
IN_PROGRESS,
/** Done State */
DONE
DONE,
/** Failed State */
FAILED
}
123 changes: 123 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/ResourceCreated.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
* This represents an object in the WorkflowState {@link WorkflowState}.
*/
// TODO: create an enum to add the resource name itself for each step example (create_connector_step -> connector)
public class ResourceCreated implements ToXContentObject, Writeable {

private final String workflowStepName;
private final String resourceId;

/**
* Create this resources created object with given resource name and ID.
* @param workflowStepName The workflow step name associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourceCreated(String workflowStepName, String resourceId) {
this.workflowStepName = workflowStepName;
this.resourceId = resourceId;
}

/**
* Create this resources created object with an StreamInput
* @param input the input stream to read from
* @throws IOException if failed to read input stream
*/
public ResourceCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(resourceId);
}

/**
* Gets the resource id.
*
* @return the resourceId.
*/
public String resourceId() {
return resourceId;
}

/**
* Gets the workflow step name associated to the created resource
*
* @return the workflowStepName.
*/
public String workflowStepName() {
return workflowStepName;
}

/**
* Parse raw JSON content into a ResourceCreated instance.
*
* @param parser JSON based content parser
* @return the parsed ResourceCreated instance
* @throws IOException if content can't be parsed correctly
*/
public static ResourceCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
}
}
if (workflowStepName == null || resourceId == null) {
throw new IOException("A ResourceCreated object requires both a workflowStepName and resourceId.");
}
return new ResourceCreated(workflowStepName, resourceId);
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";
}

}
21 changes: 20 additions & 1 deletion src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.yaml.YamlXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -39,7 +41,7 @@
/**
* The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API.
*/
public class Template implements ToXContentObject {
public class Template implements ToXContentObject, Writeable {

private String name;
private String description;
Expand Down Expand Up @@ -240,6 +242,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

// TODO: fix writeable when implementing get workflow API
@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(name);
output.writeOptionalString(description);
output.writeString(useCase);
output.writeVersion(templateVersion);
// output.writeList((List<? extends Writeable>) compatibilityVersion);
output.writeMapWithConsistentOrder(workflows);
if (user != null) {
output.writeBoolean(true); // user exists
user.writeTo(output);
} else {
output.writeBoolean(false); // user does not exist
}
}

/**
* Parse raw json content into a Template instance.
*
Expand Down
Loading

0 comments on commit aa7f508

Please sign in to comment.