Skip to content

Commit

Permalink
handle aws sns-subscribe-confirmation messages on event endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Jul 11, 2023
1 parent 72574be commit 8326911
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions cli/src/main/scala/http/DevServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import funstack.local.helper.{AccessToken, Base64Codec}
import funstack.local.ws.WebsocketConnections
import net.exoego.facade.aws_lambda
import net.exoego.facade.aws_lambda.{APIGatewayProxyEventV2, APIGatewayProxyStructuredResultV2}
import org.scalajs.dom.Fetch
import typings.node.httpMod.{createServer, IncomingMessage, Server, ServerResponse}
import typings.node.{Buffer => JsBuffer}

import java.net.URI
import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.JSConverters._
import scala.util.{Failure, Success}

object DevServer {
type FunctionType =
Expand Down Expand Up @@ -54,13 +56,33 @@ object DevServer {
req.url.toOption match {
case Some("/__/send/event") =>
try {
val bodyStr = body.result()
val request = js.JSON.parse(bodyStr)
val subscriptionKey = request.MessageAttributes.subscription_key.Value.asInstanceOf[String]
val message = request.Message.asInstanceOf[String]
WebsocketConnections.sendSubscription(subscriptionKey, message)
res.statusCode = 200
res.end()
val bodyStr = body.result()
val request = js.JSON.parse(bodyStr)

val subscribeUrl = request.SubscribeURL.asInstanceOf[js.UndefOr[String]].toOption

val result = subscribeUrl match {
case Some(url) =>
Fetch.fetch(url).toFuture.flatMap { response =>
if (response.ok) Future.successful(())
else Future.failed(new Exception(s"Unexpected status code from subscribe url: ${response.status}"))
}
case None =>
val subscriptionKey = request.MessageAttributes.subscription_key.Value.asInstanceOf[String]
val message = request.Message.asInstanceOf[String]
WebsocketConnections.sendSubscription(subscriptionKey, message)
Future.successful(())
}

result.onComplete {
case Success(()) =>
res.statusCode = 200
res.end()
case Failure(error) =>
println(s"Failed to handle send event: ${error.getMessage}")
res.statusCode = 500
res.end()
}
}
catch {
case error: Throwable =>
Expand Down Expand Up @@ -89,7 +111,7 @@ object DevServer {
_ <- semaphore.acquire.unsafeToFuture()
result <- handler(gatewayEvent, lambdaContext).toFuture.attempt
_ <- semaphore.release.unsafeToFuture()
} yield result match {
} result match {
case Right(result) =>
result.headers.foreach { headers =>
headers.foreach { case (key, value) =>
Expand Down

0 comments on commit 8326911

Please sign in to comment.