Skip to content

Commit

Permalink
Remove Kafka between food ordering UI and order workflow (#98)
Browse files Browse the repository at this point in the history
* Remove Kafka event handler as source of order workflow and clean up for conference presentation

* Fix readme
  • Loading branch information
gvdongen authored Feb 7, 2024
1 parent 6fedf9d commit 48d3139
Show file tree
Hide file tree
Showing 29 changed files with 612 additions and 669 deletions.
6 changes: 3 additions & 3 deletions typescript/food-ordering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ watch -n 1 'psql -h localhost -p 9071 -c "select service, method, service_key_ut
## Exploring the demo

### The order workflow
You can find the implementation of each of the services under `app/src/restate-app/`.
You can find the implementation of each of the services under `app/src/order-app/`.
The flow of an incoming order is as follows:
1. When the customer places an order via the web UI (localhost:3000), an order event is published to Kafka.
2. Restate subscribes to the order topic and triggers the order workflow for each incoming event. This subscription is set up by executing two curl commands, as done in the Docker compose file (`docker-compose.yaml`) by the `runtimesetup` container.
Expand All @@ -86,10 +86,10 @@ The flow of an incoming order is as follows:
3. The order workflow then triggers the payment by calling a third-party payment provider (implemented as a stub in this example). To do this, the order workflow first generates an idempotency token via a side effect, and then uses this to call the payment provider. The payment provider can deduplicate retries via the idempotency key.
4. The workflow then sets the order status to `SCHEDULED` and sets a timer to continue processing after the delivery delay has passed. For example, if a customer ordered food for later in the day, the order will be scheduled for preparation at the requested time. If any failures occur during the sleep, Restate makes sure that the workflow will still wake up on time.
5. Once the timer fires, the order workflow creates an awakeable and sends a request to the restaurant point-of-sales system to start the preparation. This is done via an HTTP request from within a side effect. The status of the order is set to `IN_PREPARATION`. The restaurant will use the awakeable callback to signal when the prepration is done. Once this happens, the order workflow will continue and set the order status to `SCHEDULING_DELIVERY`.
6. Finally, the order workflow calls the delivery manager (`delivery_manager.ts`) to schedule the delivery of the order (see description below). It does this by using an awakeable, that the delivery manager will use to signal when the delivery is done. Once the delivery is done, the order workflow will set the order status to `DELIVERED`.
6. Finally, the order workflow calls the delivery manager of the delivery app (`app/src/delivery-app/delivery_manager.ts`) to schedule the delivery of the order (see description below). It does this by using an awakeable, that the delivery manager will use to signal when the delivery is done. Once the delivery is done, the order workflow will set the order status to `DELIVERED`.
### The delivery workflow
To get the order delivered a set of services work together. The delivery manager (`start` method in `delivery_manager.ts`) implements the delivery workflow. It tracks the delivery status, by storing it in Restate's state store, and then requests a driver to do the delivery. To do that, it requests a driver from the driver-delivery matcher. The driver-delivery matcher tracks available drivers and pending deliveries for each region, and matches drivers to deliveries.
To get the order delivered a set of services work together. Have a look at the code of the delivery app under `src/app/delivery-app`. The delivery manager (`start` method in `delivery_manager.ts`) implements the delivery workflow. It tracks the delivery status, by storing it in Restate's state store, and then requests a driver to do the delivery. To do that, it requests a driver from the driver-delivery matcher. The driver-delivery matcher tracks available drivers and pending deliveries for each region, and matches drivers to deliveries.
Once a driver has been found, the delivery manager assigns the delivery to the driver and sets the order status to `WAITING_FOR_DRIVER`. The delivery has started now. The delivery manager relies for the rest of the delivery updates on the driver digital twin.

The driver's digital twin (`driver_digital_twin.ts`) is the digital representation of a driver in the field. Each driver has a mobile app on his phone (here simulated by `external/driver_mobile_app_sim.ts`) which continuously sends updates to the digital twin of the driver:
Expand Down
21 changes: 21 additions & 0 deletions typescript/food-ordering/app/Dockerfile-delivery
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM node:18-alpine

# dumb-init helps handling SIGTERM and SIGINT correctly
RUN apk add dumb-init

WORKDIR /usr/src/app

# copy package.json and package-lock.json separately to cache dependencies
COPY package*.json .
RUN npm install

COPY --chown=node:node .. .

RUN npm run build

RUN npm prune --production
ENV NODE_ENV production

EXPOSE 9081
USER node
ENTRYPOINT ["dumb-init", "node", "./dist/delivery-app/app.js"]
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ ENV NODE_ENV production

EXPOSE 9080
USER node
ENTRYPOINT ["dumb-init", "node", "./dist/restate-app/app.js"]
ENTRYPOINT ["dumb-init", "node", "./dist/order-app/app.js"]
7 changes: 4 additions & 3 deletions typescript/food-ordering/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"scripts": {
"build": "tsc --noEmitOnError",
"prebundle": "rm -rf dist",
"bundle": "esbuild src/restate-app/app.ts --bundle --minify --sourcemap --platform=node --target=es2020 --outfile=dist/index.js",
"bundle": "esbuild src/delivery-app/app.ts --bundle --minify --sourcemap --platform=node --target=es2020 --outfile=dist/index.js",
"postbundle": "cd dist && zip -r index.zip index.js*",
"app": "node ./dist/app/app.js",
"order-app": "node ./dist/app/order-app.js",
"delivery-app": "node ./dist/app/delivery-app.js",
"restaurant-app": "node ./dist/restaurant/server.js",
"app-dev": "ts-node-dev --watch src --respawn --transpile-only src/restate-app/app.ts"
"app-dev": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --watch src --respawn --transpile-only src/order-app/app.ts"
},
"dependencies": {
"@types/node": "^20.6.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,14 @@ import * as restate from "@restatedev/restate-sdk";
import * as driverDigitalTwin from "./driver_digital_twin";
import * as driverDeliveryMatcher from "./driver_delivery_matcher";
import * as deliveryManager from "./delivery_manager";
import * as orderWorkflow from "./order_workflow";
import * as orderstatus from "./order_status_service";
import * as driverMobileAppSimulator from "./external/driver_mobile_app_sim";

if (require.main === module) {
restate
.createServer()
.bindKeyedRouter(orderWorkflow.service.path, orderWorkflow.router)
.bindKeyedRouter(orderstatus.service.path, orderstatus.router)
.bindKeyedRouter(driverDigitalTwin.service.path, driverDigitalTwin.router)
.bindKeyedRouter(driverDeliveryMatcher.service.path, driverDeliveryMatcher.router)
.bindKeyedRouter(deliveryManager.service.path, deliveryManager.router)
.bindKeyedRouter(driverMobileAppSimulator.service.path, driverMobileAppSimulator.router)
.listen(9080);
.listen(9081);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const KAFKA_CONFIG: KafkaConfig = { brokers: [KAFKA_BOOTSTRAP_SERVERS] };
const KAFKA_TOPIC = "driver-updates";

export class Kafka_publisher {

private readonly kafka = new Kafka(KAFKA_CONFIG);
private readonly producer = this.kafka.producer();
private connected = false;
Expand Down
91 changes: 91 additions & 0 deletions typescript/food-ordering/app/src/delivery-app/delivery_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate examples,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/
*/

import * as restate from "@restatedev/restate-sdk";
import * as driverDigitalTwin from "./driver_digital_twin";
import * as driverDeliveryMatcher from "./driver_delivery_matcher";
import * as geo from "./utils/geo";
import {DEMO_REGION, Location, DeliveryInformation, OrderAndPromise, Status} from "./types/types";
import * as orderstatus from "../order-app/order_status_service";

/**
* Manages the delivery of the order to the customer. Keyed by the order ID (similar to the
* OrderService and OrderStatusService).
*/
export const service: restate.ServiceApi<typeof router> = { path: "delivery-manager" };

const DELIVERY_INFO = "delivery-info";

export const router = restate.keyedRouter({
// Called by the OrderService when a new order has been prepared and needs to be delivered.
start: async (ctx: restate.RpcContext, deliveryId: string, { order, promise }: OrderAndPromise) => {
const [restaurantLocation, customerLocation] = await ctx.sideEffect(async () => [geo.randomLocation(), geo.randomLocation()]);

// Store the delivery information in Restate's state store
const deliveryInfo: DeliveryInformation = {
orderId: order.id,
orderPromise: promise,
restaurantId: order.restaurantId,
restaurantLocation,
customerLocation,
orderPickedUp: false
}
ctx.set(DELIVERY_INFO, deliveryInfo);

// Acquire a driver
const driverPromise = ctx.awakeable<string>();
ctx.send(driverDeliveryMatcher.service).requestDriverForDelivery(DEMO_REGION, { promiseId: driverPromise.id });
// Wait until the driver pool service has located a driver
// This awakeable gets resolved either immediately when there is a pending delivery
// or later, when a new delivery comes in.
const driverId = await driverPromise.promise;

// Assign the driver to the job
await ctx.rpc(driverDigitalTwin.service).assignDeliveryJob(driverId, {
deliveryId,
restaurantId: order.restaurantId,
restaurantLocation: deliveryInfo.restaurantLocation,
customerLocation: deliveryInfo.customerLocation
});

ctx.send(orderstatus.service).setStatus(order.id, Status.WAITING_FOR_DRIVER);
},

// called by the DriverService.NotifyDeliveryPickup when the driver has arrived at the restaurant.
notifyDeliveryPickup: async (ctx: restate.RpcContext, _deliveryId: string) => {
const delivery = (await ctx.get<DeliveryInformation>(DELIVERY_INFO))!;
delivery.orderPickedUp = true;
ctx.set(DELIVERY_INFO, delivery);

ctx.send(orderstatus.service).setStatus(delivery.orderId, Status.IN_DELIVERY);
},

// Called by the DriverService.NotifyDeliveryDelivered when the driver has delivered the order to the customer.
notifyDeliveryDelivered: async (ctx: restate.RpcContext, _deliveryId: string) => {
const delivery = (await ctx.get<DeliveryInformation>(DELIVERY_INFO))!;
ctx.clear(DELIVERY_INFO);

// Notify the OrderService that the delivery has been completed
ctx.resolveAwakeable(delivery.orderPromise, null);
},

// Called by DriverDigitalTwin.HandleDriverLocationUpdateEvent() when the driver moved to new location.
handleDriverLocationUpdate: async (ctx: restate.RpcContext, _deliveryId: string, location: Location) => {
const delivery = (await ctx.get<DeliveryInformation>(DELIVERY_INFO))!;

// Parse the new location, and calculate the ETA of the delivery to the customer
const eta = delivery.orderPickedUp
? geo.calculateEtaMillis(location, delivery.customerLocation)
: geo.calculateEtaMillis(location, delivery.restaurantLocation)
+ geo.calculateEtaMillis(delivery.restaurantLocation, delivery.customerLocation);
ctx.send(orderstatus.service).setETA(delivery.orderId, eta);
},
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate examples,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/
*/

import * as restate from "@restatedev/restate-sdk";
import { PendingDelivery } from "./types/types";

/**
* Links available drivers to delivery requests Keyed by the region. Each region has a pool of
* available drivers and orders waiting for a driver. This service is responsible for tracking and
* matching the two.
*/
export const service : restate.ServiceApi<typeof router> = { path: "driver-delivery-matcher" };

const PENDING_DELIVERIES = "pending-deliveries";
const AVAILABLE_DRIVERS = "available-drivers";

export const router = restate.keyedRouter({
setDriverAvailable: async (ctx: restate.RpcContext, _region: string, driverId: string) => {
// if we have deliveries already waiting, assign those
const pendingDeliveries = await ctx.get<PendingDelivery[]>(PENDING_DELIVERIES);
if (pendingDeliveries && pendingDeliveries.length > 0) {
const nextDelivery = pendingDeliveries.shift()!;
ctx.set(PENDING_DELIVERIES, pendingDeliveries);

// Notify that delivery is ongoing
ctx.resolveAwakeable(nextDelivery.promiseId, driverId);
return;
}

// otherwise remember driver as available
const availableDrivers = (await ctx.get<string[]>(AVAILABLE_DRIVERS)) ?? [];
availableDrivers.push(driverId);
ctx.set(AVAILABLE_DRIVERS, availableDrivers);
},

// Called when a new delivery gets scheduled.
requestDriverForDelivery: async (ctx: restate.RpcContext, _region: string, request: PendingDelivery) => {
// if a driver is available, assign the delivery right away
const availableDrivers = (await ctx.get<string[]>(AVAILABLE_DRIVERS));
if (availableDrivers && availableDrivers.length > 0) {
// Remove driver from the pool
const nextAvailableDriver = availableDrivers.shift()!;
ctx.set(AVAILABLE_DRIVERS, availableDrivers);

// Notify that delivery is ongoing
ctx.resolveAwakeable(request.promiseId, nextAvailableDriver);
return;
}

// otherwise store the delivery request until a new driver becomes available
const pendingDeliveries = (await ctx.get<PendingDelivery[]>(PENDING_DELIVERIES)) ?? [];
pendingDeliveries.push(request);
ctx.set(PENDING_DELIVERIES, pendingDeliveries);
},
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate examples,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/
*/

import * as restate from "@restatedev/restate-sdk";
import * as driverDeliveryMatcher from "./driver_delivery_matcher";
import * as deliveryManager from "./delivery_manager";
import {DeliveryRequest, DriverStatus, Location} from "./types/types";

/**
* Digital twin for the driver. Represents a driver and his status, assigned delivery, and location.
* Keyed by driver ID. The actual driver would have an application (mocked by Driver Mobile App Simulator) that
* calls this service.
*/
export const service: restate.ServiceApi<typeof router> = {path: "driver-digital-twin"};

const DRIVER_STATUS = "driver-status";
const ASSIGNED_DELIVERY = "assigned-delivery";
const DRIVER_LOCATION = "driver-location";

export const router = restate.keyedRouter({
// Called by driver's mobile app at start of workday or end of delivery
setDriverAvailable: async (ctx: restate.RpcContext, driverId: string, region: string) => {
await checkIfDriverInExpectedState(DriverStatus.IDLE, ctx);

ctx.set(DRIVER_STATUS, DriverStatus.WAITING_FOR_WORK);
ctx.send(driverDeliveryMatcher.service).setDriverAvailable(region, driverId);
},

// Gets polled by the driver's mobile app at regular intervals to check for assignments.
getAssignedDelivery: async (ctx: restate.RpcContext) => ctx.get<DeliveryRequest>(ASSIGNED_DELIVERY),

// Gets called by the delivery manager when this driver was assigned to do the delivery.
assignDeliveryJob: async (ctx: restate.RpcContext, _driverId: string, deliveryRequest: DeliveryRequest) => {
await checkIfDriverInExpectedState(DriverStatus.WAITING_FOR_WORK, ctx);

ctx.set(DRIVER_STATUS, DriverStatus.DELIVERING);
ctx.set(ASSIGNED_DELIVERY, deliveryRequest);

const currentLocation = await ctx.get<Location>(DRIVER_LOCATION);
if (currentLocation) {
ctx.send(deliveryManager.service).handleDriverLocationUpdate(deliveryRequest.deliveryId, currentLocation);
}
},

// Called by driver's mobile app at pickup from the restaurant.
notifyDeliveryPickUp: async (ctx: restate.RpcContext, _driverId: string) => {
await checkIfDriverInExpectedState(DriverStatus.DELIVERING, ctx);
const assignedDelivery = (await ctx.get<DeliveryRequest>(ASSIGNED_DELIVERY))!;
ctx.send(deliveryManager.service).notifyDeliveryPickup(assignedDelivery.deliveryId);
},

// Called by driver's mobile app after delivery success.
notifyDeliveryDelivered: async (ctx: restate.RpcContext, _driverId: string) => {
await checkIfDriverInExpectedState(DriverStatus.DELIVERING, ctx);

const assignedDelivery = (await ctx.get<DeliveryRequest>(ASSIGNED_DELIVERY))!;
ctx.clear(ASSIGNED_DELIVERY);
ctx.send(deliveryManager.service).notifyDeliveryDelivered(assignedDelivery.deliveryId);
ctx.set(DRIVER_STATUS, DriverStatus.IDLE);
},

// Called by the driver's mobile app when he has moved to a new location.
handleDriverLocationUpdateEvent: restate.keyedEventHandler(async (ctx: restate.RpcContext, event: restate.Event) => {
const location = event.json<Location>();
ctx.set(DRIVER_LOCATION, location);
const assignedDelivery = await ctx.get<DeliveryRequest>(ASSIGNED_DELIVERY);
if (assignedDelivery) {
ctx.send(deliveryManager.service).handleDriverLocationUpdate(assignedDelivery.deliveryId, location);
}
})
})


async function checkIfDriverInExpectedState(expectedStatus: DriverStatus, ctx: restate.RpcContext): Promise<void> {
const currentStatus = (await ctx.get<DriverStatus>(DRIVER_STATUS)) ?? DriverStatus.IDLE;

if (currentStatus !== expectedStatus) {
throw new restate.TerminalError(`Driver status wrong. Expected ${expectedStatus} but was ${currentStatus}`);
}
}
Loading

0 comments on commit 48d3139

Please sign in to comment.