From 9bfde17322f797a7ee48bc27a751933d94ed5fb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hasan=20Demirta=C5=9F?= Date: Fri, 14 Jun 2024 18:01:45 +0300 Subject: [PATCH] add coroutines support. (#15) * add coroutines support. * simplify. * simplify artifact-ids in gradle.properties. * seperate the codec. --- README.md | 10 +- build.gradle.kts | 3 +- codec/gradle.properties | 1 + .../main/java/net/infumia/pubsub/Codec.java | 4 +- .../net/infumia/pubsub/CodecProvider.java | 0 .../infumia/pubsub/CodecProviderCached.java | 0 common/build.gradle.kts | 2 + common/gradle.properties | 1 - .../main/java/net/infumia/pubsub/Broker.java | 63 ++---- .../infumia/pubsub/BrokerStringAbstract.java | 63 +++++- .../main/java/net/infumia/pubsub/Handler.java | 11 +- .../infumia/pubsub/HandlerToResponder.java | 20 -- .../java/net/infumia/pubsub/Responder.java | 12 +- gradle/libs.versions.toml | 3 +- jackson/build.gradle.kts | 2 +- jackson/gradle.properties | 2 +- kotlin/coroutines/build.gradle.kts | 5 + kotlin/coroutines/gradle.properties | 1 + .../net/infumia/pubsub/BrokerCoroutines.kt | 182 ++++++++++++++++++ .../pubsub/BrokerCoroutinesExtension.kt | 12 ++ .../infumia/pubsub/BrokerCoroutinesImpl.kt | 61 ++++++ .../net/infumia/pubsub/HandlerCoroutines.kt | 17 ++ .../net/infumia/pubsub/ResponderCoroutines.kt | 18 ++ kotlin/extensions/gradle.properties | 2 +- kotlin/protobuf/build.gradle.kts | 2 +- kotlin/protobuf/gradle.properties | 2 +- redis/build.gradle.kts | 1 + redis/gradle.properties | 2 +- .../java/net/infumia/pubsub/BrokerRedis.java | 28 ++- .../pubsub/BrokerRedisNoTargetProvider.java | 14 ++ settings.gradle.kts | 3 +- 31 files changed, 438 insertions(+), 109 deletions(-) create mode 100644 codec/gradle.properties rename {common => codec}/src/main/java/net/infumia/pubsub/Codec.java (82%) rename {common => codec}/src/main/java/net/infumia/pubsub/CodecProvider.java (100%) rename {common => codec}/src/main/java/net/infumia/pubsub/CodecProviderCached.java (100%) delete mode 100644 common/gradle.properties delete mode 100644 common/src/main/java/net/infumia/pubsub/HandlerToResponder.java create mode 100644 kotlin/coroutines/build.gradle.kts create mode 100644 kotlin/coroutines/gradle.properties create mode 100644 kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt create mode 100644 kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesExtension.kt create mode 100644 kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt create mode 100644 kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/HandlerCoroutines.kt create mode 100644 kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/ResponderCoroutines.kt diff --git a/README.md b/README.md index d163f16..8ce0979 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,9 @@ repositories { } dependencies { - // Base module + // Base modules implementation "net.infumia:pubsub:VERSION" + implementation "net.infumia:pubsub-codec:VERSION" // Required, https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine/ implementation "com.github.ben-manes.caffeine:caffeine:2.9.3" // for java-8+ implementation "com.github.ben-manes.caffeine:caffeine:3.1.8" // for java-11+ @@ -21,12 +22,17 @@ dependencies { // A simple codec using Jackson (Optional) implementation "net.infumia:pubsub-jackson:VERSION" - // Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind + // Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind/ implementation "com.fasterxml.jackson.core:jackson-databind:2.17.1" // Kotlin extensions (Optional) implementation "net.infumia:pubsub-kotlin:VERSION" + // Kotlin coroutines (Optional) + implementation "net.infumia:pubsub-kotlin-coroutines:VERSION" + // Required, https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/ + implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1" + // Kotlin protobuf serializer (Optional) implementation "net.infumia:pubsub-kotlin-protobuf:VERSION" // Required, https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect/ diff --git a/build.gradle.kts b/build.gradle.kts index 6e33a6c..cd86067 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -42,7 +42,8 @@ subprojects { } } - val projectName = project.property("artifact-id") as String + val moduleName = project.findProperty("artifact-id") as String? + val projectName = "pubsub${if (moduleName == null) "" else "-$moduleName"}" val signRequired = project.hasProperty("sign-required") extensions.configure { diff --git a/codec/gradle.properties b/codec/gradle.properties new file mode 100644 index 0000000..21588ff --- /dev/null +++ b/codec/gradle.properties @@ -0,0 +1 @@ +artifact-id=codec diff --git a/common/src/main/java/net/infumia/pubsub/Codec.java b/codec/src/main/java/net/infumia/pubsub/Codec.java similarity index 82% rename from common/src/main/java/net/infumia/pubsub/Codec.java rename to codec/src/main/java/net/infumia/pubsub/Codec.java index 92c90b7..f017b9b 100644 --- a/common/src/main/java/net/infumia/pubsub/Codec.java +++ b/codec/src/main/java/net/infumia/pubsub/Codec.java @@ -3,7 +3,7 @@ /** * The interface for encoding and decoding objects of type {@link T} to and from byte arrays. * - * @param the type of the object to be encoded and decoded. + * @param type of the object to be encoded and decoded. */ public interface Codec { /** @@ -17,7 +17,7 @@ public interface Codec { /** * Decodes a byte array into an object of type {@link T}. * - * @param bytes the byte array to decode. + * @param bytes the bytes array to decode. * @return the decoded object. */ T decode(byte[] bytes); diff --git a/common/src/main/java/net/infumia/pubsub/CodecProvider.java b/codec/src/main/java/net/infumia/pubsub/CodecProvider.java similarity index 100% rename from common/src/main/java/net/infumia/pubsub/CodecProvider.java rename to codec/src/main/java/net/infumia/pubsub/CodecProvider.java diff --git a/common/src/main/java/net/infumia/pubsub/CodecProviderCached.java b/codec/src/main/java/net/infumia/pubsub/CodecProviderCached.java similarity index 100% rename from common/src/main/java/net/infumia/pubsub/CodecProviderCached.java rename to codec/src/main/java/net/infumia/pubsub/CodecProviderCached.java diff --git a/common/build.gradle.kts b/common/build.gradle.kts index 26caeb1..5cf7c7c 100644 --- a/common/build.gradle.kts +++ b/common/build.gradle.kts @@ -1,3 +1,5 @@ dependencies { + compileOnly(project(":codec")) + compileOnly(libs.caffeine) } diff --git a/common/gradle.properties b/common/gradle.properties deleted file mode 100644 index 36d7d3d..0000000 --- a/common/gradle.properties +++ /dev/null @@ -1 +0,0 @@ -artifact-id=pubsub diff --git a/common/src/main/java/net/infumia/pubsub/Broker.java b/common/src/main/java/net/infumia/pubsub/Broker.java index cf3303c..0a6b9d6 100644 --- a/common/src/main/java/net/infumia/pubsub/Broker.java +++ b/common/src/main/java/net/infumia/pubsub/Broker.java @@ -1,7 +1,6 @@ package net.infumia.pubsub; import java.time.Duration; -import java.util.Arrays; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -34,17 +33,16 @@ public interface Broker extends AutoCloseable { * @param message the message to send. Cannot be {@code null} * @param targets the targets to send the message to. Can be empty, cannot be {@code null} */ - default void send(final Object message, final Target... targets) { - this.send(message, Arrays.asList(targets)); - } + void send(Object message, Target... targets); /** * Listens for messages using a handler. * * @param handler the handler to process incoming messages. Cannot be {@code null} + * @param the type of the message to listen for. * @return an {@link AutoCloseable} to stop listening. Cannot be {@code null} */ - AutoCloseable listen(Handler handler); + AutoCloseable listen(Handler handler); /** * Listens for messages of a specific type using a consumer. @@ -54,19 +52,7 @@ default void send(final Object message, final Target... targets) { * @param the type of the message to listen for. * @return an {@link AutoCloseable} to stop listening. Cannot be {@code null} */ - default AutoCloseable listen(final Class type, final Consumer handler) { - return this.listen(new Handler() { - @Override - public Class type() { - return type; - } - - @Override - public void handle(final T t) { - handler.accept(t); - } - }); - } + AutoCloseable listen(Class type, Consumer handler); /** * Sends a request to targets and awaits a response within a timeout. @@ -91,46 +77,39 @@ CompletableFuture request(Object message, Class responseType, Duration * @param the type of the response. * @return a {@link CompletableFuture} representing the response. Cannot be {@code null} */ - default CompletableFuture request(final Object message, final Class responseType, final Duration timeout, - final Target... targets) { - return this.request(message, responseType, timeout, Arrays.asList(targets)); - } + CompletableFuture request(Object message, Class responseType, Duration timeout, Target... targets); /** - * Sends a request and awaits a response using the default timeout. + * Sends a request to targets and awaits a response using the default timeout. * * @param message the request message. Cannot be {@code null} * @param responseType the class object representing the response type {@link R}. Cannot be {@code null} - * @param targets the targets to send the request to. Can be empty, cannot be {@code null} + * @param targets the targets to send the request to. Cannot be {@code null} * @param the type of the response. * @return a {@link CompletableFuture} representing the response. Cannot be {@code null} */ - default CompletableFuture request(final Object message, final Class responseType, - final Target... targets) { - return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets); - } + CompletableFuture request(Object message, Class responseType, Collection targets); /** - * Sends a request to targets and awaits a response using the default timeout. + * Sends a request and awaits a response using the default timeout. * * @param message the request message. Cannot be {@code null} * @param responseType the class object representing the response type {@link R}. Cannot be {@code null} - * @param targets the targets to send the request to. Cannot be {@code null} + * @param targets the targets to send the request to. Can be empty, cannot be {@code null} * @param the type of the response. * @return a {@link CompletableFuture} representing the response. Cannot be {@code null} */ - default CompletableFuture request(final Object message, final Class responseType, - final Collection targets) { - return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets); - } + CompletableFuture request(Object message, Class responseType, Target... targets); /** * Registers a responder to handle requests of a specific type. * * @param responder the responder to handle requests. Cannot be {@code null} + * @param the type of the request message. + * @param the type of the response. * @return an {@link AutoCloseable} to stop responding. Cannot be {@code null} */ - AutoCloseable respond(Responder responder); + AutoCloseable respond(Responder responder); /** * Registers a function to respond to requests of a specific type. @@ -141,19 +120,7 @@ default CompletableFuture request(final Object message, final Class re * @param the type of the response. * @return an {@link AutoCloseable} to stop responding. Cannot be {@code null} */ - default AutoCloseable respond(final Class type, final Function responder) { - return this.respond(new Responder() { - @Override - public Class type() { - return type; - } - - @Override - public Y handle(final T t) { - return responder.apply(t); - } - }); - } + AutoCloseable respond(Class type, Function responder); @Override void close(); diff --git a/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java b/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java index c3fd87a..3c18947 100644 --- a/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java +++ b/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java @@ -4,13 +4,17 @@ import com.github.benmanes.caffeine.cache.Caffeine; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; /** - * Abstract class for a message broker that operates with strings and supports sending, receiving, and responding to messages. + * Abstract class for a message broker that operates with strings and supports sending, receiving, + * and responding to messages. */ public abstract class BrokerStringAbstract implements Broker { private final UUID brokerId = UUID.randomUUID(); @@ -45,12 +49,26 @@ public final void send(final Object message, final Collection targets) { } @Override - public final AutoCloseable listen(final Handler handler) { - return this.respond(new HandlerToResponder<>(handler)); + public void send(final Object message, final Target... targets) { + this.send(message, Arrays.asList(targets)); } @Override - public final CompletableFuture request(final Object message, final Class responseType, final Duration timeout, final Collection targets) { + public final AutoCloseable listen(final Handler handler) { + return this.listen(handler.type(), handler); + } + + @Override + public AutoCloseable listen(final Class type, final Consumer handler) { + return this.respond(type, message -> { + handler.accept(message); + return null; + }); + } + + @Override + public final CompletableFuture request(final Object message, final Class responseType, + final Duration timeout, final Collection targets) { final Envelope envelope = Internal.newEnvelope(this.codecProvider, this.brokerId, message); final AwaitingResponder responder = new AwaitingResponder<>(responseType); this.awaitingResponders.put(envelope.messageId, responder); @@ -59,10 +77,43 @@ public final CompletableFuture request(final Object message, final Class< } @Override - public final AutoCloseable respond(final Responder responder) { + public CompletableFuture request(final Object message, final Class responseType, final Duration timeout, + final Target... targets) { + return this.request(message, responseType, timeout, Arrays.asList(targets)); + } + + @Override + public CompletableFuture request(final Object message, final Class responseType, + final Collection targets) { + return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets); + } + + @Override + public CompletableFuture request(final Object message, final Class responseType, + final Target... targets) { + return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets); + } + + @Override + public final AutoCloseable respond(final Responder responder) { return this.handlerRegistry.register(this.messageTypeId(responder.type()), responder); } + @Override + public AutoCloseable respond(final Class type, final Function responder) { + return this.respond(new Responder() { + @Override + public Class type() { + return type; + } + + @Override + public Y apply(final T message) { + return responder.apply(message); + } + }); + } + @Override public void close() { this.handlerRegistry.close(); @@ -146,7 +197,7 @@ private Envelope handleEnvelope( final Responder responder ) { final T decoded = this.codecProvider.provide(responder.type()).decode(envelope.messagePayload); - final Y response = responder.handle(decoded); + final Y response = responder.apply(decoded); if (response == null) { return null; } else { diff --git a/common/src/main/java/net/infumia/pubsub/Handler.java b/common/src/main/java/net/infumia/pubsub/Handler.java index bdecd93..3470c57 100644 --- a/common/src/main/java/net/infumia/pubsub/Handler.java +++ b/common/src/main/java/net/infumia/pubsub/Handler.java @@ -1,22 +1,17 @@ package net.infumia.pubsub; +import java.util.function.Consumer; + /** * The interface for handling Pub/Sub messages of type {@link T}. * * @param the type of the message to be handled. */ -public interface Handler { +public interface Handler extends Consumer { /** * Retrieves the class type of the message being handled. * * @return the {@link Class} object representing the type {@link T}. */ Class type(); - - /** - * Handles a message of type {@link T} received from Pub/Sub. - * - * @param t the message to handle. - */ - void handle(T t); } diff --git a/common/src/main/java/net/infumia/pubsub/HandlerToResponder.java b/common/src/main/java/net/infumia/pubsub/HandlerToResponder.java deleted file mode 100644 index 4b22251..0000000 --- a/common/src/main/java/net/infumia/pubsub/HandlerToResponder.java +++ /dev/null @@ -1,20 +0,0 @@ -package net.infumia.pubsub; - -final class HandlerToResponder implements Responder { - private final Handler delegate; - - HandlerToResponder(final Handler delegate) { - this.delegate = delegate; - } - - @Override - public Class type() { - return this.delegate.type(); - } - - @Override - public Object handle(final T t) { - this.delegate.handle(t); - return null; - } -} diff --git a/common/src/main/java/net/infumia/pubsub/Responder.java b/common/src/main/java/net/infumia/pubsub/Responder.java index 7c4cff3..9a2ab7c 100644 --- a/common/src/main/java/net/infumia/pubsub/Responder.java +++ b/common/src/main/java/net/infumia/pubsub/Responder.java @@ -1,12 +1,14 @@ package net.infumia.pubsub; +import java.util.function.Function; + /** * The interface for handling Pub/Sub messages of type {@code T} and providing a response of type {@code Y}. * * @param the type of the message to be handled. * @param the type of the response. */ -public interface Responder { +public interface Responder extends Function { /** * Retrieves the class type of the message being handled. @@ -14,12 +16,4 @@ public interface Responder { * @return the {@link Class} object representing the type {@link T}. */ Class type(); - - /** - * Handles a message of type {@link T} received from Pub/Sub and provides a response of type {@link Y}. - * - * @param t the message to handle. - * @return the response of type {@link Y}. - */ - Y handle(T t); } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 65ec3f4..c312f00 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,10 +6,11 @@ indra = "3.1.3" [libraries] caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version = "2.9.3" } redis = { module = "io.lettuce:lettuce-core", version = "6.3.2.RELEASE" } +jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" } kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" } kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinserialization" } kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinserialization" } -jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" } +kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version = "1.8.1" } [plugins] kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } diff --git a/jackson/build.gradle.kts b/jackson/build.gradle.kts index c778490..a247c02 100644 --- a/jackson/build.gradle.kts +++ b/jackson/build.gradle.kts @@ -1,5 +1,5 @@ dependencies { - compileOnly(project(":common")) + compileOnly(project(":codec")) compileOnly(libs.jackson) } diff --git a/jackson/gradle.properties b/jackson/gradle.properties index 469fd60..0dfee20 100644 --- a/jackson/gradle.properties +++ b/jackson/gradle.properties @@ -1 +1 @@ -artifact-id=pubsub-jackson +artifact-id=jackson diff --git a/kotlin/coroutines/build.gradle.kts b/kotlin/coroutines/build.gradle.kts new file mode 100644 index 0000000..59ad7b7 --- /dev/null +++ b/kotlin/coroutines/build.gradle.kts @@ -0,0 +1,5 @@ +dependencies { + compileOnly(project(":common")) + + compileOnly(libs.kotlinx.coroutines) +} diff --git a/kotlin/coroutines/gradle.properties b/kotlin/coroutines/gradle.properties new file mode 100644 index 0000000..49750fd --- /dev/null +++ b/kotlin/coroutines/gradle.properties @@ -0,0 +1 @@ +artifact-id=kotlin-coroutines diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt new file mode 100644 index 0000000..26ac0bc --- /dev/null +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutines.kt @@ -0,0 +1,182 @@ +package net.infumia.pubsub + +import kotlin.reflect.KClass +import kotlin.time.Duration + +/** + * Interface representing a coroutine-based message broker that supports sending, receiving, and responding to messages. + */ +interface BrokerCoroutines : AutoCloseable { + /** + * Initializes the message broker, performing any necessary setup or initialization tasks. + */ + suspend fun initialize() + + /** + * Sends a message to the specified collection of targets. + * + * @param message the message to send. + * @param targets the targets to send the message to. + */ + suspend fun send(message: Any, targets: Collection) + + /** + * Sends a message to the specified targets. + * + * @param message the message to send. + * @param targets the targets to send the message to. + */ + suspend fun send(message: Any, vararg targets: Target) + + /** + * Registers a handler to listen for messages of a specific type. + * + * @param T the type of the message to listen for. + * @param handler the handler to process incoming messages. + * @return an [AutoCloseable] that can be used to unregister the handler. + */ + suspend fun listen(handler: HandlerCoroutines): AutoCloseable + + /** + * Registers a handler to listen for messages of a specific type. + * + * @param T the type of the message to listen for. + * @param type the KClass representing the type of the message. + * @param handler the suspend function to process incoming messages. + * @return an [AutoCloseable] that can be used to unregister the handler. + */ + suspend fun listen(type: KClass, handler: suspend (T) -> Unit): AutoCloseable + + /** + * Sends a request and awaits a response within a specified timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param timeout the duration to wait for a response. + * @param targets the collection of targets to send the request to. + * @return the response to the request. + */ + suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + targets: Collection + ): R + + /** + * Sends a request and awaits a response within a specified timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param timeout the duration to wait for a response. + * @param targets the targets to send the request to. + * @return the response to the request. + */ + suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + vararg targets: Target + ): R + + /** + * Sends a request and awaits a response using the default timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param targets the collection of targets to send the request to. + * @return the response to the request. + */ + suspend fun request(message: Any, responseType: KClass, targets: Collection): R + + /** + * Sends a request and awaits a response using the default timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param responseType the KClass representing the expected response type. + * @param targets the targets to send the request to. + * @return the response to the request. + */ + suspend fun request(message: Any, responseType: KClass, vararg targets: Target): R + + /** + * Registers a responder to handle incoming messages of a specific type and produce a response. + * + * @param T the type of the request message. + * @param Y the type of the response. + * @param responder the responder to handle incoming messages and produce responses. + * @return an [AutoCloseable] that can be used to unregister the responder. + */ + suspend fun respond(responder: ResponderCoroutines): AutoCloseable + + /** + * Registers a function to respond to incoming messages of a specific type. + * + * @param T the type of the request message. + * @param Y the type of the response. + * @param type the KClass representing the type of the request message. + * @param responder the suspend function to handle incoming messages and produce responses. + * @return an [AutoCloseable] that can be used to unregister the responder. + */ + suspend fun respond(type: KClass, responder: suspend (T) -> Y?): AutoCloseable + + /** + * Closes the message broker and releases any resources it holds. + */ + override fun close() +} + +/** + * Registers a handler to listen for messages of a specific type. + * + * @param T the type of the message to listen for. + * @param handler the suspend function to process incoming messages. + * @return an [AutoCloseable] that can be used to unregister the handler. + */ +suspend inline fun BrokerCoroutines.listen( + noinline handler: suspend (T) -> Unit +): AutoCloseable = listen(T::class, handler) + +/** + * Sends a request and awaits a response within a specified timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param timeout the duration to wait for a response. + * @param targets the targets to send the request to. + * @return the response to the request. + */ +suspend inline fun BrokerCoroutines.request( + message: Any, + timeout: Duration, + vararg targets: Target, +): R = request(message, R::class, timeout, *targets) + +/** + * Sends a request and awaits a response using the default timeout. + * + * @param R the type of the response. + * @param message the request message. + * @param targets the targets to send the request to. + * @return the response to the request. + */ +suspend inline fun BrokerCoroutines.request( + message: Any, + vararg targets: Target +): R = request(message, R::class, *targets) + +/** + * Registers a function to respond to incoming messages of a specific type. + * + * @param T the type of the request message. + * @param R the type of the response. + * @param handler the suspend function to handle incoming messages and produce responses. + * @return an [AutoCloseable] that can be used to unregister the responder. + */ +suspend inline fun BrokerCoroutines.respond( + noinline handler: suspend (T) -> R? +): AutoCloseable = respond(T::class, handler) diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesExtension.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesExtension.kt new file mode 100644 index 0000000..57efa11 --- /dev/null +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesExtension.kt @@ -0,0 +1,12 @@ +package net.infumia.pubsub + +import kotlinx.coroutines.CoroutineScope + +/** + * Extension for the [Broker] interface to integrate with coroutines. + * + * @param scope the [CoroutineScope] to be used with the broker. + * @return a [BrokerCoroutines] instance. + */ +fun Broker.withCoroutines(scope: CoroutineScope): BrokerCoroutines = + BrokerCoroutinesImpl(this, scope) diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt new file mode 100644 index 0000000..07a2a94 --- /dev/null +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/BrokerCoroutinesImpl.kt @@ -0,0 +1,61 @@ +package net.infumia.pubsub + +import kotlin.reflect.KClass +import kotlin.time.Duration +import kotlin.time.toJavaDuration +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.future.await +import kotlinx.coroutines.launch + +internal class BrokerCoroutinesImpl( + private val delegate: Broker, + private val scope: CoroutineScope +) : BrokerCoroutines { + override suspend fun initialize() { + this.delegate.initialize() + } + + override suspend fun send(message: Any, targets: Collection) { + this.delegate.send(message, targets) + } + + override suspend fun send(message: Any, vararg targets: Target) { + this.delegate.send(message, *targets) + } + + override suspend fun listen(handler: HandlerCoroutines): AutoCloseable = + this.delegate.listen(handler.type.java) { scope.launch { handler(it) } } + + override suspend fun listen(type: KClass, handler: suspend (T) -> Unit): AutoCloseable = + this.delegate.listen(type.java) { scope.launch { handler(it) } } + + override suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + targets: Collection + ): R = this.delegate.request(message, responseType.java, timeout.toJavaDuration(), targets).await() + + override suspend fun request( + message: Any, + responseType: KClass, + timeout: Duration, + vararg targets: Target + ): R = this.delegate.request(message, responseType.java, timeout.toJavaDuration(), *targets).await() + + override suspend fun request(message: Any, responseType: KClass, targets: Collection): R = + this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, targets).await() + + override suspend fun request(message: Any, responseType: KClass, vararg targets: Target): R = + this.delegate.request(message, responseType.java, Internal.REQUEST_TIMEOUT, *targets).await() + + override suspend fun respond(responder: ResponderCoroutines): AutoCloseable = + this.delegate.respond(responder.type.java) { scope.launch { responder(it) } } + + override suspend fun respond(type: KClass, responder: suspend (T) -> Y?): AutoCloseable = + this.delegate.respond(type.java) { scope.launch { responder(it) } } + + override fun close() { + this.delegate.close() + } +} diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/HandlerCoroutines.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/HandlerCoroutines.kt new file mode 100644 index 0000000..0a0c857 --- /dev/null +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/HandlerCoroutines.kt @@ -0,0 +1,17 @@ +package net.infumia.pubsub + +import kotlin.reflect.KClass + +/** + * The interface for handling Pub/Sub messages of type [T]. + * + * @param T the type of the message to be handled. + */ +interface HandlerCoroutines : (T) -> Unit { + /** + * Retrieves the class type of the message being handled. + * + * @return the [KClass] object representing the type [T]. + */ + val type: KClass +} diff --git a/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/ResponderCoroutines.kt b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/ResponderCoroutines.kt new file mode 100644 index 0000000..8b5e1a2 --- /dev/null +++ b/kotlin/coroutines/src/main/kotlin/net/infumia/pubsub/ResponderCoroutines.kt @@ -0,0 +1,18 @@ +package net.infumia.pubsub + +import kotlin.reflect.KClass + +/** + * The interface for handling Pub/Sub messages of type [T] and providing a response of type [Y]. + * + * @param T the type of the message to be handled. + * @param Y the type of the response. + */ +interface ResponderCoroutines : (T) -> Y? { + /** + * Retrieves the class type of the message being handled. + * + * @return the [KClass] object representing the type [T]. + */ + val type: KClass +} diff --git a/kotlin/extensions/gradle.properties b/kotlin/extensions/gradle.properties index 03c31f9..692e222 100644 --- a/kotlin/extensions/gradle.properties +++ b/kotlin/extensions/gradle.properties @@ -1 +1 @@ -artifact-id=pubsub-kotlin +artifact-id=kotlin diff --git a/kotlin/protobuf/build.gradle.kts b/kotlin/protobuf/build.gradle.kts index ec909e7..5393cfb 100644 --- a/kotlin/protobuf/build.gradle.kts +++ b/kotlin/protobuf/build.gradle.kts @@ -3,7 +3,7 @@ plugins { } dependencies { - compileOnly(project(":common")) + compileOnly(project(":codec")) compileOnly(libs.kotlin.reflect) compileOnly(libs.kotlinx.serialization.core) diff --git a/kotlin/protobuf/gradle.properties b/kotlin/protobuf/gradle.properties index 524d8d3..c97fb66 100644 --- a/kotlin/protobuf/gradle.properties +++ b/kotlin/protobuf/gradle.properties @@ -1 +1 @@ -artifact-id=pubsub-kotlin-protobuf +artifact-id=kotlin-protobuf diff --git a/redis/build.gradle.kts b/redis/build.gradle.kts index f07a39f..623996a 100644 --- a/redis/build.gradle.kts +++ b/redis/build.gradle.kts @@ -1,4 +1,5 @@ dependencies { + compileOnly(project(":codec")) compileOnly(project(":common")) compileOnly(libs.redis) diff --git a/redis/gradle.properties b/redis/gradle.properties index e0ce7b0..e2dd57b 100644 --- a/redis/gradle.properties +++ b/redis/gradle.properties @@ -1 +1 @@ -artifact-id=pubsub-redis +artifact-id=redis diff --git a/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java b/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java index a53ed8a..fee8c63 100644 --- a/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java +++ b/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.Executor; import java.util.stream.Collectors; /** @@ -16,6 +17,7 @@ public abstract class BrokerRedis extends BrokerStringAbstract { private final Lazy> channelPrefixes; private final RedisClientProvider clientProvider; + private final Executor executor; private StatefulRedisConnection publishConnection; private StatefulRedisPubSubConnection subscribeConnection; @@ -23,12 +25,15 @@ public abstract class BrokerRedis extends BrokerStringAbstract { /** * Ctor. * - * @param codecProvider the CodecProvider used for encoding and decoding messages. Cannot be null. - * @param clientProvider the RedisClientProvider used for obtaining Redis client connections. Cannot be null. + * @param codecProvider the CodecProvider used for encoding and decoding messages. Cannot be null. + * @param clientProvider the RedisClientProvider used for getting Redis client connections. Cannot be null. + * @param executor the Executor used for calling handlers whenever redis receives a messages. Can be null. */ - public BrokerRedis(final CodecProvider codecProvider, final RedisClientProvider clientProvider) { + public BrokerRedis(final CodecProvider codecProvider, final RedisClientProvider clientProvider, + final Executor executor) { super(codecProvider); this.clientProvider = clientProvider; + this.executor = executor; this.channelPrefixes = Lazy.of(() -> { final ArrayList channels = new ArrayList<>(); channels.addAll(Internal.channelPrefixFor(Collections.emptySet())); @@ -44,8 +49,19 @@ public BrokerRedis(final CodecProvider codecProvider, final RedisClientProvider }); } + /** + * Ctor. + * + * @param codecProvider the CodecProvider used for encoding and decoding messages. Cannot be null. + * @param clientProvider the RedisClientProvider used for getting Redis client connections. Cannot be null. + */ + public BrokerRedis(final CodecProvider codecProvider, final RedisClientProvider clientProvider) { + this(codecProvider, clientProvider, null); + } + @Override protected void connect() { + final Executor executor = BrokerRedis.this.executor; final RedisClient client = this.clientProvider.provide(); this.publishConnection = client.connect(); this.subscribeConnection = client.connectPubSub(); @@ -53,7 +69,11 @@ protected void connect() { this.subscribeConnection.addListener(new RedisPubSubAdapter() { @Override public void message(final String pattern, final String channel, final String message) { - BrokerRedis.this.callHandlers(channel, message); + if (executor == null) { + BrokerRedis.this.callHandlers(channel, message); + } else { + executor.execute(() -> BrokerRedis.this.callHandlers(channel, message)); + } } }); final String[] channels = this.channelPrefixes.get().stream() diff --git a/redis/src/main/java/net/infumia/pubsub/BrokerRedisNoTargetProvider.java b/redis/src/main/java/net/infumia/pubsub/BrokerRedisNoTargetProvider.java index 036153c..f70bda6 100644 --- a/redis/src/main/java/net/infumia/pubsub/BrokerRedisNoTargetProvider.java +++ b/redis/src/main/java/net/infumia/pubsub/BrokerRedisNoTargetProvider.java @@ -1,9 +1,23 @@ package net.infumia.pubsub; +import java.util.concurrent.Executor; + /** * A concrete implementation of {@link BrokerRedis} that does not provide a target provider. */ public final class BrokerRedisNoTargetProvider extends BrokerRedis { + /** + * Ctor. + * + * @param codecProvider the CodecProvider used for encoding and decoding messages. + * @param clientProvider the RedisClientProvider used for obtaining Redis client connections. + * @param executor the Executor used for calling handlers whenever redis receives a messages. Can be null. + */ + public BrokerRedisNoTargetProvider(final CodecProvider codecProvider, final RedisClientProvider clientProvider, + final Executor executor) { + super(codecProvider, clientProvider, executor); + } + /** * Ctor. * diff --git a/settings.gradle.kts b/settings.gradle.kts index 38c6d16..a9cd125 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -4,7 +4,8 @@ plugins { rootProject.name = "pubsub" -include("common", "redis", "jackson", "kotlin-extensions", "kotlin-protobuf") +include("codec", "common", "redis", "jackson", "kotlin-extensions", "kotlin-coroutines", "kotlin-protobuf") project(":kotlin-extensions").projectDir = file("kotlin/extensions") +project(":kotlin-coroutines").projectDir = file("kotlin/coroutines") project(":kotlin-protobuf").projectDir = file("kotlin/protobuf")