diff --git a/java/food-ordering/README.md b/java/food-ordering/README.md index 8d284876..e08beafc 100644 --- a/java/food-ordering/README.md +++ b/java/food-ordering/README.md @@ -61,12 +61,12 @@ Restate has a psql interface to query the state of the system. If you buy some products via the webUI, you can see how the order workflow is executed by querying the state of the order status service: ```shell -watch -n 1 'psql -h localhost -p 9071 -c "select service, service_key_utf8, key, value_utf8 from state s where s.service='"'"'order.OrderStatusService'"'"';"' +watch -n 1 'psql -h localhost -p 9071 -c "select service, service_key_utf8, key, value_utf8 from state s where s.service='"'"'examples.order.OrderStatusService'"'"';"' ``` Or have a look at the state of all the services, except for the driver simulator: ```shell -watch -n 1 'psql -h localhost -p 9071 -c "select service, service_key_utf8, key, value_utf8 from state s where s.service not in ('"'"'order.DriverMobileAppSimulator'"'"');"' +watch -n 1 'psql -h localhost -p 9071 -c "select service, service_key_utf8, key, value_utf8 from state s where s.service not in ('"'"'examples.order.DriverMobileAppSimulator'"'"');"' ``` Or you can check the state of the ongoing invocations via: @@ -77,7 +77,7 @@ watch -n 1 'psql -h localhost -p 9071 -c "select service, method, service_key_ut ## Exploring the demo ### The order workflow -You can find the implementation of each of the services under `app/restate-app/src/main/java/dev/restate/sdk/examples/`. +You can find the implementation of each of the services under [`app/restate-app/src/main/java/examples/order`](app/restate-app/src/main/java/examples/order). The flow of an incoming order is as follows: 1. When the customer places an order via the web UI (localhost:3000), an order event is published to Kafka. 2. Restate subscribes to the order topic and triggers the order workflow for each incoming event. This subscription is set up by executing two curl commands, as done in the Docker compose file (`docker-compose.yaml`) by the `runtimesetup` container. diff --git a/java/food-ordering/app/restate-app/build.gradle.kts b/java/food-ordering/app/restate-app/build.gradle.kts index 2c9ac0cf..d0c3eb33 100644 --- a/java/food-ordering/app/restate-app/build.gradle.kts +++ b/java/food-ordering/app/restate-app/build.gradle.kts @@ -9,13 +9,16 @@ plugins { repositories { mavenCentral() + maven("https://s01.oss.sonatype.org/content/repositories/snapshots/") } -val restateVersion = "0.7.0" +val restateVersion = "0.8.0-SNAPSHOT" dependencies { // Restate SDK + annotationProcessor("dev.restate:sdk-api-gen:$restateVersion") implementation("dev.restate:sdk-api:$restateVersion") + implementation("dev.restate:sdk-workflow-api:$restateVersion") implementation("dev.restate:sdk-http-vertx:$restateVersion") // To use Jackson to read/write state entries (optional) implementation("dev.restate:sdk-serde-jackson:$restateVersion") @@ -29,7 +32,7 @@ dependencies { compileOnly("org.apache.tomcat:annotations-api:6.0.53") //Kafka - implementation("org.apache.kafka:kafka-clients:3.0.0") + implementation("org.apache.kafka:kafka-clients:3.6.1") // Logging (optional) implementation("org.apache.logging.log4j:log4j-core:2.20.0") @@ -63,11 +66,11 @@ protobuf { // Set main class application { - mainClass.set("dev.restate.sdk.examples.AppMain") + mainClass.set("examples.order.AppMain") } jib { to.image = "restate-app:0.0.1" - container.mainClass = "dev.restate.sdk.examples.AppMain" + container.mainClass = "examples.order.AppMain" } diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DeliveryManager.java b/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DeliveryManager.java deleted file mode 100644 index 7233f38c..00000000 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DeliveryManager.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH - * - * This file is part of the Restate examples, - * which is released under the MIT license. - * - * You can find a copy of the license in the file LICENSE - * in the root directory of this repository or package or at - * https://github.com/restatedev/examples/ - */ - -package dev.restate.sdk.examples; - -import static dev.restate.sdk.examples.generated.OrderProto.*; -import static dev.restate.sdk.examples.utils.TypeUtils.statusToProto; - -import dev.restate.sdk.RestateContext; -import dev.restate.sdk.common.CoreSerdes; -import dev.restate.sdk.common.Serde; -import dev.restate.sdk.common.StateKey; -import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.generated.DeliveryManagerRestate; -import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate; -import dev.restate.sdk.examples.generated.DriverDigitalTwinRestate; -import dev.restate.sdk.examples.generated.OrderStatusServiceRestate; -import dev.restate.sdk.examples.types.DeliveryInformation; -import dev.restate.sdk.examples.types.Location; -import dev.restate.sdk.examples.types.StatusEnum; -import dev.restate.sdk.examples.utils.GeoUtils; -import dev.restate.sdk.serde.jackson.JacksonSerdes; - -/** - * Manages the delivery of the order to the customer. Keyed by the order ID (similar to the - * OrderService and OrderStatusService). - */ -public class DeliveryManager extends DeliveryManagerRestate.DeliveryManagerRestateImplBase { - - // State key to store all relevant information about the delivery. - private static final StateKey DELIVERY_INFO = - StateKey.of("delivery-info", JacksonSerdes.of(DeliveryInformation.class)); - - private static final Serde locationSerde = JacksonSerdes.of(Location.class); - - /** - * Finds a driver, assigns the delivery job to the driver, and updates the status of the order. - * Gets called by the OrderService when a new order has been prepared and needs to be delivered. - */ - @Override - public void start(RestateContext ctx, DeliveryRequest request) throws TerminalException { - - // Temporary placeholder: random location - var restaurantLocation = ctx.sideEffect(locationSerde, () -> GeoUtils.randomLocation()); - var customerLocation = ctx.sideEffect(locationSerde, () -> GeoUtils.randomLocation()); - - // Store the delivery information in Restate's state store - DeliveryInformation deliveryInfo = - new DeliveryInformation( - request.getOrderId(), - request.getCallback(), - request.getRestaurantId(), - restaurantLocation, - customerLocation, - false); - ctx.set(DELIVERY_INFO, deliveryInfo); - - // Acquire a driver - var driverAwakeable = ctx.awakeable(CoreSerdes.STRING_UTF8); - DriverDeliveryMatcherRestate.newClient(ctx) - .oneWay() - .requestDriverForDelivery( - DeliveryCallback.newBuilder() - .setRegion(GeoUtils.DEMO_REGION) - .setDeliveryCallbackId(driverAwakeable.id()) - .build()); - // Wait until the driver pool service has located a driver - // This awakeable gets resolved either immediately when there is a pending delivery - // or later, when a new delivery comes in. - var driverId = driverAwakeable.await(); - - // Assign the driver to the job - DriverDigitalTwinRestate.newClient(ctx) - .assignDeliveryJob( - AssignDeliveryRequest.newBuilder() - .setDriverId(driverId) - .setOrderId(request.getOrderId()) - .setRestaurantId(request.getRestaurantId()) - .setRestaurantLocation(restaurantLocation.toProto()) - .setCustomerLocation(customerLocation.toProto()) - .build()) - .await(); - - // Update the status of the order to "waiting for the driver" - OrderStatusServiceRestate.newClient(ctx) - .oneWay() - .setStatus(statusToProto(request.getOrderId(), StatusEnum.WAITING_FOR_DRIVER)); - } - - /** - * Notifies that the delivery was picked up. Gets called by the DriverService.NotifyDeliveryPickup - * when the driver has arrived at the restaurant. - */ - @Override - public void notifyDeliveryPickup(RestateContext ctx, OrderId request) throws TerminalException { - // Retrieve the delivery information for this delivery - var delivery = - ctx.get(DELIVERY_INFO) - .orElseThrow( - () -> - new TerminalException( - "Delivery was picked up but there is no ongoing delivery.")); - // Update the status of the delivery to "picked up" - delivery.notifyPickup(); - ctx.set(DELIVERY_INFO, delivery); - - // Update the status of the order to "in delivery" - OrderStatusServiceRestate.newClient(ctx) - .oneWay() - .setStatus(statusToProto(delivery.getOrderId(), StatusEnum.IN_DELIVERY)); - } - - /** - * Notifies that the order was delivered. Gets called by the DriverService.NotifyDeliveryDelivered - * when the driver has delivered the order to the customer. - */ - @Override - public void notifyDeliveryDelivered(RestateContext ctx, OrderId request) - throws TerminalException { - // Retrieve the delivery information for this delivery - var delivery = - ctx.get(DELIVERY_INFO) - .orElseThrow( - () -> - new TerminalException( - "Delivery was delivered but there is no ongoing delivery.")); - // Order has been delivered, so state can be cleared - ctx.clear(DELIVERY_INFO); - - // Notify the OrderService that the delivery has been completed - ctx.awakeableHandle(delivery.getCallbackId()).resolve(CoreSerdes.VOID, null); - } - - /** - * Updates the location of the order. Gets called by - * DriverService.HandleDriverLocationUpdateEvent() (digital twin of the driver) when the driver - * has moved to a new location. - */ - @Override - public void handleDriverLocationUpdate(RestateContext ctx, DeliveryLocationUpdate request) - throws TerminalException { - // Retrieve the delivery information for this delivery - var delivery = - ctx.get(DELIVERY_INFO) - .orElseThrow( - () -> - new TerminalException( - "Driver is doing a delivery but there is no ongoing delivery.")); - - // Parse the new location, and calculate the ETA of the delivery to the customer - var location = Location.fromProto(request.getLocation()); - var eta = - delivery.isOrderPickedUp() - ? GeoUtils.calculateEtaMillis(location, delivery.getCustomerLocation()) - : GeoUtils.calculateEtaMillis(location, delivery.getRestaurantLocation()) - + GeoUtils.calculateEtaMillis( - delivery.getRestaurantLocation(), delivery.getCustomerLocation()); - - // Update the ETA of the order - OrderStatusServiceRestate.newClient(ctx) - .oneWay() - .setETA(OrderStatus.newBuilder().setOrderId(delivery.getOrderId()).setEta(eta).build()); - } -} diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderWorkflow.java b/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderWorkflow.java deleted file mode 100644 index ce0092d6..00000000 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderWorkflow.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH - * - * This file is part of the Restate examples, - * which is released under the MIT license. - * - * You can find a copy of the license in the file LICENSE - * in the root directory of this repository or package or at - * https://github.com/restatedev/examples/ - */ - -package dev.restate.sdk.examples; - -import static dev.restate.sdk.examples.utils.TypeUtils.statusToProto; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import dev.restate.sdk.RestateContext; -import dev.restate.sdk.common.CoreSerdes; -import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.clients.PaymentClient; -import dev.restate.sdk.examples.clients.RestaurantClient; -import dev.restate.sdk.examples.generated.DeliveryManagerRestate; -import dev.restate.sdk.examples.generated.OrderProto.*; -import dev.restate.sdk.examples.generated.OrderStatusServiceRestate; -import dev.restate.sdk.examples.generated.OrderWorkflowRestate; -import dev.restate.sdk.examples.types.OrderRequest; -import dev.restate.sdk.examples.types.StatusEnum; -import java.time.Duration; -import java.util.UUID; - -/** - * Order processing workflow Gets called for each Kafka event that is published to the order topic. - * The event contains the order ID and the raw JSON order. The workflow handles the payment, asks - * the restaurant to start the preparation, and triggers the delivery. - */ -public class OrderWorkflow extends OrderWorkflowRestate.OrderWorkflowRestateImplBase { - private final RestaurantClient restaurant = RestaurantClient.get(); - private final PaymentClient paymentClnt = PaymentClient.get(); - - @Override - public void handleOrderCreationEvent(RestateContext ctx, KafkaOrderEvent event) - throws TerminalException { - var orderStatusSend = OrderStatusServiceRestate.newClient(ctx); - - ObjectMapper mapper = new ObjectMapper(); - OrderRequest order; - try { - order = mapper.readValue(event.getPayload().toStringUtf8(), OrderRequest.class); - } catch (JsonProcessingException e) { - throw new TerminalException("Parsing raw JSON order failed: " + e.getMessage()); - } - String id = order.getOrderId(); - - // 1. Set status - orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.CREATED)); - - // 2. Handle payment - String token = ctx.sideEffect(CoreSerdes.STRING_UTF8, () -> UUID.randomUUID().toString()); - boolean paid = - ctx.sideEffect( - CoreSerdes.BOOLEAN, () -> paymentClnt.charge(id, token, order.getTotalCost())); - - if (!paid) { - orderStatusSend.oneWay().setStatus(statusToProto(id, StatusEnum.REJECTED)); - return; - } - - // 3. Schedule preparation - orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.SCHEDULED)); - ctx.sleep(Duration.ofMillis(order.getDeliveryDelay())); - - // 4. Trigger preparation - var preparationAwakeable = ctx.awakeable(CoreSerdes.VOID); - ctx.sideEffect(() -> restaurant.prepare(id, preparationAwakeable.id())); - orderStatusSend.setStatus(statusToProto(id, StatusEnum.IN_PREPARATION)); - - preparationAwakeable.await(); - orderStatusSend.setStatus(statusToProto(id, StatusEnum.SCHEDULING_DELIVERY)); - - // 5. Find a driver and start delivery - var deliveryAwakeable = ctx.awakeable(CoreSerdes.VOID); - - var deliveryRequest = - DeliveryRequest.newBuilder() - .setOrderId(id) - .setRestaurantId(order.getRestaurantId()) - .setCallback(deliveryAwakeable.id()) - .build(); - DeliveryManagerRestate.newClient(ctx).oneWay().start(deliveryRequest); - deliveryAwakeable.await(); - orderStatusSend.setStatus(statusToProto(order.getOrderId(), StatusEnum.DELIVERED)); - } -} diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/TypeUtils.java b/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/TypeUtils.java deleted file mode 100644 index 7084b579..00000000 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/TypeUtils.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH - * - * This file is part of the Restate examples, - * which is released under the MIT license. - * - * You can find a copy of the license in the file LICENSE - * in the root directory of this repository or package or at - * https://github.com/restatedev/examples/ - */ - -package dev.restate.sdk.examples.utils; - -import dev.restate.sdk.examples.generated.OrderProto.*; -import dev.restate.sdk.examples.types.StatusEnum; - -public class TypeUtils { - public static OrderStatus statusToProto(String id, StatusEnum statusEnum) { - return OrderStatus.newBuilder() - .setOrderId(id) - .setStatus(Status.valueOf(statusEnum.name())) - .build(); - } - - public static OrderId toOrderIdProto(String id) { - return OrderId.newBuilder().setOrderId(id).build(); - } -} diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/AppMain.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/AppMain.java similarity index 80% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/AppMain.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/AppMain.java index 3007447c..7f14a482 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/AppMain.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/AppMain.java @@ -9,17 +9,17 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples; +package examples.order; -import dev.restate.sdk.examples.external.DriverMobileAppSimulator; +import examples.order.external.DriverMobileAppSimulator; import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; public class AppMain { public static void main(String[] args) { RestateHttpEndpointBuilder.builder() - .withService(new OrderWorkflow()) + .with(new OrderWorkflow()) + .withService(new OrderWorkflowSubmitter()) .withService(new OrderStatusService()) - .withService(new DeliveryManager()) .withService(new DriverDeliveryMatcher()) .withService(new DriverDigitalTwin()) .withService(new DriverMobileAppSimulator()) // external mobile app on driver's phone diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDeliveryMatcher.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDeliveryMatcher.java similarity index 84% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDeliveryMatcher.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDeliveryMatcher.java index 033d7a78..174430ea 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDeliveryMatcher.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDeliveryMatcher.java @@ -9,16 +9,16 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples; - -import static dev.restate.sdk.examples.generated.OrderProto.*; +package examples.order; import com.fasterxml.jackson.core.type.TypeReference; -import dev.restate.sdk.RestateContext; +import dev.restate.sdk.KeyedContext; import dev.restate.sdk.common.CoreSerdes; import dev.restate.sdk.common.StateKey; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate; +import examples.order.generated.DriverDeliveryMatcherRestate; +import examples.order.generated.DriverDeliveryMatcherRestate; +import examples.order.generated.OrderProto; import dev.restate.sdk.serde.jackson.JacksonSerdes; import java.util.LinkedList; import java.util.Queue; @@ -44,7 +44,7 @@ public class DriverDeliveryMatcher * in line. If no pending deliveries, driver is added to the available driver pool */ @Override - public void setDriverAvailable(RestateContext ctx, DriverPoolAvailableNotification request) + public void setDriverAvailable(KeyedContext ctx, OrderProto.DriverPoolAvailableNotification request) throws TerminalException { var pendingDeliveries = ctx.get(PENDING_DELIVERIES).orElse(new LinkedList<>()); @@ -54,7 +54,7 @@ public void setDriverAvailable(RestateContext ctx, DriverPoolAvailableNotificati // Update the queue in state. Delivery was removed. ctx.set(PENDING_DELIVERIES, pendingDeliveries); // Notify that delivery is ongoing - ctx.awakeableHandle(nextDelivery).resolve(CoreSerdes.STRING_UTF8, request.getDriverId()); + ctx.awakeableHandle(nextDelivery).resolve(CoreSerdes.JSON_STRING, request.getDriverId()); return; } @@ -69,7 +69,7 @@ public void setDriverAvailable(RestateContext ctx, DriverPoolAvailableNotificati * available. If no available drivers, the delivery is added to the pending deliveries queue */ @Override - public void requestDriverForDelivery(RestateContext ctx, DeliveryCallback request) + public void requestDriverForDelivery(KeyedContext ctx, OrderProto.DeliveryCallback request) throws TerminalException { var availableDrivers = ctx.get(AVAILABLE_DRIVERS).orElse(new LinkedList<>()); @@ -80,7 +80,7 @@ public void requestDriverForDelivery(RestateContext ctx, DeliveryCallback reques ctx.set(AVAILABLE_DRIVERS, availableDrivers); // Notify that delivery is ongoing ctx.awakeableHandle(request.getDeliveryCallbackId()) - .resolve(CoreSerdes.STRING_UTF8, nextAvailableDriver); + .resolve(CoreSerdes.JSON_STRING, nextAvailableDriver); return; } diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDigitalTwin.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDigitalTwin.java similarity index 74% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDigitalTwin.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDigitalTwin.java index 378116ef..4e52f5f4 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/DriverDigitalTwin.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/DriverDigitalTwin.java @@ -9,21 +9,19 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples; +package examples.order; -import static dev.restate.sdk.examples.generated.OrderProto.*; -import static dev.restate.sdk.examples.utils.TypeUtils.toOrderIdProto; +import static examples.order.generated.OrderProto.*; import com.google.protobuf.Empty; -import dev.restate.sdk.RestateContext; +import dev.restate.sdk.KeyedContext; import dev.restate.sdk.common.StateKey; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.generated.DeliveryManagerRestate; -import dev.restate.sdk.examples.generated.DriverDeliveryMatcherRestate; -import dev.restate.sdk.examples.generated.DriverDigitalTwinRestate; -import dev.restate.sdk.examples.types.AssignedDelivery; -import dev.restate.sdk.examples.types.DriverStatus; -import dev.restate.sdk.examples.types.Location; +import examples.order.generated.DriverDeliveryMatcherRestate; +import examples.order.generated.DriverDigitalTwinRestate; +import examples.order.types.AssignedDelivery; +import examples.order.types.DriverStatus; +import examples.order.types.Location; import dev.restate.sdk.serde.jackson.JacksonSerdes; /** @@ -50,7 +48,7 @@ public class DriverDigitalTwin extends DriverDigitalTwinRestate.DriverDigitalTwi * (DriverMobileAppSimulator) calls this method. */ @Override - public void setDriverAvailable(RestateContext ctx, DriverAvailableNotification request) + public void setDriverAvailable(KeyedContext ctx, DriverAvailableNotification request) throws TerminalException { expectStatus(ctx, DriverStatus.IDLE); @@ -70,7 +68,7 @@ public void setDriverAvailable(RestateContext ctx, DriverAvailableNotification r * location. */ @Override - public void assignDeliveryJob(RestateContext ctx, AssignDeliveryRequest request) + public void assignDeliveryJob(KeyedContext ctx, AssignDeliveryRequest request) throws TerminalException { expectStatus(ctx, DriverStatus.WAITING_FOR_WORK); @@ -89,20 +87,16 @@ public void assignDeliveryJob(RestateContext ctx, AssignDeliveryRequest request) ctx.get(DRIVER_LOCATION) .ifPresent( loc -> - DeliveryManagerRestate.newClient(ctx) + new OrderWorkflowRestateClient(ctx, request.getOrderId()) .oneWay() - .handleDriverLocationUpdate( - DeliveryLocationUpdate.newBuilder() - .setOrderId(request.getOrderId()) - .setLocation(loc.toProto()) - .build())); + .handleDriverLocationUpdate(loc)); } /** * Gets called by the driver's mobile app when he has picked up the delivery from the restaurant. */ @Override - public void notifyDeliveryPickup(RestateContext ctx, DriverId request) throws TerminalException { + public void notifyDeliveryPickup(KeyedContext ctx, DriverId request) throws TerminalException { expectStatus(ctx, DriverStatus.DELIVERING); // Retrieve the ongoing delivery and update its status @@ -116,14 +110,14 @@ public void notifyDeliveryPickup(RestateContext ctx, DriverId request) throws Te ctx.set(ASSIGNED_DELIVERY, currentDelivery); // Update the status of the delivery in the delivery manager - DeliveryManagerRestate.newClient(ctx) - .oneWay() - .notifyDeliveryPickup(toOrderIdProto(currentDelivery.getOrderId())); + new OrderWorkflowRestateClient(ctx, currentDelivery.getOrderId()) + .oneWay() + .notifyDeliveryPickup(); } /** Gets called by the driver's mobile app when he has delivered the order to the customer. */ @Override - public void notifyDeliveryDelivered(RestateContext ctx, DriverId request) + public void notifyDeliveryDelivered(KeyedContext ctx, DriverId request) throws TerminalException { expectStatus(ctx, DriverStatus.DELIVERING); @@ -138,9 +132,9 @@ public void notifyDeliveryDelivered(RestateContext ctx, DriverId request) ctx.clear(ASSIGNED_DELIVERY); // Notify the delivery service that the delivery was delivered - DeliveryManagerRestate.newClient(ctx) - .oneWay() - .notifyDeliveryDelivered(toOrderIdProto(assignedDelivery.getOrderId())); + new OrderWorkflowRestateClient(ctx, assignedDelivery.getOrderId()) + .oneWay() + .notifyDeliveryDelivered(); // Update the status of the driver to idle ctx.set(DRIVER_STATUS, DriverStatus.IDLE); @@ -148,7 +142,7 @@ public void notifyDeliveryDelivered(RestateContext ctx, DriverId request) /** Gets called by the driver's mobile app when he has moved to a new location. */ @Override - public void handleDriverLocationUpdateEvent(RestateContext ctx, KafkaDriverLocationEvent request) + public void handleDriverLocationUpdateEvent(KeyedContext ctx, KafkaDriverLocationEvent request) throws TerminalException { // Update the location of the driver Location location = JacksonSerdes.of(Location.class).deserialize(request.getLocation()); @@ -158,13 +152,9 @@ public void handleDriverLocationUpdateEvent(RestateContext ctx, KafkaDriverLocat ctx.get(ASSIGNED_DELIVERY) .ifPresent( delivery -> - DeliveryManagerRestate.newClient(ctx) - .oneWay() - .handleDriverLocationUpdate( - DeliveryLocationUpdate.newBuilder() - .setOrderId(delivery.getOrderId()) - .setLocation(location.toProto()) - .build())); + new OrderWorkflowRestateClient(ctx, delivery.getOrderId()) + .oneWay() + .handleDriverLocationUpdate(location)); } /** @@ -173,7 +163,7 @@ public void handleDriverLocationUpdateEvent(RestateContext ctx, KafkaDriverLocat * got assigned to him. */ @Override - public AssignedDeliveryResponse getAssignedDelivery(RestateContext ctx, DriverId request) + public AssignedDeliveryResponse getAssignedDelivery(KeyedContext ctx, DriverId request) throws TerminalException { var assignedDelivery = ctx.get(ASSIGNED_DELIVERY); @@ -197,7 +187,7 @@ public AssignedDeliveryResponse getAssignedDelivery(RestateContext ctx, DriverId // If the driver is in a different state, a terminal exception is thrown that stops any retries // from taking place. // Is only called from inside the driver service. - private void expectStatus(RestateContext ctx, DriverStatus expectedStatus) { + private void expectStatus(KeyedContext ctx, DriverStatus expectedStatus) { var currentStatus = ctx.get(DRIVER_STATUS).orElse(DriverStatus.IDLE); if (currentStatus != expectedStatus) { diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderStatusService.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderStatusService.java similarity index 53% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderStatusService.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/OrderStatusService.java index 0ea1cf5e..bb56b6a9 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/OrderStatusService.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderStatusService.java @@ -9,28 +9,28 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples; +package examples.order; -import static dev.restate.sdk.examples.generated.OrderProto.*; +import static examples.order.generated.OrderProto.*; -import dev.restate.sdk.RestateContext; +import dev.restate.sdk.KeyedContext; import dev.restate.sdk.common.CoreSerdes; import dev.restate.sdk.common.StateKey; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.generated.OrderStatusServiceRestate; -import dev.restate.sdk.examples.types.StatusEnum; +import examples.order.generated.OrderStatusServiceRestate; +import examples.order.types.StatusEnum; public class OrderStatusService extends OrderStatusServiceRestate.OrderStatusServiceRestateImplBase { - private static final StateKey ORDER_STATUS = - StateKey.of("order-status", CoreSerdes.STRING_UTF8); - private static final StateKey ORDER_ETA = StateKey.of("order-eta", CoreSerdes.LONG); + private static final StateKey ORDER_ETA = StateKey.of("order-eta", CoreSerdes.JSON_LONG); /** Gets called by the webUI frontend to display the status of an order. */ @Override - public OrderStatus get(RestateContext ctx, OrderId request) throws TerminalException { - var orderStatusState = ctx.get(ORDER_STATUS).orElse("NEW"); - var status = StatusEnum.valueOf(orderStatusState); + public OrderStatus get(KeyedContext ctx, OrderId request) throws TerminalException { + var status = new OrderWorkflowRestateClient(ctx, request.getOrderId()) + .getState(OrderWorkflow.STATUS) + .await() + .orElse(StatusEnum.NEW); var eta = ctx.get(ORDER_ETA).orElse(-1L); return OrderStatus.newBuilder() .setOrderId(request.getOrderId()) @@ -40,12 +40,7 @@ public OrderStatus get(RestateContext ctx, OrderId request) throws TerminalExcep } @Override - public void setStatus(RestateContext ctx, OrderStatus request) throws TerminalException { - ctx.set(ORDER_STATUS, request.getStatus().name()); - } - - @Override - public void setETA(RestateContext ctx, OrderStatus request) throws TerminalException { + public void setETA(KeyedContext ctx, NewOrderETA request) throws TerminalException { ctx.set(ORDER_ETA, request.getEta()); } } diff --git a/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflow.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflow.java new file mode 100644 index 00000000..c1830338 --- /dev/null +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflow.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate examples, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/ + */ + +package examples.order; + +import dev.restate.sdk.annotation.Service; +import dev.restate.sdk.annotation.ServiceType; +import dev.restate.sdk.annotation.Shared; +import dev.restate.sdk.annotation.Workflow; +import dev.restate.sdk.common.CoreSerdes; +import dev.restate.sdk.common.Serde; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.common.TerminalException; +import examples.order.clients.PaymentClient; +import examples.order.clients.RestaurantClient; +import examples.order.generated.DriverDeliveryMatcherRestate; +import examples.order.generated.DriverDigitalTwinRestate; +import examples.order.generated.OrderProto; +import examples.order.generated.OrderStatusServiceRestate; +import examples.order.types.DeliveryInformation; +import examples.order.types.Location; +import examples.order.types.OrderRequest; +import examples.order.types.StatusEnum; +import examples.order.utils.GeoUtils; +import examples.order.generated.DriverDeliveryMatcherRestate; +import dev.restate.sdk.serde.jackson.JacksonSerdes; +import dev.restate.sdk.workflow.DurablePromiseKey; +import dev.restate.sdk.workflow.WorkflowContext; +import dev.restate.sdk.workflow.WorkflowSharedContext; + +import java.time.Duration; +import java.util.UUID; + +/** + * Order processing workflow Gets called for each Kafka event that is published to the order topic. + * The event contains the order ID and the raw JSON order. The workflow handles the payment, asks + * the restaurant to start the preparation, and triggers the delivery. + */ +@Service(ServiceType.WORKFLOW) +public class OrderWorkflow { + + public final static StateKey STATUS = StateKey.of("status", JacksonSerdes.of(StatusEnum.class)); + private static final StateKey DELIVERY_INFO = StateKey.of("delivery-info", JacksonSerdes.of(DeliveryInformation.class)); + + private static final Serde LOCATION_SERDE = JacksonSerdes.of(Location.class); + + private static final DurablePromiseKey PICKED_UP_SIGNAL = DurablePromiseKey.of("delivery-picked-up", CoreSerdes.VOID); + private static final DurablePromiseKey DELIVERED_SIGNAL = DurablePromiseKey.of("delivery-delivered", CoreSerdes.VOID); + + private final RestaurantClient restaurant = RestaurantClient.get(); + private final PaymentClient paymentClnt = PaymentClient.get(); + + @Workflow + public void run(WorkflowContext ctx, OrderRequest order) + throws TerminalException { + String orderId = ctx.workflowKey(); + + // 1. Set status + ctx.set(STATUS, StatusEnum.CREATED); + + // 2. Handle payment + String token = ctx.sideEffect(CoreSerdes.JSON_STRING, () -> UUID.randomUUID().toString()); + boolean paid = + ctx.sideEffect( + CoreSerdes.JSON_BOOLEAN, () -> paymentClnt.charge(orderId, token, order.getTotalCost())); + + if (!paid) { + ctx.set(STATUS, StatusEnum.REJECTED); + return; + } + + // 3. Schedule preparation + ctx.set(STATUS, StatusEnum.SCHEDULED); + ctx.sleep(Duration.ofMillis(order.getDeliveryDelay())); + + // 4. Trigger preparation + var preparationAwakeable = ctx.awakeable(CoreSerdes.VOID); + ctx.sideEffect(() -> restaurant.prepare(orderId, preparationAwakeable.id())); + ctx.set(STATUS, StatusEnum.IN_PREPARATION); + + preparationAwakeable.await(); + ctx.set(STATUS, StatusEnum.SCHEDULING_DELIVERY); + + var restaurantLocation = ctx.sideEffect(LOCATION_SERDE, GeoUtils::randomLocation); + var customerLocation = ctx.sideEffect(LOCATION_SERDE, GeoUtils::randomLocation); + + // 5. Store the delivery information in Restate's state store, the UI can query this + DeliveryInformation deliveryInfo = + new DeliveryInformation( + order.getRestaurantId(), + restaurantLocation, + customerLocation, + false); + ctx.set(DELIVERY_INFO, deliveryInfo); + + // 6. Acquire a driver + var driverAwakeable = ctx.awakeable(CoreSerdes.JSON_STRING); + DriverDeliveryMatcherRestate.newClient(ctx) + .oneWay() + .requestDriverForDelivery( + OrderProto.DeliveryCallback.newBuilder() + .setRegion(GeoUtils.DEMO_REGION) + .setDeliveryCallbackId(driverAwakeable.id()) + .build()); + ctx.set(STATUS, StatusEnum.WAITING_FOR_DRIVER); + // Wait until the driver pool service has located a driver + // This awakeable gets resolved either immediately when there is a pending delivery + // or later, when a new delivery comes in. + var driverId = driverAwakeable.await(); + + // 7. Assign the driver to the job, the driver will notify back the workflow by using the other methods + DriverDigitalTwinRestate.newClient(ctx) + .assignDeliveryJob( + OrderProto.AssignDeliveryRequest.newBuilder() + .setDriverId(driverId) + .setOrderId(orderId) + .setRestaurantId(order.getRestaurantId()) + .setRestaurantLocation(restaurantLocation.toProto()) + .setCustomerLocation(customerLocation.toProto()) + .build()) + .await(); + ctx.set(STATUS, StatusEnum.SCHEDULING_DELIVERY); + + // 8. Wait for the delivery to be picked up by the driver + ctx.durablePromise(PICKED_UP_SIGNAL).awaitable().await(); + ctx.set(STATUS, StatusEnum.IN_DELIVERY); + + // 9. Wait for the delivery to be completed + ctx.durablePromise(DELIVERED_SIGNAL).awaitable().await(); + + // 10. Delivered, enjoy the food! + ctx.set(STATUS, StatusEnum.DELIVERED); + } + + // --- Methods invoked by DriverDigitalTwin to update the delivery status + + /** + * Notifies that the delivery was picked up by the driver. + */ + @Shared + public void notifyDeliveryPickup(WorkflowSharedContext ctx) throws TerminalException { + ctx.durablePromiseHandle(PICKED_UP_SIGNAL).resolve(null); + } + + /** + * Notifies that the order was delivered. + */ + @Shared + public void notifyDeliveryDelivered(WorkflowSharedContext ctx) + throws TerminalException { + ctx.durablePromiseHandle(DELIVERED_SIGNAL).resolve(null); + } + + /** + * Updates the location of the order. + */ + @Shared + public void handleDriverLocationUpdate(WorkflowSharedContext ctx, Location location) + throws TerminalException { + // Retrieve the delivery information for this delivery + var delivery = + ctx.get(DELIVERY_INFO) + .orElseThrow( + () -> + new TerminalException( + "We expect the order already has a delivery registered.")); + + // Parse the new location, and calculate the ETA of the delivery to the customer + var eta = + delivery.isOrderPickedUp() + ? GeoUtils.calculateEtaMillis(location, delivery.getCustomerLocation()) + : GeoUtils.calculateEtaMillis(location, delivery.getRestaurantLocation()) + + GeoUtils.calculateEtaMillis(delivery.getRestaurantLocation(), delivery.getCustomerLocation()); + + // Update the ETA of the order + OrderStatusServiceRestate.newClient(ctx) + .oneWay() + .setETA(OrderProto.NewOrderETA.newBuilder().setOrderId(ctx.workflowKey()).setEta(eta).build()); + } +} diff --git a/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflowSubmitter.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflowSubmitter.java new file mode 100644 index 00000000..50115be5 --- /dev/null +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/OrderWorkflowSubmitter.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate examples, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/ + */ + +package examples.order; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.restate.sdk.KeyedContext; +import dev.restate.sdk.common.TerminalException; +import examples.order.generated.OrderProto.KafkaOrderEvent; +import examples.order.generated.OrderWorkflowSubmitterRestate; +import examples.order.types.OrderRequest; + +/** + * This service submits order workflows starting from kafka events + */ +public class OrderWorkflowSubmitter extends OrderWorkflowSubmitterRestate.OrderWorkflowSubmitterRestateImplBase { + + @Override + public void handleOrderCreationEvent(KeyedContext ctx, KafkaOrderEvent event) + throws TerminalException { + ObjectMapper mapper = new ObjectMapper(); + OrderRequest order; + try { + order = mapper.readValue(event.getPayload().toStringUtf8(), OrderRequest.class); + } catch (JsonProcessingException e) { + throw new TerminalException("Parsing raw JSON order failed: " + e.getMessage()); + } + + var orderWorkflow = new OrderWorkflowRestateClient(ctx, order.getOrderId()); + orderWorkflow.submit(order); + } +} diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/KafkaPublisher.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/KafkaPublisher.java similarity index 97% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/KafkaPublisher.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/clients/KafkaPublisher.java index 0d84d65c..90c6a63d 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/KafkaPublisher.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/KafkaPublisher.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.clients; +package examples.order.clients; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/PaymentClient.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/PaymentClient.java similarity index 95% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/PaymentClient.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/clients/PaymentClient.java index 6d7816dc..776c640b 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/PaymentClient.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/PaymentClient.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.clients; +package examples.order.clients; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/RestaurantClient.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/RestaurantClient.java similarity index 97% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/RestaurantClient.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/clients/RestaurantClient.java index 74913b64..1a6c631d 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/clients/RestaurantClient.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/clients/RestaurantClient.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.clients; +package examples.order.clients; import dev.restate.sdk.common.TerminalException; import java.io.IOException; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/external/DriverMobileAppSimulator.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/external/DriverMobileAppSimulator.java similarity index 88% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/external/DriverMobileAppSimulator.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/external/DriverMobileAppSimulator.java index 06af8cb3..78b39568 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/external/DriverMobileAppSimulator.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/external/DriverMobileAppSimulator.java @@ -9,19 +9,18 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.external; +package examples.order.external; -import static dev.restate.sdk.examples.generated.OrderProto.*; - -import dev.restate.sdk.RestateContext; +import dev.restate.sdk.KeyedContext; import dev.restate.sdk.common.StateKey; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.examples.clients.KafkaPublisher; -import dev.restate.sdk.examples.generated.DriverDigitalTwinRestate; -import dev.restate.sdk.examples.generated.DriverMobileAppSimulatorRestate; -import dev.restate.sdk.examples.types.AssignedDelivery; -import dev.restate.sdk.examples.types.Location; -import dev.restate.sdk.examples.utils.GeoUtils; +import examples.order.clients.KafkaPublisher; +import examples.order.generated.DriverDigitalTwinRestate; +import examples.order.generated.DriverMobileAppSimulatorRestate; +import examples.order.types.AssignedDelivery; +import examples.order.types.Location; +import examples.order.utils.GeoUtils; +import examples.order.generated.OrderProto; import dev.restate.sdk.serde.jackson.JacksonSerdes; import java.time.Duration; import org.apache.logging.log4j.LogManager; @@ -53,7 +52,7 @@ public class DriverMobileAppSimulator /** Mimics the driver setting himself to available in the app */ @Override - public void startDriver(RestateContext ctx, DriverId request) throws TerminalException { + public void startDriver(KeyedContext ctx, OrderProto.DriverId request) throws TerminalException { // If this driver was already created, do nothing if (ctx.get(CURRENT_LOCATION).isPresent()) { return; @@ -68,7 +67,7 @@ public void startDriver(RestateContext ctx, DriverId request) throws TerminalExc // Tell the digital twin of the driver in the food ordering app, that he is available DriverDigitalTwinRestate.newClient(ctx) .setDriverAvailable( - DriverAvailableNotification.newBuilder() + OrderProto.DriverAvailableNotification.newBuilder() .setDriverId(request.getDriverId()) .setRegion(GeoUtils.DEMO_REGION) .build()) @@ -83,7 +82,7 @@ public void startDriver(RestateContext ctx, DriverId request) throws TerminalExc * again after a short delay. */ @Override - public void pollForWork(RestateContext ctx, DriverId request) throws TerminalException { + public void pollForWork(KeyedContext ctx, OrderProto.DriverId request) throws TerminalException { var thisDriverSim = DriverMobileAppSimulatorRestate.newClient(ctx); // Ask the digital twin of the driver in the food ordering app, if he already got a job assigned @@ -113,7 +112,7 @@ public void pollForWork(RestateContext ctx, DriverId request) throws TerminalExc /** Periodically lets the food ordering app know the new location */ @Override - public void move(RestateContext ctx, DriverId request) throws TerminalException { + public void move(KeyedContext ctx, OrderProto.DriverId request) throws TerminalException { var thisDriverSim = DriverMobileAppSimulatorRestate.newClient(ctx); var assignedDelivery = ctx.get(ASSIGNED_DELIVERY) @@ -151,7 +150,7 @@ public void move(RestateContext ctx, DriverId request) throws TerminalException DriverDigitalTwinRestate.newClient(ctx) .oneWay() .setDriverAvailable( - DriverAvailableNotification.newBuilder() + OrderProto.DriverAvailableNotification.newBuilder() .setDriverId(request.getDriverId()) .setRegion(GeoUtils.DEMO_REGION) .build()); diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/AssignedDelivery.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/AssignedDelivery.java similarity index 97% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/AssignedDelivery.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/AssignedDelivery.java index c6d9254a..492fc7ca 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/AssignedDelivery.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/AssignedDelivery.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DeliveryInformation.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/DeliveryInformation.java similarity index 79% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DeliveryInformation.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/DeliveryInformation.java index 60b8cd1b..6ad261ea 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DeliveryInformation.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/DeliveryInformation.java @@ -9,14 +9,12 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; public class DeliveryInformation { - private final String orderId; - private final String callbackId; private final String restaurantId; private final Location restaurantLocation; private final Location customerLocation; @@ -24,28 +22,16 @@ public class DeliveryInformation { @JsonCreator public DeliveryInformation( - @JsonProperty("orderId") String orderId, - @JsonProperty("callbackId") String callbackId, @JsonProperty("restaurantId") String restaurantId, @JsonProperty("restaurantLocation") Location restaurantLocation, @JsonProperty("customerLocation") Location customerLocation, @JsonProperty("orderPickedUp") boolean orderPickedUp) { - this.orderId = orderId; - this.callbackId = callbackId; this.restaurantId = restaurantId; this.restaurantLocation = restaurantLocation; this.customerLocation = customerLocation; this.orderPickedUp = orderPickedUp; } - public String getOrderId() { - return orderId; - } - - public String getCallbackId() { - return callbackId; - } - public String getRestaurantId() { return restaurantId; } diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DriverStatus.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/DriverStatus.java similarity index 90% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DriverStatus.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/DriverStatus.java index 65f915e2..ee8ae5e2 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/DriverStatus.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/DriverStatus.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; public enum DriverStatus { IDLE, diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Location.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/Location.java similarity index 92% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Location.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/Location.java index 3570b114..0f964398 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Location.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/Location.java @@ -9,11 +9,11 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import dev.restate.sdk.examples.generated.OrderProto; +import examples.order.generated.OrderProto; public class Location { private final double lon; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/OrderRequest.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/OrderRequest.java similarity index 97% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/OrderRequest.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/OrderRequest.java index 19e16eee..f902dbdd 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/OrderRequest.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/OrderRequest.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Product.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/Product.java similarity index 95% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Product.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/Product.java index 9a4cd0cd..7ca2d4d4 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/Product.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/Product.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/StatusEnum.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/StatusEnum.java similarity index 90% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/StatusEnum.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/types/StatusEnum.java index 96da52d9..c8f2aa0a 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/types/StatusEnum.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/types/StatusEnum.java @@ -9,7 +9,7 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.types; +package examples.order.types; public enum StatusEnum { NEW(0), @@ -21,8 +21,7 @@ public enum StatusEnum { IN_DELIVERY(6), DELIVERED(7), REJECTED(8), - CANCELLED(9), - UNKNOWN(10); + CANCELLED(9); private final int value; diff --git a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/GeoUtils.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/utils/GeoUtils.java similarity index 95% rename from java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/GeoUtils.java rename to java/food-ordering/app/restate-app/src/main/java/examples/order/utils/GeoUtils.java index 548165e3..87b6d34d 100644 --- a/java/food-ordering/app/restate-app/src/main/java/dev/restate/sdk/examples/utils/GeoUtils.java +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/utils/GeoUtils.java @@ -9,9 +9,9 @@ * https://github.com/restatedev/examples/ */ -package dev.restate.sdk.examples.utils; +package examples.order.utils; -import dev.restate.sdk.examples.types.Location; +import examples.order.types.Location; public class GeoUtils { public static final String DEMO_REGION = "San Jose (CA)"; diff --git a/java/food-ordering/app/restate-app/src/main/java/examples/order/utils/TypeUtils.java b/java/food-ordering/app/restate-app/src/main/java/examples/order/utils/TypeUtils.java new file mode 100644 index 00000000..3d8375a4 --- /dev/null +++ b/java/food-ordering/app/restate-app/src/main/java/examples/order/utils/TypeUtils.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate examples, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/ + */ + +package examples.order.utils; + +import examples.order.generated.OrderProto; + +public class TypeUtils { + + public static OrderProto.OrderId toOrderIdProto(String id) { + return OrderProto.OrderId.newBuilder().setOrderId(id).build(); + } +} diff --git a/java/food-ordering/app/restate-app/src/main/proto/order.proto b/java/food-ordering/app/restate-app/src/main/proto/order.proto index df599cae..99bd0f17 100644 --- a/java/food-ordering/app/restate-app/src/main/proto/order.proto +++ b/java/food-ordering/app/restate-app/src/main/proto/order.proto @@ -1,15 +1,15 @@ syntax = "proto3"; -package order; +package examples.order; import "google/protobuf/empty.proto"; import "dev/restate/ext.proto"; import "dev/restate/events.proto"; -option java_package = "dev.restate.sdk.examples.generated"; +option java_package = "examples.order.generated"; option java_outer_classname = "OrderProto"; -service OrderWorkflow { +service OrderWorkflowSubmitter { option (dev.restate.ext.service_type) = KEYED; rpc HandleOrderCreationEvent(KafkaOrderEvent) returns (google.protobuf.Empty); @@ -20,13 +20,13 @@ message KafkaOrderEvent { bytes payload = 2 [(dev.restate.ext.field) = EVENT_PAYLOAD]; } +// Interface to retrieve status service OrderStatusService { option (dev.restate.ext.service_type) = KEYED; rpc Get(OrderId) returns (OrderStatus); - rpc SetStatus(OrderStatus) returns (google.protobuf.Empty); - rpc SetETA(OrderStatus) returns (google.protobuf.Empty); + rpc SetETA(NewOrderETA) returns (google.protobuf.Empty); } message OrderId { @@ -39,6 +39,11 @@ message OrderStatus { int64 eta = 3; } +message NewOrderETA { + string order_id = 1 [(dev.restate.ext.field) = KEY]; + int64 eta = 3; +} + enum Status { NEW = 0; CREATED = 1; @@ -52,26 +57,6 @@ enum Status { CANCELLED = 9; } -service DeliveryManager { - option (dev.restate.ext.service_type) = KEYED; - - rpc Start(DeliveryRequest) returns (google.protobuf.Empty); - rpc NotifyDeliveryPickup(OrderId) returns (google.protobuf.Empty); - rpc NotifyDeliveryDelivered(OrderId) returns (google.protobuf.Empty); - rpc HandleDriverLocationUpdate(DeliveryLocationUpdate) returns (google.protobuf.Empty); -} - -message DeliveryRequest { - string order_id = 1 [(dev.restate.ext.field) = KEY]; - string restaurant_id = 2; - string callback = 3; -} - -message DeliveryLocationUpdate { - string order_id = 1 [(dev.restate.ext.field) = KEY]; - Location location = 2; -} - message Location { double lon = 1; double lat = 2; diff --git a/java/food-ordering/docker-compose.yaml b/java/food-ordering/docker-compose.yaml index 976f81a0..029960d2 100644 --- a/java/food-ordering/docker-compose.yaml +++ b/java/food-ordering/docker-compose.yaml @@ -79,7 +79,7 @@ services: - KAFKA_BOOTSTRAP_SERVERS=broker:29092 runtime: - image: docker.io/restatedev/restate:0.7 + image: ghcr.io/restatedev/restate:main depends_on: - restate_app - restaurantpos @@ -91,7 +91,7 @@ services: - "9071:9071" - "8080:8080" volumes: - - ./restate-docker.yaml:/restate.yaml + - ./restate-docker.yaml:/restate.yaml:z,ro environment: - RESTATE_OBSERVABILITY__TRACING__ENDPOINT=http://jaeger:4317 - RESTATE_CONFIG=/restate.yaml @@ -106,11 +106,11 @@ services: apk add --no-cache bash jq curl && curl -X POST 'runtime:9070/deployments' -H 'content-type: application/json' -d '{\"uri\": \"http://restate_app:9080\"}' && sleep 3 && - curl -X POST 'runtime:9070/subscriptions' -H 'content-type: application/json' -d '{ \"source\":\"kafka://my-cluster/orders\", \"sink\":\"service://order.OrderWorkflow/HandleOrderCreationEvent\" }' && - curl -X POST 'runtime:9070/subscriptions' -H 'content-type: application/json' -d '{ \"source\":\"kafka://my-cluster/driver-updates\", \"sink\":\"service://order.DriverDigitalTwin/HandleDriverLocationUpdateEvent\" }' && + curl -X POST 'runtime:9070/subscriptions' -H 'content-type: application/json' -d '{ \"source\":\"kafka://my-cluster/orders\", \"sink\":\"service://examples.order.OrderWorkflowSubmitter/HandleOrderCreationEvent\" }' && + curl -X POST 'runtime:9070/subscriptions' -H 'content-type: application/json' -d '{ \"source\":\"kafka://my-cluster/driver-updates\", \"sink\":\"service://examples.order.DriverDigitalTwin/HandleDriverLocationUpdateEvent\" }' && sleep 3 && - curl -X POST -H 'content-type: application/json' 'runtime:8080/order.DriverMobileAppSimulator/StartDriver' -d '{\"driver_id\": \"driver-A\"}' && - curl -X POST -H 'content-type: application/json' 'runtime:8080/order.DriverMobileAppSimulator/StartDriver' -d '{\"driver_id\": \"driver-B\"}' && + curl -X POST -H 'content-type: application/json' 'runtime:8080/examples.order.DriverMobileAppSimulator/StartDriver' -d '{\"driver_id\": \"driver-A\"}' && + curl -X POST -H 'content-type: application/json' 'runtime:8080/examples.order.DriverMobileAppSimulator/StartDriver' -d '{\"driver_id\": \"driver-B\"}' && exit 1"] foodorderingwebui: diff --git a/java/food-ordering/webui/src/components/Cart/Cart.tsx b/java/food-ordering/webui/src/components/Cart/Cart.tsx index 5f76df16..86f1b7bf 100644 --- a/java/food-ordering/webui/src/components/Cart/Cart.tsx +++ b/java/food-ordering/webui/src/components/Cart/Cart.tsx @@ -97,12 +97,12 @@ const Cart = () => { await publishToKafka(kafkaRecord); } else { console.info(request); - sendRequestToRestate('order.OrderService', 'Create', request); + sendRequestToRestate('examples.order.OrderService', 'Create', request); } let done = false; while (!done) { - const newOrderStatus = await sendRequestToRestate('order.OrderStatusService', 'Get', { + const newOrderStatus = await sendRequestToRestate('examples.order.OrderStatusService', 'Get', { order_id: user!.user_id, }); console.info(newOrderStatus); diff --git a/java/food-ordering/webui/src/contexts/status-context/useOrderStatus.tsx b/java/food-ordering/webui/src/contexts/status-context/useOrderStatus.tsx index a54cf523..85e1707f 100644 --- a/java/food-ordering/webui/src/contexts/status-context/useOrderStatus.tsx +++ b/java/food-ordering/webui/src/contexts/status-context/useOrderStatus.tsx @@ -10,7 +10,7 @@ const useOrderStatus = () => { const fetchOrderStatus = useCallback(() => { if (user) { console.debug(`Found user ${user}`); - sendRequestToRestate('order.OrderStatusService', 'Get', { + sendRequestToRestate('examples.order.OrderStatusService', 'Get', { order_id: user!.shopping_cart_id, }).then((response) => { setOrderStatus(response.response);