Skip to content

Commit

Permalink
more extension. (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
portlek committed Jun 14, 2024
1 parent 962a48e commit 024af46
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ interface BrokerCoroutines : AutoCloseable {
vararg targets: Target
): R

/**
* Sends a request and awaits a response within a specified timeout.
*
* @param R the type of the response.
* @param message the request message.
* @param responseType the KClass representing the expected response type.
* @param timeout the duration to wait for a response.
* @param targets the targets to send the request to.
* @return the response to the request.
*/
suspend fun <R : Any> request(
message: Any,
responseType: KClass<R>,
timeout: Duration,
vararg targets: Pair<String, String>
): R

/**
* Sends a request and awaits a response using the default timeout.
*
Expand All @@ -111,6 +128,17 @@ interface BrokerCoroutines : AutoCloseable {
*/
suspend fun <R : Any> request(message: Any, responseType: KClass<R>, vararg targets: Target): R

/**
* Sends a request and awaits a response using the default timeout.
*
* @param R the type of the response.
* @param message the request message.
* @param responseType the KClass representing the expected response type.
* @param targets the targets to send the request to.
* @return the response to the request.
*/
suspend fun <R : Any> request(message: Any, responseType: KClass<R>, vararg targets: Pair<String, String>): R

/**
* Registers a responder to handle incoming messages of a specific type and produce a response.
*
Expand Down Expand Up @@ -177,6 +205,19 @@ suspend inline fun <reified R : Any> BrokerCoroutines.request(
vararg targets: Target
): R = request(message, R::class, *targets)

/**
* Sends a request and awaits a response using the default timeout.
*
* @param R the type of the response.
* @param message the request message.
* @param targets the targets to send the request to.
* @return the response to the request.
*/
suspend inline fun <reified R : Any> BrokerCoroutines.request(
message: Any,
vararg targets: Pair<String, String>
): R = request(message, R::class, *targets)

/**
* Registers a function to respond to incoming messages of a specific type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,37 @@ internal class BrokerCoroutinesImpl(
vararg targets: Target
): R = this.delegate.request(message, responseType.java, timeout.toJavaDuration(), *targets).await()

override suspend fun <R : Any> request(
message: Any,
responseType: KClass<R>,
timeout: Duration,
vararg targets: Pair<String, String>
): R =
this.delegate.request(
message,
responseType.java,
timeout.toJavaDuration(),
targets.map { Target.of(it.first, it.second) }
).await()

override suspend fun <R : Any> request(message: Any, responseType: KClass<R>, targets: Collection<Target>): R =
this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, targets).await()

override suspend fun <R : Any> request(message: Any, responseType: KClass<R>, vararg targets: Target): R =
this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, *targets).await()

override suspend fun <R : Any> request(
message: Any,
responseType: KClass<R>,
vararg targets: Pair<String, String>
): R =
this.delegate.request(
message,
responseType.java,
Internal.REQUEST_TIMEOUT,
targets.map { Target.of(it.first, it.second) }
).await()

override suspend fun <T : Any, Y: Any> respond(responder: ResponderCoroutines<T, Y>): AutoCloseable =
this.delegate.respond(responder.type.java) { scope.launch { responder(it) } }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ inline fun <reified T : Any> Broker.listen(noinline handler: (T) -> Unit): AutoC
inline fun <reified R : Any> Broker.request(message: Any, vararg targets: Target): CompletableFuture<R> =
this.request(message, R::class.java, *targets)

/**
* Sends a message and expects a response of a specific type.
*
* @param message the message to send.
* @param targets the targets to send the message to.
* @param R the type of the expected response.
* @return a [CompletableFuture] representing the response to the message.
*/
inline fun <reified R : Any> Broker.request(message: Any, vararg targets: Pair<String, String>): CompletableFuture<R> =
this.request(message, R::class.java, targets.map { Target.of(it.first, it.second) })

/**
* Registers a function to respond to messages of a specific type.
*
Expand Down

0 comments on commit 024af46

Please sign in to comment.