diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml new file mode 100644 index 000000000..b5f2e2d16 --- /dev/null +++ b/.github/workflows/test_bwc.yml @@ -0,0 +1,45 @@ +name: BWC +on: + push: + branches: + - "**" + pull_request: + branches: + - "**" + +jobs: + Build-ff-linux: + strategy: + matrix: + java: [11,17,21] + fail-fast: false + + name: Test Flow Framework BWC + runs-on: ubuntu-latest + + steps: + - name: Setup Java ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + + - name: Checkout Flow Framework + uses: actions/checkout@v4 + + - name: Assemble Flow Framework + run: | + plugin_version=`./gradlew properties -q | grep "opensearch_build:" | awk '{print $2}'` + echo plugin_version $plugin_version + ./gradlew assemble + echo "Creating ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + mkdir -p ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copying ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./build/distributions/ + cp ./build/distributions/*.zip ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copied ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + - name: Run Flow Framework Backwards Compatibility Tests + run: | + echo "Running backwards compatibility tests ..." + ./gradlew bwcTestSuite -Dtests.security.manager=false diff --git a/build.gradle b/build.gradle index a652cca1f..2f008a1c6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ import java.nio.file.Files import org.opensearch.gradle.testclusters.OpenSearchCluster +import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask import org.opensearch.gradle.test.RestIntegTestTask import java.util.concurrent.Callable import java.nio.file.Paths @@ -23,6 +24,16 @@ buildscript { opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","") System.setProperty('tests.security.manager', 'false') common_utils_version = System.getProperty("common_utils.version", opensearch_build) + + bwcVersionShort = "2.12.0" + bwcVersion = bwcVersionShort + ".0" + bwcOpenSearchFFDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + + 'opensearch/plugins/opensearch-flow-framework-' + bwcVersion + '.zip' + baseName = "ffBwcCluster" + bwcFilePath = "src/test/resources/org/opensearch/flowframework/bwc/" + bwcFlowFrameworkPath = bwcFilePath + "flowframework/" + + isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0] } repositories { @@ -78,6 +89,9 @@ dependencyLicenses.enabled = false // This requires an additional Jar not published as part of build-tools loggerUsageCheck.enabled = false thirdPartyAudit.enabled = false +// Allow test cases to be named Tests without having to be inherited from LuceneTestCase. +// see https://github.com/elastic/elasticsearch/blob/323f312bbc829a63056a79ebe45adced5099f6e6/buildSrc/src/main/java/org/elasticsearch/gradle/precommit/TestingConventionsTasks.java +testingConventions.enabled = false // No need to validate pom, as we do not upload to maven/sonatype validateNebulaPom.enabled = false @@ -192,6 +206,12 @@ jacocoTestReport { } tasks.named("check").configure { dependsOn(jacocoTestReport) } +tasks.named("yamlRestTest").configure { + filter { + excludeTestsMatching "org.opensearch.flowframework.rest.*IT" + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } +} // Set up integration tests task integTest(type: RestIntegTestTask) { @@ -231,6 +251,13 @@ integTest { } } + // Exclude BWC tests, run separately + if (System.getProperty("tests.rest.bwcsuite") == null) { + filter { + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + } + // Exclude integration tests that require security plugin if (System.getProperty("https") == null || System.getProperty("https") == "false") { filter { @@ -425,6 +452,166 @@ task integTestRemote(type: RestIntegTestTask) { } } +2.times {i -> + testClusters { + "${baseName}$i" { + testDistribution = "ARCHIVE" + versions = [bwcVersionShort, opensearch_version] + numberOfNodes = 3 + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion").exists()) { + project.delete(files("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion")) + } + project.mkdir bwcFlowFrameworkPath + bwcVersion + ant.get(src: bwcOpenSearchFFDownload, + dest: bwcFlowFrameworkPath + bwcVersion, + httpusecaches: false) + return fileTree(bwcFlowFrameworkPath + bwcVersion).getSingleFile() + } + } + } + })) + setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'http.content_type.required', 'true' + } + } +} + +List> plugins = [ + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + }), + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return fileTree(bwcFilePath + "flow-framework/" + project.version).getSingleFile() + } + } + } + }) + ] + +// Creates 2 test clusters with 3 nodes of the old version. +2.times {i -> + task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion || (i == 1) } + useCluster testClusters."${baseName}$i" + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'old_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'old' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}") + } +} + +// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version +// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node. +// This is also used as a one third upgraded cluster for a rolling upgrade. +task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + useCluster testClusters."${baseName}0" + dependsOn "${baseName}#oldVersionClusterTask0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'first' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded. +// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes. +// This is used for rolling upgrade. +task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#mixedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'second' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded. +// This results in a fully upgraded cluster. +// This is used for rolling upgrade. +task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#twoThirdsUpgradedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + mustRunAfter "${baseName}#mixedClusterTask" + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'third' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version +// at the same time resulting in a fully upgraded cluster. +task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { + dependsOn "${baseName}#oldVersionClusterTask1" + useCluster testClusters."${baseName}1" + doFirst { + testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'upgraded_cluster' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}") +} + +// A bwc test suite which runs all the bwc tasks combined. +task bwcTestSuite(type: StandaloneRestIntegTestTask) { + filter { + excludeTestsMatching '**.*Test*' + excludeTestsMatching '**.*IT*' + setFailOnNoMatchingTests(false) + } + dependsOn tasks.named("${baseName}#mixedClusterTask") + dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask") + dependsOn tasks.named("${baseName}#fullRestartClusterTask") +} // test retry configuration allprojects { @@ -438,6 +625,11 @@ allprojects { } } } + // Needed for Gradle 9.0 + tasks.withType(StandaloneRestIntegTestTask).configureEach { + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + } } // Automatically sets up the integration test cluster locally diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 8c1732a12..54a1eb65d 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -101,8 +101,8 @@ public Template( this.workflows = Map.copyOf(workflows); this.uiMetadata = uiMetadata; this.user = user; - this.createdTime = createdTime == null ? Instant.now() : createdTime; - this.lastUpdatedTime = (lastUpdatedTime == null || lastUpdatedTime.isBefore(this.createdTime)) ? this.createdTime : lastUpdatedTime; + this.createdTime = createdTime; + this.lastUpdatedTime = lastUpdatedTime; this.lastProvisionedTime = lastProvisionedTime; } diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index f987d7948..cd4b06422 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -96,6 +96,7 @@ public CreateWorkflowTransportAction( protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { User user = getUserContext(client); + Instant creationTime = Instant.now(); Template templateWithUser = new Template( request.getTemplate().name(), request.getTemplate().description(), @@ -105,9 +106,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener> responseMap = (Map>) getAsMap(uri).get("nodes"); + for (Map response : responseMap.values()) { + List> plugins = (List>) response.get("plugins"); + Set pluginNames = plugins.stream().map(map -> map.get("name")).collect(Collectors.toSet()); + String workflowId = createNoopTemplate(); + Template t = getTemplate(workflowId); + switch (CLUSTER_TYPE) { + case OLD: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // mapping for 2.12 does not include time stamps + assertNull(t.createdTime()); + assertNull(t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + case MIXED: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // Time stamps may or may not be null depending on whether index has been accessed by new version node + // So just test that the template parses + assertNull(t.lastProvisionedTime()); + break; + case UPGRADED: + assertTrue(pluginNames.contains("opensearch-flow-framework")); + // mapping for 2.13+ includes time stamps + assertNotNull(t.createdTime()); + assertEquals(t.createdTime(), t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + } + break; + } + } + + private String getUri() { + switch (CLUSTER_TYPE) { + case OLD: + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + case MIXED: + String round = System.getProperty("tests.rest.bwcsuite_round"); + if (round.equals("second")) { + return "_nodes/" + CLUSTER_NAME + "-1/plugins"; + } else if (round.equals("third")) { + return "_nodes/" + CLUSTER_NAME + "-2/plugins"; + } else { + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + } + case UPGRADED: + return "_nodes/plugins"; + default: + throw new AssertionError("unknown cluster type: " + CLUSTER_TYPE); + } + } + + private String createNoopTemplate() throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "POST", + "_plugins/_flow_framework/workflow", + null, + "{\"name\":\"test\", \"workflows\":{\"provision\": {\"nodes\": [{\"id\":\"test-step\", \"type\":\"noop\"}]}}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + return workflowId; + } + + private Template getTemplate(String workflowId) throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "GET", + "_plugins/_flow_framework/workflow/" + workflowId, + null, + "", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + return Template.parse(body); + } +} diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index d39455fd8..63dbf31f6 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -41,6 +41,7 @@ public void testTemplate() throws IOException { Workflow workflow = new Workflow(Map.of("key", "value"), nodes, edges); Map uiMetadata = null; + Instant now = Instant.now(); Template template = new Template( "test", "a test template", @@ -50,8 +51,8 @@ public void testTemplate() throws IOException { Map.of("workflow", workflow), uiMetadata, null, - null, - null, + now, + now, null ); @@ -63,9 +64,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, template.getUiMetadata()); Workflow wf = template.workflows().get("workflow"); assertNotNull(wf); - assertTrue(template.createdTime().isAfter(Instant.now().minusSeconds(10))); - assertFalse(template.createdTime().isAfter(Instant.now())); - assertEquals(template.createdTime(), template.lastUpdatedTime()); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wf.toString()); @@ -80,9 +80,8 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, templateX.getUiMetadata()); Workflow wfX = templateX.workflows().get("workflow"); assertNotNull(wfX); - assertTrue(template.createdTime().isAfter(Instant.now().minusSeconds(10))); - assertFalse(template.createdTime().isAfter(Instant.now())); - assertEquals(template.createdTime(), template.lastUpdatedTime()); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index d6c3ffa65..67e2041d5 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.rest; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchResponse; @@ -27,6 +28,8 @@ import org.junit.Before; import org.junit.ComparisonFailure; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -313,4 +316,48 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } + public void testTimestamps() throws Exception { + Template noopTemplate = TestHelpers.createTemplateFromFile("noop.json"); + // Create the template, should have created and updated matching + Response response = createWorkflow(client(), noopTemplate); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + Template t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + Instant createdTime = t.createdTime(); + Instant lastUpdatedTime = t.lastUpdatedTime(); + assertNotNull(createdTime); + assertEquals(createdTime, lastUpdatedTime); + assertNull(t.lastProvisionedTime()); + + // Update the template, should have created same as before and updated newer + response = updateWorkflow(client(), workflowId, noopTemplate); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertTrue(t.lastUpdatedTime().isAfter(lastUpdatedTime)); + lastUpdatedTime = t.lastUpdatedTime(); + assertNull(t.lastProvisionedTime()); + + // Provision the template, should have created and updated same as before and provisioned newer + response = provisionWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertEquals(lastUpdatedTime, t.lastUpdatedTime()); + assertTrue(t.lastProvisionedTime().isAfter(lastUpdatedTime)); + + // Clean up + response = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + } } diff --git a/src/test/resources/template/noop.json b/src/test/resources/template/noop.json new file mode 100644 index 000000000..c0675151c --- /dev/null +++ b/src/test/resources/template/noop.json @@ -0,0 +1,13 @@ +{ + "name": "noop", + "workflows": { + "provision": { + "nodes": [ + { + "id": "no-op", + "type": "noop" + } + ] + } + } +}