Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] Add examples for reuse ID policy #4237

Closed
wants to merge 7 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,179 @@ There are several different kinds of tasks that a workflow can schedule, includi

### Workflow identity

Each workflow you define has a type name, and individual executions of a workflow require a unique _instance ID_. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow's instance ID is useful for debugging and also for managing workflows using the [Workflow APIs]({{< ref workflow_api.md >}}).
Each workflow you define has a type name, and individual executions of a workflow require a unique _instance ID_. Only one workflow instance with a given ID can exist at any given time. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow's instance ID is useful for debugging and also for managing workflows using the [Workflow APIs]({{< ref workflow_api.md >}}).

Only one workflow instance with a given ID can exist at any given time. However, if a workflow instance completes or fails, its ID can be reused by a new workflow instance. Note, however, that the new workflow instance effectively replaces the old one in the configured state store.
#### Reusing workflow identities

If a workflow instance completes or fails, its ID can be reused by a new workflow instance. The new workflow instance effectively replaces the old one in the configured state store.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear here, you can set a policy called "api.REUSE_ID_ACTION_TERMINATE" that does force an existing workflow to terminate so you can start a new workflow with the same id.

Is this correct? Little unclear here in the wording. If so, then it would also be good to have a list of the reuse ID policies somewhere in the docs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's correct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaibocai - Can you then provide a list of the api.REUSE_ID_ACTION_XX values and what each one does in a table?


You can use the following policies to reuse workflow IDs:

| Policies | Description |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there are only two Workflow reuse ID policies? What is the default if not set?

Why would use REUSE_ID_ACTION_IGNORE? This just means that a new workflow can never be started if there is an existing one already?

| -------- | ----------- |
| `api.REUSE_ID_ACTION_IGNORE` | Ignores the a new workflow being created with the same ID as an existing workflow if the existing workflow is `RUNNING`, `COMPLETED`, or `PENDING`. |
| `api.REUSE_ID_ACTION_TERMINATE` | Terminates a new workflow created with the same ID as an existing workflow if the existing workflow is `RUNNING`, `COMPLETED`, or `PENDING`. |

**Example 1**

The following example demonstrates the default behavior, erroring out the workflow when reusing the workflow ID. In the example:

1. The workflow calls a single activity with orchestration ID reuse policy.
1. The reuse ID policy specifies the action `IGNORE_IF_RUNNING_OR_COMPLETED` and the target statuses of `RUNNING`, `COMPLETED`, `PENDING`.
1. The second call to create a workflow with the same instance ID throws an error when trying to reuse the workflow ID.

```go
func main() {
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code sample is for the Durable Task SDK. We should instead provide a code sample for Dapr Workflow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please rework this code to use Workflow and keep this as simple as needed. I suggest having this in at least 2 languages also so that this is open to more developers to understand (at least one of .NET or Java

var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

ctx := context.Background()
client, engine := startEngine(ctx, r)

instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED")
reuseIDPolicy := &api.OrchestrationIdReusePolicy{
Action: api.REUSE_ID_ACTION_IGNORE,
OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING},
}

// Run the orchestration.
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID))
if err != nil {
fmt.Println(err)
return
}
// Wait for orchestration to start...
client.WaitForOrchestrationStart(ctx, id)
// Schedule the workflow again using the same id. However it will error out since it already exists.
id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy))
if err != nil {
fmt.Println(err)
return
}
}
```


**Example 2**

The following example demonstrates `main` executing a workflow twice. In the example:
1. The workflow calls a single activity with orchestration ID reuse policy.
1. The reuse ID policy specifies the action `IGNORE_IF_RUNNING_OR_COMPLETED` and the target statuses of `RUNNING`, `COMPLETED`, `PENDING`.
1. The second call to create a workflow with the same instance ID is expected to be ignored if the first workflow instance is one of the target statuses.

```go
func main() {
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

ctx := context.Background()
client, engine := startEngine(ctx, r)

instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string value here is intended to be the workflow ID, which is typically some business identity or is random (in which case we don't use it at all). It looks like we're trying to specify a policy as a workflow ID, which is a bit confusing. I think it would be better to not specify an explicit workflow ID at all to avoid confusion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this code all need to change to workflow example, this can be changed at the same time.

reuseIDPolicy := &api.OrchestrationIdReusePolicy{
Action: api.REUSE_ID_ACTION_IGNORE,
OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING},
}

// Run the orchestration.
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID))
if err != nil {
fmt.Println(err)
return
}
// Wait for orchestration to start...
client.WaitForOrchestrationStart(ctx, id)
// Schedule the workflow again using the same id. However it will ignore creating the new orchestration, since the id is already in use.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "ignore". You mean nothing happens (which is strange), or it will eventually get rescheduled. A silent failure event would be really hard to determine. We need to be clearer here on what happens IMO. Can you explain this more and why I would choose this option?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaibocai - See my questions. It would help to clarify clearly when these actions are used.
Action: api.REUSE_ID_ACTION_IGNORE

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "ignore". You mean nothing happens (which is strange), or it will eventually get rescheduled. A silent failure event would be really hard to determine. We need to be clearer here on what happens IMO. Can you explain this more and why I would choose this option?

@kaibocai an explanation of this would help me as well!

Copy link
Contributor

@kaibocai kaibocai Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so when you want to start an orchestration but only when it's not in running, then you can use this option. More discussion details can be found microsoft/durabletask-go#42

Copy link
Member

@msfussell msfussell Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaibocai - Ok, but this is a discussion and there is no final conclusion here of what was actually implemented. We need a table for the reuse ID options and what each one does. Can you add this?

id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy))
if err != nil {
fmt.Println(err)
return
}
}
```

**Example 3**

In the following example:
1. The workflow calls a single activity with the orchestration ID reuse policy.
1. The reuse ID policy contains the action to `TERMINATE` and target statuses `RUNNING`, `COMPLETED`, and `PENDING`.
1. The second call to create a workflow with the same workflow instance ID is expected to terminate the first workflow instance and create a new workflow instance if in one of the target statuses.

```go
func main() {
r := task.NewTaskRegistry()
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
return output, err
})
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
var name string
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
return fmt.Sprintf("Hello, %s!", name), nil
})

ctx := context.Background()
client, engine := startEngine(ctx, r)

instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED")
reuseIDPolicy := &api.OrchestrationIdReusePolicy{
Action: api.REUSE_ID_ACTION_TERMINATE,
OperationStatus: []api.OrchestrationStatus{api.RUNTIME_STATUS_RUNNING, api.RUNTIME_STATUS_COMPLETED, api.RUNTIME_STATUS_PENDING},
}

// Run the orchestration
id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(instanceID))
if err != nil {
fmt.Println(err)
return
}
// Wait for orchestration to start
client.WaitForOrchestrationStart(ctx, id)
// Schedule again. This time, the workflow is successfully started (not ignored), since the policy terminates the existing workflow with the id and starts a new one
id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIDPolicy))
if err != nil {
fmt.Println(err)
return
}
}
```

### Workflow replay

Expand Down
Loading