Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go-sdk workflow quickstart #973

Merged
merged 8 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions workflows/go/sdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Dapr workflows

In this quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and the workflow authoring client. The console app
starts and manages an order processing workflow.
The workflow management API provided by the Dapr client can also be used interchangeably with minor adjustments

This quickstart includes one project:

- Go app `order-processor`

The quickstart contains 1 workflow (OrderProcessingWorkflow) which simulates purchasing items from a store, and 5 unique activities within the workflow. These 5
activities are as follows:

- NotifyActivity: This activity utilizes a logger to print out messages throughout the workflow. These messages notify the user when there is insufficient
§inventory, their payment couldn't be processed, and more.
- ProcessPaymentActivity: This activity is responsible for processing and authorizing the payment.
- VerifyInventoryActivity: This activity checks the state store to ensure that there is enough inventory present for purchase.
- UpdateInventoryActivity: This activity removes the requested items from the state store and updates the store with the new remaining inventory value.
- RequestApprovalActivity: This activity seeks approval from Manager, if payment is greater than 50000 USD.

### Run the order processor workflow

1. Open a new terminal window and navigate to `order-processor` directory.
2. Run the console app with Dapr:
<!-- STEP
name: Running this example
expected_stdout_lines:
- "for 10 cars - $150000"
- "There are 100 cars available for purchase"
- "Requesting approval for payment of 150000USD for 10 cars"
- "has been approved!"
- "There are now 90 cars left in stock"
- "Workflow completed - result: COMPLETED"
output_match_mode: substring
background: true
timeout_seconds: 120
sleep: 30
-->

```sh

dapr run -f .
```

3. Expected output

```
== APP - order-processor == *** Welcome to the Dapr Workflow console app sample!
== APP - order-processor == *** Using this app, you can place orders that start workflows.
== APP - order-processor == dapr client initializing for: 127.0.0.1:50056
== APP - order-processor == adding base stock item: paperclip
== APP - order-processor == 2024/02/01 12:59:52 work item listener started
== APP - order-processor == INFO: 2024/02/01 12:59:52 starting background processor
== APP - order-processor == adding base stock item: cars
== APP - order-processor == adding base stock item: computers
== APP - order-processor == ==========Begin the purchase of item:==========
== APP - order-processor == NotifyActivity: Received order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 cars - $150000
== APP - order-processor == VerifyInventoryActivity: Verifying inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 of 10 cars
== APP - order-processor == VerifyInventoryActivity: There are 100 cars available for purchase
== APP - order-processor == RequestApprovalActivity: Requesting approval for payment of 150000USD for 10 cars
== APP - order-processor == NotifyActivity: Payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has been approved!
== APP - order-processor == ProcessPaymentActivity: 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 - cars (150000USD)
== APP - order-processor == UpdateInventoryActivity: Checking Inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 * cars
== APP - order-processor == UpdateInventoryActivity: There are now 90 cars left in stock
== APP - order-processor == NotifyActivity: Order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed!
== APP - order-processor == Workflow completed - result: COMPLETED
== APP - order-processor == Purchase of item is complete
```

4. Stop Dapr workflow with CTRL-C or:
<!-- END_STEP -->

```sh
dapr stop -f .
```



### View workflow output with Zipkin

For a more detailed view of the workflow activities (duration, progress etc.), try using Zipkin.

1. View Traces in Zipkin UI - In your browser go to http://localhost:9411 to view the workflow trace spans in the Zipkin web UI. The order-processor workflow
should be viewable with the following output in the Zipkin web UI. Note: the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) docker container is
launched on running `dapr init`.

<img src="img/workflow-trace-spans-zipkin.png">

### What happened?

When you ran the above comands:

1. First the "user" inputs an order for 10 cars into the concole app.
2. A unique order ID for the workflow is generated (in the above example, `b903d749cd814e099f06ebf4a56a2f90`) and the workflow is scheduled.
3. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received.
4. The `VerifyInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars
in stock.
5. The `RequestApprovalActivity` workflow activity is triggered due to buisness logic for orders exceeding $50k and user is prompted to manually approve the
purchase before continuing the order.
6. The workflow starts and notifies you of its status.
7. The `ProcessPaymentActivity` workflow activity begins processing payment for order `b903d749cd814e099f06ebf4a56a2f90` and confirms if successful.
8. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed.
9. The `NotifyActivity` workflow activity sends a notification saying that order `b903d749cd814e099f06ebf4a56a2f90` has completed.
10. The workflow terminates as completed.






8 changes: 8 additions & 0 deletions workflows/go/sdk/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 1
common:
resourcesPath: ../../components
apps:
- appDirPath: ./order-processor/
appID: order-processor
command: ["go", "run", "."]

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions workflows/go/sdk/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include ../../../docker.mk
include ../../../validate.mk
142 changes: 142 additions & 0 deletions workflows/go/sdk/order-processor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)

var (
stateStoreName = "statestore"
workflowComponent = "dapr"
workflowName = "OrderProcessingWorkflow"
defaultItemName = "cars"
)

func main() {
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
fmt.Println("*** Using this app, you can place orders that start workflows.")

w, err := workflow.NewWorker()
if err != nil {
log.Fatalf("failed to start worker: %v", err)
}

if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(NotifyActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(RequestApprovalActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(VerifyInventoryActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(ProcessPaymentActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(UpdateInventoryActivity); err != nil {
log.Fatal(err)
}

if err := w.Start(); err != nil {
log.Fatal(err)
}

daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to initialise dapr client: %v", err)
}
wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient))
if err != nil {
log.Fatalf("failed to initialise workflow client: %v", err)
}

inventory := []InventoryItem{
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
{ItemName: "cars", PerItemCost: 15000, Quantity: 100},
{ItemName: "computers", PerItemCost: 500, Quantity: 100},
}
if err := restockInventory(daprClient, inventory); err != nil {
log.Fatalf("failed to restock: %v", err)
}

fmt.Println("==========Begin the purchase of item:==========")

itemName := defaultItemName
orderQuantity := 10

totalCost := inventory[1].PerItemCost * orderQuantity

orderPayload := OrderPayload{
ItemName: itemName,
Quantity: orderQuantity,
TotalCost: totalCost,
}

id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}

approvalSought := false

startTime := time.Now()

for {
timeDelta := time.Since(startTime)
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
if err != nil {
log.Fatalf("failed to fetch workflow: %v", err)
}
if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) {
fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String())
break
}
if timeDelta.Seconds() >= 10 {
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
if err != nil {
log.Fatalf("failed to fetch workflow: %v", err)
}
if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) {
approvalSought = true
promptForApproval(id)
}
}
// Sleep to not DoS the dapr dev instance
mikeee marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Second)
}

fmt.Println("Purchase of item is complete")
}

func promptForApproval(id string) {
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("failed to initialise wfClient: %v", err)
}
if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil {
mikeee marked this conversation as resolved.
Show resolved Hide resolved
log.Fatal(err)
}
}

func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
for _, item := range inventory {
itemSerialized, err := json.Marshal(item)
if err != nil {
return err
}
fmt.Printf("adding base stock item: %s\n", item.ItemName)
if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil {
return err
}
}
return nil
}
43 changes: 43 additions & 0 deletions workflows/go/sdk/order-processor/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

type OrderPayload struct {
ItemName string `json:"item_name"`
TotalCost int `json:"total_cost"`
Quantity int `json:"quanity"`
}

type OrderResult struct {
Processed bool `json:"processed"`
}

type InventoryItem struct {
ItemName string `json:"item_name"`
PerItemCost int `json:"per_item_cost"`
Quantity int `json:"quanity"`
}

type InventoryRequest struct {
RequestID string `json:"request_id"`
ItemName string `json:"item_name"`
Quantity int `json:"quanity"`
}

type InventoryResult struct {
Success bool `json:"success"`
InventoryItem InventoryItem `json:"inventory_item"`
}

type PaymentRequest struct {
RequestID string `json:"request_id"`
ItemBeingPurchased string `json:"item_being_purchased"`
Amount int `json:"amount"`
Quantity int `json:"quantity"`
}

type ApprovalRequired struct {
Approval bool `json:"approval"`
}

type Notification struct {
Message string `json:"message"`
}
Loading
Loading