Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topological Sorting and Sequenced Execution #26

Merged
merged 12 commits into from
Sep 19, 2023
Merged

Topological Sorting and Sequenced Execution #26

merged 12 commits into from
Sep 19, 2023

Conversation

dbwiddis
Copy link
Member

@dbwiddis dbwiddis commented Sep 8, 2023

Description

Uses Topological Sorting to parse a JSON graph description of nodes and edges into a sequence of processes. Uses asynchronous execution with CompletableFutures to execute the processes.

Example input:

{
    "sequence": {
        "nodes": [
            {
                "id": "fetch_model"
            },
            {
                "id": "create_ingest_pipeline"
            },
            {
                "id": "create_search_pipeline"
            },
            {
                "id": "create_neural_search_index"
            }
        ],
        "edges": [
            {
                "source": "fetch_model",
                "dest": "create_ingest_pipeline"
            },
            {
                "source": "fetch_model",
                "dest": "create_search_pipeline"
            },
            {
                "source": "create_ingest_pipeline",
                "dest": "create_neural_search_index"
            },
            {
                "source": "create_search_pipeline",
                "dest": "create_neural_search_index"
            }
        ]
    }
}

Example output showing sequencing with parallel paths:

17:22:34.597 INFO  o.o.f.demo.Demo - Parsing graph to sequence...
17:22:34.634 DEBUG o.o.f.template.TemplateParser - Start node(s): [fetch_model]
17:22:34.635 DEBUG o.o.f.template.TemplateParser - Execution sequence: [fetch_model, create_search_pipeline, create_ingest_pipeline, create_neural_search_index]
17:22:34.636 INFO  o.o.f.demo.Demo - Queueing process [fetch_model]. Can start immediately!
17:22:34.638 INFO  o.o.f.demo.Demo - Queueing process [create_search_pipeline]. Must wait for [fetch_model] to complete first.
17:22:34.638 DEBUG o.o.f.template.ProcessNode - >>> Starting fetch_model.
17:22:34.638 INFO  o.o.f.demo.Demo - Queueing process [create_ingest_pipeline]. Must wait for [fetch_model] to complete first.
17:22:34.639 INFO  o.o.f.demo.Demo - Queueing process [create_neural_search_index]. Must wait for [create_ingest_pipeline, create_search_pipeline] to complete first.
17:22:37.640 DEBUG o.o.f.template.ProcessNode - <<< Completed fetch_model
17:22:37.640 DEBUG o.o.f.template.ProcessNode - >>> Starting create_ingest_pipeline.
17:22:37.640 DEBUG o.o.f.template.ProcessNode - >>> Starting create_search_pipeline.
17:22:40.644 DEBUG o.o.f.template.ProcessNode - <<< Completed create_ingest_pipeline
17:22:42.644 DEBUG o.o.f.template.ProcessNode - <<< Completed create_search_pipeline
17:22:42.644 DEBUG o.o.f.template.ProcessNode - >>> Starting create_neural_search_index.
17:22:44.648 DEBUG o.o.f.template.ProcessNode - <<< Completed create_neural_search_index
17:22:44.648 INFO  o.o.f.demo.Demo - All done!

Example showing initialized data:

{
    "sequence": {
        "nodes": [
            {
                "id": "create_index",
                "index_name": "demo"
            },
            {
                "id": "create_another_index",
                "index_name": "second_demo"
            }
        ],
        "edges": [
            {
                "source": "create_index",
                "dest": "create_another_index"
            }
        ]
    }
}

And the processing:

10:37:55.530 INFO  o.o.f.demo.DataDemo - Parsing graph to sequence...
10:37:55.594 DEBUG o.o.f.template.TemplateParser - Start node(s): [create_index]
10:37:55.595 DEBUG o.o.f.template.TemplateParser - Execution sequence: [create_index, create_another_index]
10:37:55.595 INFO  o.o.f.demo.DataDemo - Queueing process [create_index]. Can start immediately!
10:37:55.599 INFO  o.o.f.demo.DataDemo - Queueing process [create_another_index]. Must wait for [create_index] to complete first.
10:37:55.599 DEBUG o.o.f.template.ProcessNode - >>> Starting create_index.
10:37:55.684 DEBUG o.o.f.demo.CreateIndexWorkflowStep - Initialization sent params: {index=demo}, content: {aliases=[], settings={}, mappings={}}
10:37:57.699 DEBUG o.o.f.template.ProcessNode - <<< Completed create_index
10:37:57.699 DEBUG o.o.f.template.ProcessNode - >>> Starting create_another_index.
10:37:57.700 DEBUG o.o.f.demo.CreateIndexWorkflowStep - Initialization sent params: {index=second_demo}, content: {aliases=[], settings={}, mappings={}}
10:37:57.700 DEBUG o.o.f.demo.CreateIndexWorkflowStep - Previous step sent params: {}, content: {index=demo}
10:37:59.705 DEBUG o.o.f.template.ProcessNode - <<< Completed create_another_index
10:37:59.705 INFO  o.o.f.demo.DataDemo - All done!

Issues Resolved

Fixes #17
Fixes #19
Fixes #29
Fixes #39

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass review

@dbwiddis dbwiddis mentioned this pull request Sep 11, 2023
4 tasks
@codecov
Copy link

codecov bot commented Sep 16, 2023

Codecov Report

Merging #26 (d1cbcbd) into main (0f0b65d) will increase coverage by 74.26%.
The diff coverage is 76.51%.

@@             Coverage Diff              @@
##              main      #26       +/-   ##
============================================
+ Coverage     0.00%   74.26%   +74.26%     
- Complexity       0       42       +42     
============================================
  Files            2        6        +4     
  Lines            4      136      +132     
  Branches         0       16       +16     
============================================
+ Hits             0      101      +101     
- Misses           4       25       +21     
- Partials         0       10       +10     
Files Changed Coverage Δ
...opensearch/flowframework/template/ProcessNode.java 60.71% <60.71%> (ø)
...ch/flowframework/template/ProcessSequenceEdge.java 69.23% <69.23%> (ø)
...nsearch/flowframework/template/TemplateParser.java 91.66% <91.66%> (ø)
...pensearch/flowframework/workflow/WorkflowData.java 100.00% <100.00%> (ø)

@dbwiddis
Copy link
Member Author

Merging #26 (136ceaa) into main (0f0b65d) will increase coverage by 74.26%.

I hereby claim the record for most code coverage improvement in this project ever!

Copy link
Member

@joshpalis joshpalis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial pass

Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting this together. Did a thorough review this time :)

@owaiskazi19
Copy link
Member

I see this PR fixes #19 as well but I don't see any RestHandler for the execution API. Am I missing something?

Signed-off-by: Daniel Widdis <[email protected]>
@dbwiddis
Copy link
Member Author

I see this PR fixes #19 as well but I don't see any RestHandler for the execution API. Am I missing something?

It completes all the items listed in #19's description.

API in this context is not referring to a REST API

@owaiskazi19
Copy link
Member

owaiskazi19 commented Sep 19, 2023

API in this context is not referring to a REST API

Won't we need a REST API though for frontend/users to trigger the process sequencing which is already handled in this PR? Don't want to hold this PR but we should create a REST API for the execution. How about handling it in #19 and keeping the issue the open?

@dbwiddis
Copy link
Member Author

Won't we need a REST API though for frontend to trigger the process sequencing which is already handled in this PR?

Yes, we'll need a REST call to send the template in that we can't do until we have the template design finalized. I see a few scattered REST API issues, is there a META with all of the required ones listed somewhere?

@owaiskazi19
Copy link
Member

owaiskazi19 commented Sep 19, 2023

Yes, we'll need a REST call to send the template in that we can't do until we have the template design finalized. I see a few scattered REST API issues, is there a META with all of the required ones listed somewhere?

I was under the impression #19 is for the REST execution API. Based on our decided design here we will need 2 APIs:

  1. Execute API for frontend/users to trigger the worklow and pass use case template in the payload.
  2. Orchestrate API to handle the operations to be done by the workflow.

I'll go ahead and approve the PR and we can have a new issue for the execution API.

Copy link
Member

@owaiskazi19 owaiskazi19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the feedback patiently. LGTM!

@dbwiddis
Copy link
Member Author

I was under the impression #19 is for the REST execution API. Based on our decided design here we will need 2 APIs:

  1. Execute API for frontend/users to trigger the worklow and pass use case template in the payload.
  2. Orchestrate API to handle the operations to be done by the workflow.

So exection is by orchestrate? That's what triggers the workflow using internal client stuff.

The template APIs would be CRUD.

We should definitely clarify all this in a new meta issue summarizing the various RFC comments.

@owaiskazi19
Copy link
Member

owaiskazi19 commented Sep 19, 2023

So exection is by orchestrate? That's what triggers the workflow using internal client stuff.

Execution is basically setting up the workflow which should be done by execution API. The operations to run on the workflow like Search/ingest would be done using orchestrate API.

The template APIs would be CRUD.

The execution API should be able to pass the created use case template from frontend to backend as payload.

@dbwiddis dbwiddis added the backport 2.x backport PRs to 2.x branch label Sep 19, 2023
@dbwiddis dbwiddis merged commit a574f47 into opensearch-project:main Sep 19, 2023
11 checks passed
@dbwiddis dbwiddis deleted the topo-sort branch September 19, 2023 17:51
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 19, 2023
* Topological Sorting and Sequenced Execution

Signed-off-by: Daniel Widdis <[email protected]>

* Add javadocs

Signed-off-by: Daniel Widdis <[email protected]>

* Update demo to link to Workflow interface

Signed-off-by: Daniel Widdis <[email protected]>

* Replace System.out with logging

Signed-off-by: Daniel Widdis <[email protected]>

* Update with new interface signatures

Signed-off-by: Daniel Widdis <[email protected]>

* Demo passing input data at parse-time

Signed-off-by: Daniel Widdis <[email protected]>

* Demo passing data in between steps

Signed-off-by: Daniel Widdis <[email protected]>

* Change execute arg to list and refactor demo classes to own package

Signed-off-by: Daniel Widdis <[email protected]>

* Significantly simplify input/output data passing

Signed-off-by: Daniel Widdis <[email protected]>

* Add tests

Signed-off-by: Daniel Widdis <[email protected]>

* Fix javadocs and forbidden API issues

Signed-off-by: Daniel Widdis <[email protected]>

* Address code review comments

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit a574f47)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
joshpalis pushed a commit that referenced this pull request Sep 19, 2023
Topological Sorting and Sequenced Execution (#26)

* Topological Sorting and Sequenced Execution



* Add javadocs



* Update demo to link to Workflow interface



* Replace System.out with logging



* Update with new interface signatures



* Demo passing input data at parse-time



* Demo passing data in between steps



* Change execute arg to list and refactor demo classes to own package



* Significantly simplify input/output data passing



* Add tests



* Fix javadocs and forbidden API issues



* Address code review comments



---------


(cherry picked from commit a574f47)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport 2.x backport PRs to 2.x branch
Projects
None yet
5 participants