Skip to content

Commit

Permalink
Use case pages (#390)
Browse files Browse the repository at this point in the history
* Add first version of use case pages - workflows

* Update links and icons

* Fix title

* Workflows as use case page

* Add event processing use case page

* Change spotlight to scrollycoding for event processing

* Update icons event processing

* First version microservices use case page

* Update microservice orchestration page

* Add idempotency token to microservices orchestration page

* Async tasks use case page

* Async task use case page

* Fix code animation async tasks

* Finish async tasks use case page

* Event processing page dynamic code snippets

* Set width of microservices orchestration image

* Microservice orchestration code scrolling

* Workflow page finetuning

* Fix links and bug in SDK workflow page

* Finetune some descriptions
  • Loading branch information
gvdongen authored Jun 5, 2024
1 parent 9a23986 commit bc318f3
Show file tree
Hide file tree
Showing 51 changed files with 3,271 additions and 253 deletions.
365 changes: 181 additions & 184 deletions code_snippets/ts/package-lock.json

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions code_snippets/ts/src/use_cases/async_tasks/fan_out_fan_in.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import * as restate from "@restatedev/restate-sdk";
import { Context, CombineablePromise } from "@restatedev/restate-sdk";

// <start_here>
const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task",
() => split(task));

const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}

const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},

runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
}
}
});

export const handler = restate.endpoint().bind(workerService).lambdaHandler();

// <end_here>

// ----------------------- Stubs to please the compiler -----------------------

type Task = {}
type Result = {}

type SubTask = { }
type SubTaskResult = void

async function split(task: Task): Promise<SubTask[]> {
return [];
}

async function aggregate(packages: SubTaskResult[]): Promise<Result> {
return {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// --------------- define async task logic as a service handler ---------------

import * as restate from "@restatedev/restate-sdk";
import {Context} from "@restatedev/restate-sdk";

// <start_here>
const asyncTaskService = restate.service({
name: "taskWorker",
handlers: {
runTask: async (ctx: Context, params: TaskOpts) => {
return someHeavyWork(params);
}
}
});

export type AsyncTaskService = typeof asyncTaskService;

const endpoint = restate.endpoint().bind(asyncTaskService).listen(9080);
// <end_here>

// ----------------------- Stubs to please the compiler -----------------------

export type TaskOpts = {}

function someHeavyWork(work: TaskOpts) { return "Work!"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import * as restate from "@restatedev/restate-sdk-clients";
import { SendOpts } from "@restatedev/restate-sdk-clients";
import {AsyncTaskService, TaskOpts} from "./async_task_service";

/*
* Restate is as a sophisticated task queue, with extra features like
* stateful tasks, queues per key, or workflows-as-code within tasks,
* and reliable timers.
*
* Every handler in Restate is executed asynchronously and can be treated
* as a reliable asynchronous task. Restate persists invocations, restarts
* upon failures, reliably queues them under backpressure.
*/

// ------------------- submit and await tasks like RPC calls ------------------

const RESTATE_URL = process.env.RESTATE_URL ?? "http://localhost:8080";

// <start_here>
async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });

const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);

// await the handler's result
const result = await rs.result(taskHandle);
}

async function attachToTask(taskHandle: string) {
const rs = restate.connect({ url: RESTATE_URL });
const result2 = await rs.result<string>(JSON.parse(taskHandle));
}

// <end_here>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as restate from "@restatedev/restate-sdk-clients";
import { SendOpts } from "@restatedev/restate-sdk-clients";
import {AsyncTaskService, TaskOpts} from "./async_task_service";

/*
* Restate is as a sophisticated task queue, with extra features like
* stateful tasks, queues per key, or workflows-as-code within tasks,
* and reliable timers.
*
* Every handler in Restate is executed asynchronously and can be treated
* as a reliable asynchronous task. Restate persists invocations, restarts
* upon failures, reliably queues them under backpressure.
*/

// ------------------- submit and await tasks like RPC calls ------------------

const RESTATE_URL = process.env.RESTATE_URL ?? "http://localhost:8080";

// <start_here>
async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });

const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);

// await the handler's result
const result = await rs.result(taskHandle);
}

// <end_here>
50 changes: 50 additions & 0 deletions code_snippets/ts/src/use_cases/async_tasks/sync_to_async/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import * as restate from "@restatedev/restate-sdk-clients";
import * as readline from "readline";
import {DataPrepService} from "./data_preparation_service";

const RESTATE_URL = process.env.RESTATE_URL ?? "http://localhost:8080";
// Client:
//
// The client calls the data preparation workflow and awaits the result for
// a while. If the workflow doesn't complete within that time, it signals the
// workflow to send an email instead.

// <start_here>
const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };

async function downloadData(userId: string) {
const workflowId = userId;

const dataPrep = rs.workflowClient(dataPrepService, workflowId);

await dataPrep.workflowSubmit({ userId });

const result = await withTimeout(dataPrep.workflowAttach(), 30_000);

if (result === Timeout) {
const email = await readLine("This takes long... Just mail us the link later");
await dataPrep.resultAsEmail({ email });
return;
}

// ... process directly ...
}
// <end_here>

const Timeout = Symbol("Timeout")

function withTimeout<T>(promise: Promise<T>, millis: number): Promise<T | typeof Timeout> {
const timeoutPromise = new Promise<typeof Timeout>((resolve) => setTimeout(resolve, millis, Timeout));
return Promise.race([promise, timeoutPromise]);
}

async function readLine(prompt: string): Promise<string> {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});

return new Promise<string>(resolve => rl.question(prompt, resolve)).finally(() => rl.close());
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import * as restate from "@restatedev/restate-sdk";
import {WorkflowContext, WorkflowSharedContext} from "@restatedev/restate-sdk";

// <start_here>
const dataPreparationService = restate.workflow({
name: "dataPrep",
handlers: {
run: async (ctx: WorkflowContext, args: { userId: string }) => {
const url = await ctx.run("create S3 bucket", () => createS3Bucket());
await ctx.run("upload data", () => uploadData(url));

ctx.promise<URL>("url").resolve(url);

return url;
},

resultAsEmail: async (ctx: WorkflowSharedContext, req: { email: string }) => {
const url = await ctx.promise<URL>("url");
await ctx.run("send email", () => sendEmail(url, req.email ));
}
}
});

export type DataPrepService = typeof dataPreparationService;
// <end_here>

async function createS3Bucket(): Promise<URL> {
const bucket = Number(Math.random() * 1_000_000_000).toString(16);
return new URL(`https://s3-eu-central-1.amazonaws.com/${bucket}/`);
}

async function uploadData(target: URL) {
// simulate some work by delaying for a while. sometimes takes really long.
return new Promise((resolve) => setTimeout(resolve, Math.random() < 0.0 ? 1_500 : 10_000));
}

async function sendEmail(url: URL, email: string) {
// send email
console.log(` >>> Sending email to '${email}' with URL ${url}`);
}
52 changes: 52 additions & 0 deletions code_snippets/ts/src/use_cases/event_processing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import * as restate from "@restatedev/restate-sdk";

// <start_here>
const userUpdates = restate.object({
name: "userUpdates",
handlers: {
updateUserEvent: async (ctx: restate.ObjectContext, event: UserUpdate) => {
const { profile, permissions, resources } = verifyEvent(event);

let userId = await ctx.run(() => updateProfile(profile));
while (userId === NOT_READY) {
await ctx.sleep(5_000);
userId = await ctx.run(() => updateProfile(profile));
}

const roleId = await ctx.run(() => setPermissions(userId, permissions));
await ctx.run(() => provisionResources(userId, roleId, resources));
},
},
});
// <end_here>


import { TerminalError } from "@restatedev/restate-sdk";

export type UserUpdate = {
profile: string;
permissions: string;
resources: string;
};

export const NOT_READY = "NOT_READY";

export async function updateProfile(profile: string, token?: string): Promise<string> {
return Math.random() < 0.8 ? NOT_READY : profile + "-id";
}
export async function setPermissions(
id: string,
permissions: string,
token?: string
): Promise<string> {
return permissions;
}
export async function provisionResources(user: string, role: string, resources: string) {}

export function verifyEvent(request: UserUpdate): UserUpdate {
if (request?.profile && request?.permissions && request?.resources) {
return request;
} else {
throw new TerminalError("Incomplete event");
}
}
49 changes: 49 additions & 0 deletions code_snippets/ts/src/use_cases/events_state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import * as restate from "@restatedev/restate-sdk";
import {ObjectContext} from "@restatedev/restate-sdk";

// <start_here>
const eventEnricher = restate.object({
name: "profile",
handlers: {
userEvent: async (ctx: ObjectContext, event: UserProfile) => {
ctx.set("user", event);
ctx.objectSendClient(EventEnricher, ctx.key,{ delay: 1000 }).emit();
},

featureEvent: async (ctx: ObjectContext, featureEvent: string) => {
const userEvent = await ctx.get<UserProfile>("user");
(userEvent!.features ??= []).push(featureEvent);
ctx.set("user", userEvent)
},

emit: async (ctx: ObjectContext) => {
send(ctx.key, await ctx.get("user"));
ctx.clearAll();
}
}
})

type EventEnricherType = typeof eventEnricher;
const EventEnricher: EventEnricherType = {name:"profile"}
// <end_here>

type UserProfile = {
id: string;
name: string;
email: string;
features: string[];
};

function send(key: string, user: UserProfile | null) {
}
27 changes: 27 additions & 0 deletions code_snippets/ts/src/use_cases/idempotency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import * as restate from "@restatedev/restate-sdk-clients";
import { Opts } from "@restatedev/restate-sdk-clients";
import express from 'express';
import {ProductService} from "./idempotency_utils";

const app = express()

process.env.RESTATE_URL = "localhost:8080";

// <start_here>
const rs = restate.connect({ url: process.env.RESTATE_URL });
const productService: ProductService = {name: "product"};

app.get('/reserve/:product/:reservationId', async (req, res) => {
const { product, reservationId } = req.params;

// withClass(1:5) highlight-line
const products = rs.serviceClient(productService);
const reservation = await products.reserve(
product,
Opts.from({ idempotencyKey : reservationId })
);

res.json(reservation);
})
// <end_here>

11 changes: 11 additions & 0 deletions code_snippets/ts/src/use_cases/idempotency_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import * as restate from "@restatedev/restate-sdk";
import {Context} from "@restatedev/restate-sdk";

const productService = restate.service({
name: "product",
handlers: {
reserve: async (ctx: Context, product: string) => {}
}
});

export type ProductService = typeof productService;
Loading

0 comments on commit bc318f3

Please sign in to comment.