From 83269111725cd2ee543281c11f1642fc7926326b Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Tue, 11 Jul 2023 13:55:17 +0200 Subject: [PATCH] handle aws sns-subscribe-confirmation messages on event endpoint --- cli/src/main/scala/http/DevServer.scala | 38 +++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/cli/src/main/scala/http/DevServer.scala b/cli/src/main/scala/http/DevServer.scala index 0daaeb2..1f51121 100644 --- a/cli/src/main/scala/http/DevServer.scala +++ b/cli/src/main/scala/http/DevServer.scala @@ -8,6 +8,7 @@ import funstack.local.helper.{AccessToken, Base64Codec} import funstack.local.ws.WebsocketConnections import net.exoego.facade.aws_lambda import net.exoego.facade.aws_lambda.{APIGatewayProxyEventV2, APIGatewayProxyStructuredResultV2} +import org.scalajs.dom.Fetch import typings.node.httpMod.{createServer, IncomingMessage, Server, ServerResponse} import typings.node.{Buffer => JsBuffer} @@ -15,6 +16,7 @@ import java.net.URI import scala.concurrent.Future import scala.scalajs.js import scala.scalajs.js.JSConverters._ +import scala.util.{Failure, Success} object DevServer { type FunctionType = @@ -54,13 +56,33 @@ object DevServer { req.url.toOption match { case Some("/__/send/event") => try { - val bodyStr = body.result() - val request = js.JSON.parse(bodyStr) - val subscriptionKey = request.MessageAttributes.subscription_key.Value.asInstanceOf[String] - val message = request.Message.asInstanceOf[String] - WebsocketConnections.sendSubscription(subscriptionKey, message) - res.statusCode = 200 - res.end() + val bodyStr = body.result() + val request = js.JSON.parse(bodyStr) + + val subscribeUrl = request.SubscribeURL.asInstanceOf[js.UndefOr[String]].toOption + + val result = subscribeUrl match { + case Some(url) => + Fetch.fetch(url).toFuture.flatMap { response => + if (response.ok) Future.successful(()) + else Future.failed(new Exception(s"Unexpected status code from subscribe url: ${response.status}")) + } + case None => + val subscriptionKey = request.MessageAttributes.subscription_key.Value.asInstanceOf[String] + val message = request.Message.asInstanceOf[String] + WebsocketConnections.sendSubscription(subscriptionKey, message) + Future.successful(()) + } + + result.onComplete { + case Success(()) => + res.statusCode = 200 + res.end() + case Failure(error) => + println(s"Failed to handle send event: ${error.getMessage}") + res.statusCode = 500 + res.end() + } } catch { case error: Throwable => @@ -89,7 +111,7 @@ object DevServer { _ <- semaphore.acquire.unsafeToFuture() result <- handler(gatewayEvent, lambdaContext).toFuture.attempt _ <- semaphore.release.unsafeToFuture() - } yield result match { + } result match { case Right(result) => result.headers.foreach { headers => headers.foreach { case (key, value) =>