diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt index 743a5a3..9e8b937 100644 --- a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt @@ -89,6 +89,23 @@ interface BrokerCoroutines : AutoCloseable { vararg targets: Target ): R + /** + * Sends a request and awaits a response within a specified timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param timeout the duration to wait for a response. + * @param targets the targets to send the request to. + * @return the response to the request. + */ + suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + vararg targets: Pair + ): R + /** * Sends a request and awaits a response using the default timeout. * @@ -111,6 +128,17 @@ interface BrokerCoroutines : AutoCloseable { */ suspend fun request(message: Any, responseType: KClass, vararg targets: Target): R + /** + * Sends a request and awaits a response using the default timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param targets the targets to send the request to. + * @return the response to the request. + */ + suspend fun request(message: Any, responseType: KClass, vararg targets: Pair): R + /** * Registers a responder to handle incoming messages of a specific type and produce a response. * @@ -177,6 +205,19 @@ suspend inline fun BrokerCoroutines.request( vararg targets: Target ): R = request(message, R::class, *targets) +/** + * Sends a request and awaits a response using the default timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param targets the targets to send the request to. + * @return the response to the request. + */ +suspend inline fun BrokerCoroutines.request( + message: Any, + vararg targets: Pair +): R = request(message, R::class, *targets) + /** * Registers a function to respond to incoming messages of a specific type. * diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt index 7e5cb72..f8b3c78 100644 --- a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt @@ -47,12 +47,37 @@ internal class BrokerCoroutinesImpl( vararg targets: Target ): R = this.delegate.request(message, responseType.java, timeout.toJavaDuration(), *targets).await() + override suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + vararg targets: Pair + ): R = + this.delegate.request( + message, + responseType.java, + timeout.toJavaDuration(), + targets.map { Target.of(it.first, it.second) } + ).await() + override suspend fun request(message: Any, responseType: KClass, targets: Collection): R = this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, targets).await() override suspend fun request(message: Any, responseType: KClass, vararg targets: Target): R = this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, *targets).await() + override suspend fun request( + message: Any, + responseType: KClass, + vararg targets: Pair + ): R = + this.delegate.request( + message, + responseType.java, + Internal.REQUEST_TIMEOUT, + targets.map { Target.of(it.first, it.second) } + ).await() + override suspend fun respond(responder: ResponderCoroutines): AutoCloseable = this.delegate.respond(responder.type.java) { scope.launch { responder(it) } } diff --git a/kotlin/extensions/src/main/kotlin/net/infumia/pubsub/BrokerExtension.kt b/kotlin/extensions/src/main/kotlin/net/infumia/pubsub/BrokerExtension.kt index b6b6fd4..d3dafd9 100644 --- a/kotlin/extensions/src/main/kotlin/net/infumia/pubsub/BrokerExtension.kt +++ b/kotlin/extensions/src/main/kotlin/net/infumia/pubsub/BrokerExtension.kt @@ -34,6 +34,17 @@ inline fun Broker.listen(noinline handler: (T) -> Unit): AutoC inline fun Broker.request(message: Any, vararg targets: Target): CompletableFuture = this.request(message, R::class.java, *targets) +/** + * Sends a message and expects a response of a specific type. + * + * @param message the message to send. + * @param targets the targets to send the message to. + * @param R the type of the expected response. + * @return a [CompletableFuture] representing the response to the message. + */ +inline fun Broker.request(message: Any, vararg targets: Pair): CompletableFuture = + this.request(message, R::class.java, targets.map { Target.of(it.first, it.second) }) + /** * Registers a function to respond to messages of a specific type. *