Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add coroutines support. #15

Merged
merged 4 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ repositories {
}

dependencies {
// Base module
// Base modules
implementation "net.infumia:pubsub:VERSION"
implementation "net.infumia:pubsub-codec:VERSION"
// Required, https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine/
implementation "com.github.ben-manes.caffeine:caffeine:2.9.3" // for java-8+
implementation "com.github.ben-manes.caffeine:caffeine:3.1.8" // for java-11+
Expand All @@ -21,12 +22,17 @@ dependencies {

// A simple codec using Jackson (Optional)
implementation "net.infumia:pubsub-jackson:VERSION"
// Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
// Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind/
implementation "com.fasterxml.jackson.core:jackson-databind:2.17.1"

// Kotlin extensions (Optional)
implementation "net.infumia:pubsub-kotlin:VERSION"

// Kotlin coroutines (Optional)
implementation "net.infumia:pubsub-kotlin-coroutines:VERSION"
// Required, https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1"

// Kotlin protobuf serializer (Optional)
implementation "net.infumia:pubsub-kotlin-protobuf:VERSION"
// Required, https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect/
Expand Down
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ subprojects {
}
}

val projectName = project.property("artifact-id") as String
val moduleName = project.findProperty("artifact-id") as String?
val projectName = "pubsub${if (moduleName == null) "" else "-$moduleName"}"
val signRequired = project.hasProperty("sign-required")

extensions.configure<MavenPublishBaseExtension> {
Expand Down
1 change: 1 addition & 0 deletions codec/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
artifact-id=codec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* The interface for encoding and decoding objects of type {@link T} to and from byte arrays.
*
* @param <T> the type of the object to be encoded and decoded.
* @param <T> type of the object to be encoded and decoded.
*/
public interface Codec<T> {
/**
Expand All @@ -17,7 +17,7 @@ public interface Codec<T> {
/**
* Decodes a byte array into an object of type {@link T}.
*
* @param bytes the byte array to decode.
* @param bytes the bytes array to decode.
* @return the decoded object.
*/
T decode(byte[] bytes);
Expand Down
2 changes: 2 additions & 0 deletions common/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
dependencies {
compileOnly(project(":codec"))

compileOnly(libs.caffeine)
}
1 change: 0 additions & 1 deletion common/gradle.properties

This file was deleted.

63 changes: 15 additions & 48 deletions common/src/main/java/net/infumia/pubsub/Broker.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.infumia.pubsub;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand Down Expand Up @@ -34,17 +33,16 @@ public interface Broker extends AutoCloseable {
* @param message the message to send. Cannot be {@code null}
* @param targets the targets to send the message to. Can be empty, cannot be {@code null}
*/
default void send(final Object message, final Target... targets) {
this.send(message, Arrays.asList(targets));
}
void send(Object message, Target... targets);

/**
* Listens for messages using a handler.
*
* @param handler the handler to process incoming messages. Cannot be {@code null}
* @param <T> the type of the message to listen for.
* @return an {@link AutoCloseable} to stop listening. Cannot be {@code null}
*/
AutoCloseable listen(Handler<?> handler);
<T> AutoCloseable listen(Handler<T> handler);

/**
* Listens for messages of a specific type using a consumer.
Expand All @@ -54,19 +52,7 @@ default void send(final Object message, final Target... targets) {
* @param <T> the type of the message to listen for.
* @return an {@link AutoCloseable} to stop listening. Cannot be {@code null}
*/
default <T> AutoCloseable listen(final Class<T> type, final Consumer<T> handler) {
return this.listen(new Handler<T>() {
@Override
public Class<T> type() {
return type;
}

@Override
public void handle(final T t) {
handler.accept(t);
}
});
}
<T> AutoCloseable listen(Class<T> type, Consumer<T> handler);

/**
* Sends a request to targets and awaits a response within a timeout.
Expand All @@ -91,46 +77,39 @@ <R> CompletableFuture<R> request(Object message, Class<R> responseType, Duration
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout,
final Target... targets) {
return this.request(message, responseType, timeout, Arrays.asList(targets));
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Duration timeout, Target... targets);

/**
* Sends a request and awaits a response using the default timeout.
* Sends a request to targets and awaits a response using the default timeout.
*
* @param message the request message. Cannot be {@code null}
* @param responseType the class object representing the response type {@link R}. Cannot be {@code null}
* @param targets the targets to send the request to. Can be empty, cannot be {@code null}
* @param targets the targets to send the request to. Cannot be {@code null}
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Target... targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Collection<Target> targets);

/**
* Sends a request to targets and awaits a response using the default timeout.
* Sends a request and awaits a response using the default timeout.
*
* @param message the request message. Cannot be {@code null}
* @param responseType the class object representing the response type {@link R}. Cannot be {@code null}
* @param targets the targets to send the request to. Cannot be {@code null}
* @param targets the targets to send the request to. Can be empty, cannot be {@code null}
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Collection<Target> targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Target... targets);

/**
* Registers a responder to handle requests of a specific type.
*
* @param responder the responder to handle requests. Cannot be {@code null}
* @param <T> the type of the request message.
* @param <Y> the type of the response.
* @return an {@link AutoCloseable} to stop responding. Cannot be {@code null}
*/
AutoCloseable respond(Responder<?, ?> responder);
<T, Y> AutoCloseable respond(Responder<T, Y> responder);

/**
* Registers a function to respond to requests of a specific type.
Expand All @@ -141,19 +120,7 @@ default <R> CompletableFuture<R> request(final Object message, final Class<R> re
* @param <Y> the type of the response.
* @return an {@link AutoCloseable} to stop responding. Cannot be {@code null}
*/
default <T, Y> AutoCloseable respond(final Class<T> type, final Function<T, Y> responder) {
return this.respond(new Responder<T, Y>() {
@Override
public Class<T> type() {
return type;
}

@Override
public Y handle(final T t) {
return responder.apply(t);
}
});
}
<T, Y> AutoCloseable respond(Class<T> type, Function<T, Y> responder);

@Override
void close();
Expand Down
63 changes: 57 additions & 6 deletions common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Abstract class for a message broker that operates with strings and supports sending, receiving, and responding to messages.
* Abstract class for a message broker that operates with strings and supports sending, receiving,
* and responding to messages.
*/
public abstract class BrokerStringAbstract implements Broker {
private final UUID brokerId = UUID.randomUUID();
Expand Down Expand Up @@ -45,12 +49,26 @@ public final void send(final Object message, final Collection<Target> targets) {
}

@Override
public final AutoCloseable listen(final Handler<?> handler) {
return this.respond(new HandlerToResponder<>(handler));
public void send(final Object message, final Target... targets) {
this.send(message, Arrays.asList(targets));
}

@Override
public final <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout, final Collection<Target> targets) {
public final <T> AutoCloseable listen(final Handler<T> handler) {
return this.listen(handler.type(), handler);
}

@Override
public <T> AutoCloseable listen(final Class<T> type, final Consumer<T> handler) {
return this.respond(type, message -> {
handler.accept(message);
return null;
});
}

@Override
public final <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Duration timeout, final Collection<Target> targets) {
final Envelope envelope = Internal.newEnvelope(this.codecProvider, this.brokerId, message);
final AwaitingResponder<R> responder = new AwaitingResponder<>(responseType);
this.awaitingResponders.put(envelope.messageId, responder);
Expand All @@ -59,10 +77,43 @@ public final <R> CompletableFuture<R> request(final Object message, final Class<
}

@Override
public final AutoCloseable respond(final Responder<?, ?> responder) {
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout,
final Target... targets) {
return this.request(message, responseType, timeout, Arrays.asList(targets));
}

@Override
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Collection<Target> targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}

@Override
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Target... targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}

@Override
public final <T, Y> AutoCloseable respond(final Responder<T, Y> responder) {
return this.handlerRegistry.register(this.messageTypeId(responder.type()), responder);
}

@Override
public <T, Y> AutoCloseable respond(final Class<T> type, final Function<T, Y> responder) {
return this.respond(new Responder<T, Y>() {
@Override
public Class<T> type() {
return type;
}

@Override
public Y apply(final T message) {
return responder.apply(message);
}
});
}

@Override
public void close() {
this.handlerRegistry.close();
Expand Down Expand Up @@ -146,7 +197,7 @@ private <T, Y> Envelope handleEnvelope(
final Responder<T, Y> responder
) {
final T decoded = this.codecProvider.provide(responder.type()).decode(envelope.messagePayload);
final Y response = responder.handle(decoded);
final Y response = responder.apply(decoded);
if (response == null) {
return null;
} else {
Expand Down
11 changes: 3 additions & 8 deletions common/src/main/java/net/infumia/pubsub/Handler.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
package net.infumia.pubsub;

import java.util.function.Consumer;

/**
* The interface for handling Pub/Sub messages of type {@link T}.
*
* @param <T> the type of the message to be handled.
*/
public interface Handler<T> {
public interface Handler<T> extends Consumer<T> {
/**
* Retrieves the class type of the message being handled.
*
* @return the {@link Class} object representing the type {@link T}.
*/
Class<T> type();

/**
* Handles a message of type {@link T} received from Pub/Sub.
*
* @param t the message to handle.
*/
void handle(T t);
}
20 changes: 0 additions & 20 deletions common/src/main/java/net/infumia/pubsub/HandlerToResponder.java

This file was deleted.

12 changes: 3 additions & 9 deletions common/src/main/java/net/infumia/pubsub/Responder.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package net.infumia.pubsub;

import java.util.function.Function;

/**
* The interface for handling Pub/Sub messages of type {@code T} and providing a response of type {@code Y}.
*
* @param <T> the type of the message to be handled.
* @param <Y> the type of the response.
*/
public interface Responder<T, Y> {
public interface Responder<T, Y> extends Function<T, Y> {

/**
* Retrieves the class type of the message being handled.
*
* @return the {@link Class} object representing the type {@link T}.
*/
Class<T> type();

/**
* Handles a message of type {@link T} received from Pub/Sub and provides a response of type {@link Y}.
*
* @param t the message to handle.
* @return the response of type {@link Y}.
*/
Y handle(T t);
}
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ indra = "3.1.3"
[libraries]
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version = "2.9.3" }
redis = { module = "io.lettuce:lettuce-core", version = "6.3.2.RELEASE" }
jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" }
kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" }
kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinserialization" }
kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinserialization" }
jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" }
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version = "1.8.1" }

[plugins]
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand Down
2 changes: 1 addition & 1 deletion jackson/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dependencies {
compileOnly(project(":common"))
compileOnly(project(":codec"))

compileOnly(libs.jackson)
}
2 changes: 1 addition & 1 deletion jackson/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
artifact-id=pubsub-jackson
artifact-id=jackson
Loading