diff --git a/src/main/java/io/reactivesocket/DuplexConnection.java b/src/main/java/io/reactivesocket/DuplexConnection.java index e1811ed33..45baa01d6 100644 --- a/src/main/java/io/reactivesocket/DuplexConnection.java +++ b/src/main/java/io/reactivesocket/DuplexConnection.java @@ -31,5 +31,11 @@ public interface DuplexConnection extends Closeable { Observable getInput(); void addOutput(Publisher o, Completable callback); - + + default void addOutput(Frame frame, Completable callback) { + addOutput(s -> { + s.onNext(frame); + s.onComplete(); + }, callback); + } } diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index a3f6b204d..5d9ca68b3 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -36,439 +36,492 @@ import io.reactivesocket.rx.Observer; import uk.co.real_logic.agrona.BitUtil; +import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR; + /** * Interface for a connection that supports sending requests and receiving responses * * Created by servers for connections Created on demand for clients */ public class ReactiveSocket implements AutoCloseable { - private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); - - private static final Consumer DEFAULT_ERROR_STREAM = t -> { - // TODO should we use SLF4j, use System.err, or swallow by default? - System.err.println("ReactiveSocket ERROR => " + t.getMessage() - + " [Provide errorStream handler to replace this default]"); - }; - - private final DuplexConnection connection; - private final boolean isServer; - private final Consumer errorStream; - private Requester requester; - private Responder responder; - private final ConnectionSetupPayload requestorSetupPayload; - private final RequestHandler clientRequestHandler; - private final ConnectionSetupHandler responderConnectionHandler; - private final LeaseGovernor leaseGovernor; - - private ReactiveSocket(DuplexConnection connection, final boolean isServer, ConnectionSetupPayload serverRequestorSetupPayload, RequestHandler clientRequestHandler, - final ConnectionSetupHandler responderConnectionHandler, LeaseGovernor leaseGovernor, Consumer errorStream) { - this.connection = connection; - this.isServer = isServer; - this.requestorSetupPayload = serverRequestorSetupPayload; - this.clientRequestHandler = clientRequestHandler; - this.responderConnectionHandler = responderConnectionHandler; - this.leaseGovernor = leaseGovernor; - this.errorStream = errorStream; - } - - /** - * Create a ReactiveSocket from a client-side {@link DuplexConnection}. - *

- * A client-side connection is one that initiated the connection with a server and will define the ReactiveSocket behaviors via the {@link ConnectionSetupPayload} that define mime-types, leasing - * behavior and other connection-level details. - * - * @param connection - * DuplexConnection of client-side initiated connection for the ReactiveSocket protocol to use. - * @param setup - * ConnectionSetupPayload that defines mime-types and other connection behavior details. - * @param handler - * (Optional) RequestHandler for responding to requests from the server. If 'null' requests will be responded to with "Not Found" errors. - * @param errorStream - * (Optional) Callback for errors while processing streams over connection. If 'null' then error messages will be output to System.err. - * @return ReactiveSocket for start, shutdown and sending requests. - */ - public static ReactiveSocket fromClientConnection(DuplexConnection connection, ConnectionSetupPayload setup, RequestHandler handler, Consumer errorStream) { - if (connection == null) { - throw new IllegalArgumentException("DuplexConnection can not be null"); - } - if (setup == null) { - throw new IllegalArgumentException("ConnectionSetupPayload can not be null"); - } - final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; - Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; - return new ReactiveSocket(connection, false, setup, h, null, LeaseGovernor.NULL_LEASE_GOVERNOR, es); - } - - /** - * Create a ReactiveSocket from a client-side {@link DuplexConnection}. - *

- * A client-side connection is one that initiated the connection with a server and will define the ReactiveSocket behaviors via the {@link ConnectionSetupPayload} that define mime-types, leasing - * behavior and other connection-level details. - *

- * If this ReactiveSocket receives requests from the server it will respond with "Not Found" errors. - * - * @param connection - * DuplexConnection of client-side initiated connection for the ReactiveSocket protocol to use. - * @param setup - * ConnectionSetupPayload that defines mime-types and other connection behavior details. - * @param errorStream - * (Optional) Callback for errors while processing streams over connection. If 'null' then error messages will be output to System.err. - * @return ReactiveSocket for start, shutdown and sending requests. - */ - public static ReactiveSocket fromClientConnection(DuplexConnection connection, ConnectionSetupPayload setup, Consumer errorStream) { - return fromClientConnection(connection, setup, EMPTY_HANDLER, errorStream); - } - - public static ReactiveSocket fromClientConnection(DuplexConnection connection, ConnectionSetupPayload setup) { - return fromClientConnection(connection, setup, EMPTY_HANDLER, DEFAULT_ERROR_STREAM); - } - - /** - * Create a ReactiveSocket from a server-side {@link DuplexConnection}. - *

- * A server-side connection is one that accepted the connection from a client and will define the ReactiveSocket behaviors via the {@link ConnectionSetupPayload} that define mime-types, leasing - * behavior and other connection-level details. - * - * @param connection - * @param connectionHandler - * @param errorConsumer - * @return - */ - public static ReactiveSocket fromServerConnection(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer errorConsumer) { - return new ReactiveSocket(connection, true, null, null, connectionHandler, leaseGovernor, errorConsumer); - } - - public static ReactiveSocket fromServerConnection(DuplexConnection connection, ConnectionSetupHandler connectionHandler) { - return fromServerConnection(connection, connectionHandler, LeaseGovernor.NULL_LEASE_GOVERNOR, t -> { - }); - } - - /** - * Initiate a request response exchange - */ - public Publisher requestResponse(final Payload payload) { - assertRequester(); - return requester.requestResponse(payload); - } - - public Publisher fireAndForget(final Payload payload) { - assertRequester(); - return requester.fireAndForget(payload); - } - - public Publisher requestStream(final Payload payload) { - assertRequester(); - return requester.requestStream(payload); - } - - public Publisher requestSubscription(final Payload payload) { - assertRequester(); - return requester.requestSubscription(payload); - } - - public Publisher requestChannel(final Publisher payloads) { - assertRequester(); - return requester.requestChannel(payloads); - } - - public Publisher metadataPush(final Payload payload) { - assertRequester(); - return requester.metadataPush(payload); - } - - private void assertRequester() { - if (requester == null) { - if (isServer) { - if (responder == null) { - throw new IllegalStateException("Connection not initialized. Please 'start()' before submitting requests"); - } else { - throw new IllegalStateException("Setup not yet received from client. Please wait until Setup is completed, then retry."); - } - } else { - throw new IllegalStateException("Connection not initialized. Please 'start()' before submitting requests"); - } - } - } - - /** - * Client check for availability to send request based on lease - * - * @return 0.0 to 1.0 indicating availability of sending requests - */ - public double availability() { - // TODO: can happen in either direction - assertRequester(); - return requester.availability(); - } - - /** - * Server granting new lease information to client - * - * Initial lease semantics are that server waits for periodic granting of leases by server side. - * - * @param ttl - * @param numberOfRequests - */ - public void sendLease(int ttl, int numberOfRequests) { - // TODO: can happen in either direction - responder.sendLease(ttl, numberOfRequests); - } - - /** - * Start protocol processing on the given DuplexConnection. - */ - public final void start(Completable c) { - if (isServer) { - responder = Responder.createServerResponder(new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), - responderConnectionHandler, - leaseGovernor, - errorStream, - c, - setupPayload -> { - Completable two = new Completable() { - - // wait for 2 success, or 1 error to pass on - AtomicInteger count = new AtomicInteger(); - - @Override - public void success() { - if (count.incrementAndGet() == 2) { - requesterReady.success(); - } - } - - @Override - public void error(Throwable e) { - requesterReady.error(e); - } - - }; - requester = Requester.createServerRequester(new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), setupPayload, errorStream, two); - two.success(); // now that the reference is assigned in case of synchronous setup - }); - } else { - Completable both = new Completable() { - - // wait for 2 success, or 1 error to pass on - AtomicInteger count = new AtomicInteger(); - - @Override - public void success() { - if (count.incrementAndGet() == 2) { - c.success(); - } - } - - @Override - public void error(Throwable e) { - c.error(e); - } - - }; - requester = Requester.createClientRequester(new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), requestorSetupPayload, errorStream, new Completable() { - - @Override - public void success() { - requesterReady.success(); - both.success(); - } - - @Override - public void error(Throwable e) { - requesterReady.error(e); - both.error(e); - } - - }); - responder = Responder.createClientResponder(new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), clientRequestHandler, leaseGovernor, errorStream, both); - } - } - - private final CompositeCompletable requesterReady = new CompositeCompletable(); - - /** - * Invoked when Requester is ready with success or fail. - * - * @param c - */ - public final void onRequestReady(Completable c) { - requesterReady.add(c); - } - - /** - * Invoked when Requester is ready. Non-null exception if error. Null if success. - * - * @param c - */ - public final void onRequestReady(Consumer c) { - requesterReady.add(new Completable() { - - @Override - public void success() { - c.accept(null); - } - - @Override - public void error(Throwable e) { - c.accept(e); - } - - }); - } - - private static class ConnectionFilter implements DuplexConnection { - - private enum STREAMS { - FROM_CLIENT_EVEN, FROM_SERVER_ODD; - } - - private final DuplexConnection connection; - private final STREAMS s; - - private ConnectionFilter(DuplexConnection connection, STREAMS s) { - this.connection = connection; - this.s = s; - } - - @Override - public void close() throws IOException { - connection.close(); // forward - } - - @Override - public Observable getInput() { - return new Observable() { - - @Override - public void subscribe(Observer o) { - CompositeDisposable cd = new CompositeDisposable(); - o.onSubscribe(cd); - connection.getInput().subscribe(new Observer() { - - @Override - public void onNext(Frame t) { - int streamId = t.getStreamId(); - FrameType type = t.getType(); - if (streamId == 0) { - if (FrameType.SETUP.equals(type) && s == STREAMS.FROM_CLIENT_EVEN) { - o.onNext(t); - } else if (FrameType.LEASE.equals(type)) { - o.onNext(t); - } else if (FrameType.ERROR.equals(type)) { - // o.onNext(t); // TODO this doesn't work - } else if (FrameType.KEEPALIVE.equals(type)) { - o.onNext(t); // TODO need tests - } else if (FrameType.METADATA_PUSH.equals(type)) { - o.onNext(t); - } - } else if (BitUtil.isEven(streamId)) { - if (s == STREAMS.FROM_CLIENT_EVEN) { - o.onNext(t); - } - } else { - if (s == STREAMS.FROM_SERVER_ODD) { - o.onNext(t); - } - } - } - - @Override - public void onError(Throwable e) { - o.onError(e); - } - - @Override - public void onComplete() { - o.onComplete(); - } - - @Override - public void onSubscribe(Disposable d) { - cd.add(d); - } - - }); - } - - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - connection.addOutput(o, callback); - } - - }; - - /** - * Start and block the current thread until startup is finished. - * - * @throws RuntimeException - * of InterruptedException - */ - public final void startAndWait() { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference err = new AtomicReference(); - start(new Completable() { - - @Override - public void success() { - latch.countDown(); - } - - @Override - public void error(Throwable e) { - latch.countDown(); - } - - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (err.get() != null) { - throw new RuntimeException(err.get()); - } - } - - @Override - public void close() throws Exception { - connection.close(); - leaseGovernor.unregister(responder); - if (requester != null) { - requester.shutdown(); - } - if (responder != null) { - responder.shutdown(); - } - } - - public void shutdown() { - try { - close(); - } catch (Exception e) { - throw new RuntimeException("Failed Shutdown", e); - } - } - - private static Publisher error(Throwable e) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - - @Override - public void request(long n) { - // should probably worry about n==0 - s.onError(e); - } - - @Override - public void cancel() { - // ignoring just because - } - - }); - - }; - } + private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); + + private static final Consumer DEFAULT_ERROR_STREAM = t -> { + // TODO should we use SLF4j, use System.err, or swallow by default? + System.err.println("ReactiveSocket ERROR => " + t.getMessage() + + " [Provide errorStream handler to replace this default]"); + }; + + private final DuplexConnection connection; + private final boolean isServer; + private final Consumer errorStream; + private Requester requester; + private Responder responder; + private final ConnectionSetupPayload requestorSetupPayload; + private final RequestHandler clientRequestHandler; + private final ConnectionSetupHandler responderConnectionHandler; + private final LeaseGovernor leaseGovernor; + + private ReactiveSocket( + DuplexConnection connection, + boolean isServer, + ConnectionSetupPayload serverRequestorSetupPayload, + RequestHandler clientRequestHandler, + ConnectionSetupHandler responderConnectionHandler, + LeaseGovernor leaseGovernor, + Consumer errorStream + ) { + this.connection = connection; + this.isServer = isServer; + this.requestorSetupPayload = serverRequestorSetupPayload; + this.clientRequestHandler = clientRequestHandler; + this.responderConnectionHandler = responderConnectionHandler; + this.leaseGovernor = leaseGovernor; + this.errorStream = errorStream; + } + + /** + * Create a ReactiveSocket from a client-side {@link DuplexConnection}. + *

+ * A client-side connection is one that initiated the connection with a + * server and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing + * behavior and other connection-level details. + * + * @param connection + * DuplexConnection of client-side initiated connection for + * the ReactiveSocket protocol to use. + * @param setup + * ConnectionSetupPayload that defines mime-types and other + * connection behavior details. + * @param handler + * (Optional) RequestHandler for responding to requests from + * the server. If 'null' requests will be responded to with + * "Not Found" errors. + * @param errorStream + * (Optional) Callback for errors while processing streams + * over connection. If 'null' then error messages will be + * output to System.err. + * @return ReactiveSocket for start, shutdown and sending requests. + */ + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup, + RequestHandler handler, + Consumer errorStream + ) { + if (connection == null) { + throw new IllegalArgumentException("DuplexConnection can not be null"); + } + if (setup == null) { + throw new IllegalArgumentException("ConnectionSetupPayload can not be null"); + } + final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; + Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; + return new ReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); + } + + /** + * Create a ReactiveSocket from a client-side {@link DuplexConnection}. + *

+ * A client-side connection is one that initiated the connection with a + * server and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing + * behavior and other connection-level details. + *

+ * If this ReactiveSocket receives requests from the server it will respond + * with "Not Found" errors. + * + * @param connection + * DuplexConnection of client-side initiated connection for the + * ReactiveSocket protocol to use. + * @param setup + * ConnectionSetupPayload that defines mime-types and other + * connection behavior details. + * @param errorStream + * (Optional) Callback for errors while processing streams over + * connection. If 'null' then error messages will be output to + * System.err. + * @return ReactiveSocket for start, shutdown and sending requests. + */ + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup, + Consumer errorStream + ) { + return fromClientConnection(connection, setup, EMPTY_HANDLER, errorStream); + } + + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup + ) { + return fromClientConnection(connection, setup, EMPTY_HANDLER, DEFAULT_ERROR_STREAM); + } + + /** + * Create a ReactiveSocket from a server-side {@link DuplexConnection}. + *

+ * A server-side connection is one that accepted the connection from a + * client and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing behavior + * and other connection-level details. + * + * @param connection + * @param connectionHandler + * @param errorConsumer + * @return + */ + public static ReactiveSocket fromServerConnection( + DuplexConnection connection, + ConnectionSetupHandler connectionHandler, + LeaseGovernor leaseGovernor, + Consumer errorConsumer + ) { + return new ReactiveSocket(connection, true, null, null, connectionHandler, + leaseGovernor, errorConsumer); + } + + public static ReactiveSocket fromServerConnection( + DuplexConnection connection, + ConnectionSetupHandler connectionHandler + ) { + return fromServerConnection(connection, connectionHandler, NULL_LEASE_GOVERNOR, t -> {}); + } + + /** + * Initiate a request response exchange + */ + public Publisher requestResponse(final Payload payload) { + assertRequester(); + return requester.requestResponse(payload); + } + + public Publisher fireAndForget(final Payload payload) { + assertRequester(); + return requester.fireAndForget(payload); + } + + public Publisher requestStream(final Payload payload) { + assertRequester(); + return requester.requestStream(payload); + } + + public Publisher requestSubscription(final Payload payload) { + assertRequester(); + return requester.requestSubscription(payload); + } + + public Publisher requestChannel(final Publisher payloads) { + assertRequester(); + return requester.requestChannel(payloads); + } + + public Publisher metadataPush(final Payload payload) { + assertRequester(); + return requester.metadataPush(payload); + } + + private void assertRequester() { + if (requester == null) { + if (isServer) { + if (responder == null) { + throw new IllegalStateException("Connection not initialized. " + + "Please 'start()' before submitting requests"); + } else { + throw new IllegalStateException("Setup not yet received from client. " + + "Please wait until Setup is completed, then retry."); + } + } else { + throw new IllegalStateException("Connection not initialized. " + + "Please 'start()' before submitting requests"); + } + } + } + + /** + * Client check for availability to send request based on lease + * + * @return 0.0 to 1.0 indicating availability of sending requests + */ + public double availability() { + // TODO: can happen in either direction + assertRequester(); + return requester.availability(); + } + + /** + * Server granting new lease information to client + * + * Initial lease semantics are that server waits for periodic granting of leases by server side. + * + * @param ttl + * @param numberOfRequests + */ + public void sendLease(int ttl, int numberOfRequests) { + // TODO: can happen in either direction + responder.sendLease(ttl, numberOfRequests); + } + + /** + * Start protocol processing on the given DuplexConnection. + */ + public final void start(Completable c) { + if (isServer) { + responder = Responder.createServerResponder( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), + responderConnectionHandler, + leaseGovernor, + errorStream, + c, + setupPayload -> { + Completable two = new Completable() { + // wait for 2 success, or 1 error to pass on + AtomicInteger count = new AtomicInteger(); + + @Override + public void success() { + if (count.incrementAndGet() == 2) { + requesterReady.success(); + } + } + + @Override + public void error(Throwable e) { + requesterReady.error(e); + } + }; + requester = Requester.createServerRequester( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), + setupPayload, + errorStream, + two + ); + two.success(); // now that the reference is assigned in case of synchronous setup + }); + } else { + Completable both = new Completable() { + // wait for 2 success, or 1 error to pass on + AtomicInteger count = new AtomicInteger(); + + @Override + public void success() { + if (count.incrementAndGet() == 2) { + c.success(); + } + } + + @Override + public void error(Throwable e) { + c.error(e); + } + }; + requester = Requester.createClientRequester( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), + requestorSetupPayload, + errorStream, + new Completable() { + @Override + public void success() { + requesterReady.success(); + both.success(); + } + + @Override + public void error(Throwable e) { + requesterReady.error(e); + both.error(e); + } + }); + responder = Responder.createClientResponder( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), + clientRequestHandler, + leaseGovernor, + errorStream, + both + ); + } + } + + private final CompositeCompletable requesterReady = new CompositeCompletable(); + + /** + * Invoked when Requester is ready with success or fail. + * + * @param c + */ + public final void onRequestReady(Completable c) { + requesterReady.add(c); + } + + /** + * Invoked when Requester is ready. Non-null exception if error. Null if success. + * + * @param c + */ + public final void onRequestReady(Consumer c) { + requesterReady.add(new Completable() { + @Override + public void success() { + c.accept(null); + } + + @Override + public void error(Throwable e) { + c.accept(e); + } + }); + } + + private static class ConnectionFilter implements DuplexConnection { + private enum STREAMS { + FROM_CLIENT_EVEN, FROM_SERVER_ODD; + } + + private final DuplexConnection connection; + private final STREAMS s; + + private ConnectionFilter(DuplexConnection connection, STREAMS s) { + this.connection = connection; + this.s = s; + } + + @Override + public void close() throws IOException { + connection.close(); // forward + } + + @Override + public Observable getInput() { + return new Observable() { + @Override + public void subscribe(Observer o) { + CompositeDisposable cd = new CompositeDisposable(); + o.onSubscribe(cd); + connection.getInput().subscribe(new Observer() { + + @Override + public void onNext(Frame t) { + int streamId = t.getStreamId(); + FrameType type = t.getType(); + if (streamId == 0) { + if (FrameType.SETUP.equals(type) && s == STREAMS.FROM_CLIENT_EVEN) { + o.onNext(t); + } else if (FrameType.LEASE.equals(type)) { + o.onNext(t); + } else if (FrameType.ERROR.equals(type)) { + // o.onNext(t); // TODO this doesn't work + } else if (FrameType.KEEPALIVE.equals(type)) { + o.onNext(t); // TODO need tests + } else if (FrameType.METADATA_PUSH.equals(type)) { + o.onNext(t); + } + } else if (BitUtil.isEven(streamId)) { + if (s == STREAMS.FROM_CLIENT_EVEN) { + o.onNext(t); + } + } else { + if (s == STREAMS.FROM_SERVER_ODD) { + o.onNext(t); + } + } + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onComplete() { + o.onComplete(); + } + + @Override + public void onSubscribe(Disposable d) { + cd.add(d); + } + }); + } + }; + } + + @Override + public void addOutput(Publisher o, Completable callback) { + connection.addOutput(o, callback); + } + + @Override + public void addOutput(Frame f, Completable callback) { + connection.addOutput(f, callback); + } + + }; + + /** + * Start and block the current thread until startup is finished. + * + * @throws RuntimeException + * of InterruptedException + */ + public final void startAndWait() { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference err = new AtomicReference<>(); + start(new Completable() { + @Override + public void success() { + latch.countDown(); + } + + @Override + public void error(Throwable e) { + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (err.get() != null) { + throw new RuntimeException(err.get()); + } + } + + @Override + public void close() throws Exception { + connection.close(); + leaseGovernor.unregister(responder); + if (requester != null) { + requester.shutdown(); + } + if (responder != null) { + responder.shutdown(); + } + } + + public void shutdown() { + try { + close(); + } catch (Exception e) { + throw new RuntimeException("Failed Shutdown", e); + } + } + + private static Publisher error(Throwable e) { + return (Subscriber s) -> { + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // should probably worry about n==0 + s.onError(e); + } + + @Override + public void cancel() { + // ignoring just because + } + }); + }; + } } diff --git a/src/main/java/io/reactivesocket/RequestHandler.java b/src/main/java/io/reactivesocket/RequestHandler.java index 7bd3cc8a2..1f8682c05 100644 --- a/src/main/java/io/reactivesocket/RequestHandler.java +++ b/src/main/java/io/reactivesocket/RequestHandler.java @@ -35,8 +35,8 @@ public abstract class RequestHandler { public static final Function> NO_FIRE_AND_FORGET_HANDLER = (payload -> PublisherUtils.errorVoid(new RuntimeException("No 'fireAndForget' handler"))); - public static final BiFunction, Publisher> NO_REQUEST_CHANNEL_HANDLER = - (initialPayload, payloads) -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler")); + public static final Function, Publisher> NO_REQUEST_CHANNEL_HANDLER = + (payloads) -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler")); public static final Function> NO_METADATA_PUSH_HANDLER = (payload -> PublisherUtils.errorVoid(new RuntimeException("No 'metadataPush' handler"))); @@ -50,15 +50,9 @@ public abstract class RequestHandler { public abstract Publisher handleFireAndForget(final Payload payload); /** - * @param initialPayload - * The first Payload contained in Publisher 'payloads'. - *

- * This is delivered each time to allow simple use of the first payload for routing decisions. - * @param payloads - * stream of Payloads. * @return */ - public abstract Publisher handleChannel(final Payload initialPayload, final Publisher payloads); + public abstract Publisher handleChannel(final Publisher inputs); public abstract Publisher handleMetadataPush(final Payload payload); @@ -68,7 +62,7 @@ public static class Builder private Function> handleRequestStream = NO_REQUEST_STREAM_HANDLER; private Function> handleRequestSubscription = NO_REQUEST_SUBSCRIPTION_HANDLER; private Function> handleFireAndForget = NO_FIRE_AND_FORGET_HANDLER; - private BiFunction, Publisher> handleRequestChannel = NO_REQUEST_CHANNEL_HANDLER; + private Function, Publisher> handleRequestChannel = NO_REQUEST_CHANNEL_HANDLER; private Function> handleMetadataPush = NO_METADATA_PUSH_HANDLER; public Builder withRequestResponse(final Function> handleRequestResponse) @@ -95,7 +89,7 @@ public Builder withFireAndForget(final Function> handle return this; } - public Builder withRequestChannel(final BiFunction , Publisher> handleRequestChannel) + public Builder withRequestChannel(final Function , Publisher> handleRequestChannel) { this.handleRequestChannel = handleRequestChannel; return this; @@ -131,9 +125,9 @@ public Publisher handleFireAndForget(Payload payload) return handleFireAndForget.apply(payload); } - public Publisher handleChannel(Payload initialPayload, Publisher payloads) + public Publisher handleChannel(Publisher inputs) { - return handleRequestChannel.apply(initialPayload, payloads); + return handleRequestChannel.apply(inputs); } public Publisher handleMetadataPush(Payload payload) diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 2ec6b52fe..94cbdfc28 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -1,12 +1,12 @@ /** * Copyright 2015 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -53,839 +53,936 @@ */ public class Requester { - private final static Disposable CANCELLED = new EmptyDisposable(); - private final static int KEEPALIVE_INTERVAL_MS = 1000; - - private final boolean isServer; - private final DuplexConnection connection; - private final Int2ObjectHashMap> streamInputMap = new Int2ObjectHashMap<>(); - private final ConnectionSetupPayload setupPayload; - private final Consumer errorStream; - - private final boolean honorLease; - private long ttlExpiration; - private long numberOfRemainingRequests = 0; - private long timeOfLastKeepalive = 0; - private int streamCount = 0; // 0 is reserved for setup, all normal messages are >= 1 - - private static final long DEFAULT_BATCH = 1024; - private static final long REQUEST_THRESHOLD = 256; - - private volatile boolean requesterStarted = false; - - private Requester(boolean isServer, DuplexConnection connection, ConnectionSetupPayload setupPayload, Consumer errorStream) { - this.isServer = isServer; - this.connection = connection; - this.setupPayload = setupPayload; - this.errorStream = errorStream; - if (isServer) { - streamCount = 1; // server is odds - } else { - streamCount = 0; // client is even - } - - this.honorLease = setupPayload.willClientHonorLease(); - } - - public static Requester createClientRequester(DuplexConnection connection, ConnectionSetupPayload setupPayload, Consumer errorStream, Completable requesterCompletable) { - Requester requester = new Requester(false, connection, setupPayload, errorStream); - requester.start(requesterCompletable); - return requester; - } - - public static Requester createServerRequester(DuplexConnection connection, ConnectionSetupPayload setupPayload, Consumer errorStream, Completable requesterCompletable) { - Requester requester = new Requester(true, connection, setupPayload, errorStream); - requester.start(requesterCompletable); - return requester; - } - - public void shutdown() { - // TODO do something here - System.err.println("**** Requester.shutdown => this should actually do something"); - } - - public boolean isServer() { - return isServer; - } - - public long timeOfLastKeepalive() - { - return timeOfLastKeepalive; - } - - /** - * Request/Response with a single message response. - * - * @param payload - * @return - */ - public Publisher requestResponse(final Payload payload) { - return startRequestResponse(nextStreamId(), FrameType.REQUEST_RESPONSE, payload); - } - - /** - * Request/Stream with a finite multi-message response followed by a terminal state {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)}. - * - * @param payload - * @return - */ - public Publisher requestStream(final Payload payload) { - return startStream(nextStreamId(), FrameType.REQUEST_STREAM, payload); - } - - /** - * Fire-and-forget without a response from the server. - *

- * The returned {@link Publisher} will emit {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)} to represent success or failure in sending from the client side, but no - * feedback from the server will be returned. - * - * @param payload - * @return - */ - public Publisher fireAndForget(final Payload payload) { - if (payload == null) { - throw new IllegalStateException("Payload can not be null"); - } - assertStarted(); - return new Publisher() { - - @Override - public void subscribe(Subscriber child) { - child.onSubscribe(new Subscription() { - - final AtomicBoolean started = new AtomicBoolean(false); - - @Override - public void request(long n) { - if (n > 0 && started.compareAndSet(false, true)) { - numberOfRemainingRequests--; - - connection.addOutput(PublisherUtils.just(Frame.Request.from(nextStreamId(), FrameType.FIRE_AND_FORGET, payload, 0)), new Completable() { - - @Override - public void success() { - child.onComplete(); - } - - @Override - public void error(Throwable e) { - child.onError(e); - } - - }); - } - } - - @Override - public void cancel() { - // nothing to cancel on a fire-and-forget - } - - }); - } - - }; - } - - /** - * Send asynchonrous Metadata Push without a response from the server. - *

- * The returned {@link Publisher} will emit {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)} to - * represent success or failure in sending from the client side, but no feedback from the server will be returned. - * - * @param payload - * @return - */ - public Publisher metadataPush(final Payload payload) { - if (payload == null) { - throw new IllegalArgumentException("Payload can not be null"); - } - assertStarted(); - return (Subscriber child) -> - child.onSubscribe(new Subscription() { - - final AtomicBoolean started = new AtomicBoolean(false); - - @Override - public void request(long n) { - if (n > 0 && started.compareAndSet(false, true)) { - numberOfRemainingRequests--; - - connection.addOutput(PublisherUtils.just(Frame.Request.from(nextStreamId(), FrameType.METADATA_PUSH, payload, 0)), new Completable() { - - @Override - public void success() { - child.onComplete(); - } - - @Override - public void error(Throwable e) { - child.onError(e); - } - - }); - } - } - - @Override - public void cancel() { - // nothing to cancel on a metadataPush - } - - }); - } - - - /** - * Event subscription with an infinite multi-message response potentially terminated with an {@link Subscriber#onError(Throwable)}. - * - * @param payload - * @return - */ - public Publisher requestSubscription(final Payload payload) { - return startStream(nextStreamId(), FrameType.REQUEST_SUBSCRIPTION, payload); - } - - /** - * Request/Stream with a finite multi-message response followed by a terminal state {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)}. - * - * @param payloadStream - * @return - */ - public Publisher requestChannel(final Publisher payloadStream) { - return startChannel(nextStreamId(), FrameType.REQUEST_CHANNEL, payloadStream); - } - - private void assertStarted() { - if (!requesterStarted) { - throw new IllegalStateException("Requester not initialized. Please await 'start()' completion before submitting requests."); - } - } - - - /** - * Return availability of sending requests - * - * @return - */ - public double availability() { - if (!honorLease) { - return 1.0; - } - final long now = System.currentTimeMillis(); - double available = 0.0; - if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) { - available = 1.0; - } - return available; - } - - /* - * Using payload/payloads with null check for efficiency so I don't have to allocate a Publisher for the most common case of single Payload - */ - private Publisher startStream(int streamId, FrameType type, Payload payload) { - assertStarted(); - return (Subscriber child) -> { - child.onSubscribe(new Subscription() { - - final AtomicBoolean started = new AtomicBoolean(false); - volatile StreamInputSubscriber streamInputSubscriber; - volatile UnicastSubject writer; - final AtomicLong requested = new AtomicLong(); // TODO does this need to be atomic? Can request(n) come from any thread? - final AtomicLong outstanding = new AtomicLong(); // TODO AtomicLong just so I can pass it around ... perf issue? or is there a thread-safety issue? - - @Override - public void request(long n) { - if(n <= 0) { - return; - } - BackpressureUtils.getAndAddRequest(requested, n); - if (started.compareAndSet(false, true)) { - // determine initial RequestN - long currentN = requested.get(); - final long requestN = currentN < DEFAULT_BATCH ? currentN : DEFAULT_BATCH; - // threshold - final long threshold = requestN == DEFAULT_BATCH ? REQUEST_THRESHOLD : requestN/3; - - - // declare output to transport - writer = UnicastSubject.create((w, rn) -> { - numberOfRemainingRequests--; - - // decrement as we request it - requested.addAndGet(-requestN); - // record how many we have requested - outstanding.addAndGet(requestN); - - // when transport connects we write the request frame for this stream - w.onNext(Frame.Request.from(streamId, type, payload, (int)requestN)); - }); - - // Response frames for this Stream - UnicastSubject transportInputSubject = UnicastSubject.create(); - synchronized(Requester.this) { - streamInputMap.put(streamId, transportInputSubject); - } - streamInputSubscriber = new StreamInputSubscriber(streamId, threshold, outstanding, requested, writer, child, () -> { - cancel(); - }); - transportInputSubject.subscribe(streamInputSubscriber); - - // connect to transport - connection.addOutput(writer, new Completable() - { - - @Override - public void success() - { - // nothing to do onSuccess - } - - @Override - public void error(Throwable e) { - child.onError(e); - cancel(); - } - - }); - } else { - // propagate further requestN frames - long currentN = requested.get(); - final long requestThreshold = REQUEST_THRESHOLD < currentN ? REQUEST_THRESHOLD : currentN/3; - requestIfNecessary(streamId, requestThreshold, currentN, outstanding.get(), writer, requested, outstanding); - } - - } - - @Override - public void cancel() { - synchronized(Requester.this) { - streamInputMap.remove(streamId); - } - if (!streamInputSubscriber.terminated.get()) { - writer.onNext(Frame.Cancel.from(streamId)); - } - streamInputSubscriber.parentSubscription.cancel(); - } - - }); - }; - } - - /* - * Using payload/payloads with null check for efficiency so I don't have to allocate a Publisher for the most common case of single Payload - */ - private Publisher startChannel(int streamId, FrameType type, Publisher payloads) { - if (payloads == null) { - throw new IllegalStateException("Both payload and payloads can not be null"); - } - assertStarted(); - return (Subscriber child) -> { - child.onSubscribe(new Subscription() { - - AtomicBoolean started = new AtomicBoolean(false); - volatile StreamInputSubscriber streamInputSubscriber; - volatile UnicastSubject writer; - final AtomicReference payloadsSubscription = new AtomicReference<>(); - final AtomicLong requested = new AtomicLong(); // TODO does this need to be atomic? Can request(n) come from any thread? - final AtomicLong outstanding = new AtomicLong(); // TODO AtomicLong just so I can pass it around ... perf issue? or is there a thread-safety issue? - - @Override - public void request(long n) { - if(n <= 0) { - return; - } - BackpressureUtils.getAndAddRequest(requested, n); - if (started.compareAndSet(false, true)) { - // determine initial RequestN - long currentN = requested.get(); - final long requestN = currentN < DEFAULT_BATCH ? currentN : DEFAULT_BATCH; - // threshold - final long threshold = requestN == DEFAULT_BATCH ? REQUEST_THRESHOLD : requestN/3; - - // declare output to transport - writer = UnicastSubject.create((w, rn) -> { - numberOfRemainingRequests--; - - // decrement as we request it - requested.addAndGet(-requestN); - // record how many we have requested - outstanding.addAndGet(requestN); - - connection.addOutput(new Publisher() { - - @Override - public void subscribe(Subscriber transport) { - transport.onSubscribe(new Subscription() { - - final AtomicBoolean started = new AtomicBoolean(false); - @Override - public void request(long n) { - if(n <= 0) { - return; - } - if(started.compareAndSet(false, true)) { - payloads.subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - if (!payloadsSubscription.compareAndSet(null, s)) { - s.cancel(); // we are already unsubscribed - } else { - // we always start with 1 to initiate requestChannel, then wait for REQUEST_N from Responder to send more - s.request(1); - } - } - - // onNext is serialized by contract so this is okay as non-volatile primitive - boolean isInitialRequest = true; - - @Override - public void onNext(Payload p) { - if(isInitialRequest) { - isInitialRequest = false; - Frame f = Frame.Request.from(streamId, type, p, (int)requestN); - transport.onNext(f); - } else { - Frame f = Frame.Request.from(streamId, type, p, 0); - transport.onNext(f); - } - } - - @Override - public void onError(Throwable t) { - // TODO validate with unit tests - transport.onError(new RuntimeException("Error received from request stream.", t)); - child.onError(new RuntimeException("Error received from request stream.", t)); - cancel(); - } - - @Override - public void onComplete() { - Frame f = Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, RequestFrameFlyweight.FLAGS_REQUEST_CHANNEL_C); - transport.onNext(f); - transport.onComplete(); - } - - }); - } else { - // TODO we need to compose this requestN from transport with the remote REQUEST_N - } - - } - - @Override - public void cancel() { - - }}); - } - - }, new Completable() - { - - @Override - public void success() - { - // nothing to do onSuccess - } - - @Override - public void error(Throwable e) { - child.onError(e); - cancel(); - } - - }); - - }); - - // Response frames for this Stream - UnicastSubject transportInputSubject = UnicastSubject.create(); - synchronized(Requester.this) { - streamInputMap.put(streamId, transportInputSubject); - } - streamInputSubscriber = new StreamInputSubscriber(streamId, threshold, outstanding, requested, writer, child, payloadsSubscription, () -> { - cancel(); - }); - transportInputSubject.subscribe(streamInputSubscriber); - - // connect to transport - connection.addOutput(writer, new Completable() - { - - @Override - public void success() - { - // nothing to do onSuccess - } - - @Override - public void error(Throwable e) { - child.onError(e); - if (!(e instanceof Retryable)) { - cancel(); - } - } - - }); - } else { - // propagate further requestN frames - long currentN = requested.get(); - final long requestThreshold = REQUEST_THRESHOLD < currentN ? REQUEST_THRESHOLD : currentN/3; - requestIfNecessary(streamId, requestThreshold, currentN, outstanding.get(), writer, requested, outstanding); - } - } - - @Override - public void cancel() { - synchronized(Requester.this) { - streamInputMap.remove(streamId); - } - if (!streamInputSubscriber.terminated.get()) { - writer.onNext(Frame.Cancel.from(streamId)); - } - streamInputSubscriber.parentSubscription.cancel(); - if (payloadsSubscription != null) { - if (!payloadsSubscription.compareAndSet(null, EmptySubscription.INSTANCE)) { - payloadsSubscription.get().cancel(); // unsubscribe it if it already exists - } - } - } - - }); - }; - } - - /* - * Special-cased for performance reasons (achieved 20-30% throughput increase over using startStream for request/response) - */ - private Publisher startRequestResponse(int streamId, FrameType type, Payload payload) { - if (payload == null) { - throw new IllegalStateException("Both payload and payloads can not be null"); - } - assertStarted(); - return (Subscriber child) -> { - child.onSubscribe(new Subscription() { - - final AtomicBoolean started = new AtomicBoolean(false); - volatile StreamInputSubscriber streamInputSubscriber; - volatile UnicastSubject writer; - - @Override - public void request(long n) { - if (n > 0 && started.compareAndSet(false, true)) { - // Response frames for this Stream - UnicastSubject transportInputSubject = UnicastSubject.create(); - synchronized(Requester.this) { - streamInputMap.put(streamId, transportInputSubject); - } - streamInputSubscriber = new StreamInputSubscriber(streamId, 0, null, null, writer, child, () -> { - cancel(); - }); - transportInputSubject.subscribe(streamInputSubscriber); - - Publisher requestOutput = PublisherUtils.just(Frame.Request.from(streamId, type, payload, 1)); - // connect to transport - connection.addOutput(requestOutput, new Completable() { - - @Override - public void success() { - // nothing to do onSuccess - } - - @Override - public void error(Throwable e) - { - child.onError(e); - cancel(); - } - - }); - } - } - - @Override - public void cancel() { - if (!streamInputSubscriber.terminated.get()) { - connection.addOutput(PublisherUtils.just(Frame.Cancel.from(streamId)), new Completable() { - - @Override - public void success() { - // nothing to do onSuccess - } - - @Override - public void error(Throwable e) - { - child.onError(e); - } - - }); - } - synchronized(Requester.this) { - streamInputMap.remove(streamId); - } - streamInputSubscriber.parentSubscription.cancel(); - } - - }); - }; - } - - private final static class StreamInputSubscriber implements Subscriber { - final AtomicBoolean terminated = new AtomicBoolean(false); - volatile Subscription parentSubscription; - - private final int streamId; - private final long requestThreshold; - private final AtomicLong outstandingRequests; - private final AtomicLong requested; - private final UnicastSubject writer; - private final Subscriber child; - private final Runnable cancelAction; - private final AtomicReference requestStreamSubscription; - - public StreamInputSubscriber(int streamId, long threshold, AtomicLong outstanding, AtomicLong requested, UnicastSubject writer, Subscriber child, Runnable cancelAction) { - this.streamId = streamId; - this.requestThreshold = threshold; - this.requested = requested; - this.outstandingRequests = outstanding; - this.writer = writer; - this.child = child; - this.cancelAction = cancelAction; - this.requestStreamSubscription = null; - } - - public StreamInputSubscriber(int streamId, long threshold, AtomicLong outstanding, AtomicLong requested, UnicastSubject writer, Subscriber child, AtomicReference requestStreamSubscription, Runnable cancelAction) { - this.streamId = streamId; - this.requestThreshold = threshold; - this.requested = requested; - this.outstandingRequests = outstanding; - this.writer = writer; - this.child = child; - this.cancelAction = cancelAction; - this.requestStreamSubscription = requestStreamSubscription; - } - - @Override - public void onSubscribe(Subscription s) { - this.parentSubscription = s; - s.request(Long.MAX_VALUE); // no backpressure to transport (we will only receive what we've asked for already) - } - - @Override - public void onNext(Frame frame) { - FrameType type = frame.getType(); - // convert ERROR messages into terminal events - if (type == FrameType.NEXT_COMPLETE) { - terminated.set(true); - child.onNext(frame); - onComplete(); - cancel(); - } else if (type == FrameType.NEXT) { - child.onNext(frame); - long currentOutstanding = outstandingRequests.decrementAndGet(); - requestIfNecessary(streamId, requestThreshold, requested.get(), currentOutstanding, writer, requested, outstandingRequests); - } else if (type == FrameType.REQUEST_N) { - if(requestStreamSubscription != null) { - Subscription s = requestStreamSubscription.get(); - if(s != null) { - s.request(Frame.RequestN.requestN(frame)); - } else { - // TODO can this ever be null? - System.err.println("ReactiveSocket Requester DEBUG: requestStreamSubscription is null"); - } - return; - } - // TODO should we do anything if we don't find the stream? emitting an error is risky as the responder could have terminated and cleaned up already - } else if (type == FrameType.COMPLETE) { - terminated.set(true); - onComplete(); - cancel(); - } else if (type == FrameType.ERROR) { - terminated.set(true); - final ByteBuffer byteBuffer = frame.getData(); - String errorMessage = getByteBufferAsString(byteBuffer); - onError(new RuntimeException(errorMessage)); - cancel(); - } else { - onError(new RuntimeException("Unexpected FrameType: " + frame.getType())); - cancel(); - } - } - - @Override - public void onError(Throwable t) { - terminated.set(true); - child.onError(t); - } - - @Override - public void onComplete() { - terminated.set(true); - child.onComplete(); - } - - private void cancel() { - cancelAction.run(); - } - } - - private static void requestIfNecessary(int streamId, long requestThreshold, long currentN, long currentOutstanding, UnicastSubject writer, AtomicLong requested, AtomicLong outstanding) { - if(currentOutstanding <= requestThreshold) { - long batchSize = DEFAULT_BATCH - currentOutstanding; - final long requestN = currentN < batchSize ? currentN : batchSize; - - if (requestN > 0) { - // decrement as we request it - requested.addAndGet(-requestN); - // record how many we have requested - outstanding.addAndGet(requestN); - - writer.onNext(Frame.RequestN.from(streamId, (int)requestN)); - } - } - } - - private int nextStreamId() { - return streamCount += 2; // go by two since server is odd, client is even - } - - private void start(Completable onComplete) { - AtomicReference connectionSubscription = new AtomicReference<>(); - // get input from responder->requestor for responses - connection.getInput().subscribe(new Observer() { - public void onSubscribe(Disposable d) { - if (connectionSubscription.compareAndSet(null, d)) { - if(isServer) { - requesterStarted = true; - onComplete.success(); - } else { - // now that we are connected, send SETUP frame (asynchronously, other messages can continue being written after this) - connection.addOutput(PublisherUtils.just(Frame.Setup.from(setupPayload.getFlags(), KEEPALIVE_INTERVAL_MS, 0, setupPayload.metadataMimeType(), setupPayload.dataMimeType(), setupPayload)), - new Completable() { - - @Override - public void success() { - requesterStarted = true; - onComplete.success(); - } - - @Override - public void error(Throwable e) { - onComplete.error(e); - tearDown(e); - } - - }); - - connection.addOutput(PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS), - new Completable() - { - public void success() - { - } - - public void error(Throwable e) - { - onComplete.error(e); - tearDown(e); - } - }); - } - } else { - // means we already were cancelled - d.dispose(); - onComplete.error(new CancelException("Connection Is Already Cancelled")); - } - } - - private void tearDown(Throwable e) { - onError(e); - } - - public void onNext(Frame frame) { - int streamId = frame.getStreamId(); - if (streamId == 0) { - if (FrameType.ERROR.equals(frame.getType())) { - final Throwable throwable = Exceptions.from(frame); - onError(throwable); - } else if (FrameType.LEASE.equals(frame.getType()) && honorLease) { - numberOfRemainingRequests = Frame.Lease.numberOfRequests(frame); - final long now = System.currentTimeMillis(); - final int ttl = Frame.Lease.ttl(frame); - if (ttl == Integer.MAX_VALUE) { - // Integer.MAX_VALUE represents infinity - ttlExpiration = Long.MAX_VALUE; - } else { - ttlExpiration = now + ttl; - } - } else if (FrameType.KEEPALIVE.equals(frame.getType())) { - timeOfLastKeepalive = System.currentTimeMillis(); - } else { - onError(new RuntimeException("Received unexpected message type on stream 0: " + frame.getType().name())); - } - } else { - UnicastSubject streamSubject; - synchronized (Requester.this) { - streamSubject = streamInputMap.get(streamId); - } - if (streamSubject == null) { - if (streamId <= streamCount) { - // receiving a frame after a given stream has been cancelled/completed, so ignore (cancellation is async so there is a race condition) - return; - } else { - // message for stream that has never existed, we have a problem with the overall connection and must tear down - if (frame.getType() == FrameType.ERROR) { - String errorMessage = getByteBufferAsString(frame.getData()); - onError(new RuntimeException("Received error for non-existent stream: " + streamId + " Message: " + errorMessage)); - } else { - onError(new RuntimeException("Received message for non-existent stream: " + streamId)); - } - } - } else { - streamSubject.onNext(frame); - } - } - } - - public void onError(Throwable t) { - Collection> subjects = null; - synchronized (Requester.this) { - subjects = streamInputMap.values(); - } - subjects.forEach(subject -> subject.onError(t)); - // TODO: iterate over responder side and destroy world - errorStream.accept(t); - cancel(); - } - - public void onComplete() { - Collection> subjects = null; - synchronized (Requester.this) { - subjects = streamInputMap.values(); - } - subjects.forEach(subject -> subject.onComplete()); - cancel(); - } - - public void cancel() { // TODO this isn't used ... is it supposed to be? - if (!connectionSubscription.compareAndSet(null, CANCELLED)) { - // cancel the one that was there if we failed to set the sentinel - connectionSubscription.get().dispose(); - try { - connection.close(); - } catch (IOException e) { - errorStream.accept(e); - } - } - } - }); - } - - private static String getByteBufferAsString(ByteBuffer bb) { - final byte[] bytes = new byte[bb.capacity()]; - bb.get(bytes); - return new String(bytes, Charset.forName("UTF-8")); - } + private final static Disposable CANCELLED = new EmptyDisposable(); + private final static int KEEPALIVE_INTERVAL_MS = 1000; + + private final boolean isServer; + private final DuplexConnection connection; + private final Int2ObjectHashMap> streamInputMap = new Int2ObjectHashMap<>(); + private final ConnectionSetupPayload setupPayload; + private final Consumer errorStream; + + private final boolean honorLease; + private long ttlExpiration; + private long numberOfRemainingRequests = 0; + private long timeOfLastKeepalive = 0; + private int streamCount = 0; // 0 is reserved for setup, all normal messages are >= 1 + + private static final long DEFAULT_BATCH = 1024; + private static final long REQUEST_THRESHOLD = 256; + + private volatile boolean requesterStarted = false; + + private Requester( + boolean isServer, + DuplexConnection connection, + ConnectionSetupPayload setupPayload, + Consumer errorStream + ) { + this.isServer = isServer; + this.connection = connection; + this.setupPayload = setupPayload; + this.errorStream = errorStream; + if (isServer) { + streamCount = 1; // server is odds + } else { + streamCount = 0; // client is even + } + + this.honorLease = setupPayload.willClientHonorLease(); + } + + public static Requester createClientRequester( + DuplexConnection connection, + ConnectionSetupPayload setupPayload, + Consumer errorStream, + Completable requesterCompletable + ) { + Requester requester = new Requester(false, connection, setupPayload, errorStream); + requester.start(requesterCompletable); + return requester; + } + + public static Requester createServerRequester( + DuplexConnection connection, + ConnectionSetupPayload setupPayload, + Consumer errorStream, + Completable requesterCompletable + ) { + Requester requester = new Requester(true, connection, setupPayload, errorStream); + requester.start(requesterCompletable); + return requester; + } + + public void shutdown() { + // TODO do something here + System.err.println("**** Requester.shutdown => this should actually do something"); + } + + public boolean isServer() { + return isServer; + } + + public long timeOfLastKeepalive() + { + return timeOfLastKeepalive; + } + + /** + * Request/Response with a single message response. + * + * @param payload + * @return + */ + public Publisher requestResponse(final Payload payload) { + return startRequestResponse(nextStreamId(), FrameType.REQUEST_RESPONSE, payload); + } + + /** + * Request/Stream with a finite multi-message response followed by a + * terminal state {@link Subscriber#onComplete()} or + * {@link Subscriber#onError(Throwable)}. + * + * @param payload + * @return + */ + public Publisher requestStream(final Payload payload) { + return startStream(nextStreamId(), FrameType.REQUEST_STREAM, payload); + } + + /** + * Fire-and-forget without a response from the server. + *

+ * The returned {@link Publisher} will emit {@link Subscriber#onComplete()} + * or {@link Subscriber#onError(Throwable)} to represent success or failure + * in sending from the client side, but no feedback from the server will + * be returned. + * + * @param payload + * @return + */ + public Publisher fireAndForget(final Payload payload) { + if (payload == null) { + throw new IllegalStateException("Payload can not be null"); + } + assertStarted(); + return child -> child.onSubscribe(new Subscription() { + + final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (n > 0 && started.compareAndSet(false, true)) { + numberOfRemainingRequests--; + + Frame fnfFrame = Frame.Request.from( + nextStreamId(), FrameType.FIRE_AND_FORGET, payload, 0); + connection.addOutput(fnfFrame, new Completable() { + @Override + public void success() { + child.onComplete(); + } + + @Override + public void error(Throwable e) { + child.onError(e); + } + }); + } + } + + @Override + public void cancel() { + // nothing to cancel on a fire-and-forget + } + }); + } + + /** + * Send asynchonrous Metadata Push without a response from the server. + *

+ * The returned {@link Publisher} will emit {@link Subscriber#onComplete()} + * or {@link Subscriber#onError(Throwable)} to represent success or failure + * in sending from the client side, but no feedback from the server will be + * returned. + * + * @param payload + * @return + */ + public Publisher metadataPush(final Payload payload) { + if (payload == null) { + throw new IllegalArgumentException("Payload can not be null"); + } + assertStarted(); + return (Subscriber child) -> + child.onSubscribe(new Subscription() { + + final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (n > 0 && started.compareAndSet(false, true)) { + numberOfRemainingRequests--; + + Frame metadataPush = Frame.Request.from( + nextStreamId(), FrameType.METADATA_PUSH, payload, 0); + connection.addOutput(metadataPush, new Completable() { + @Override + public void success() { + child.onComplete(); + } + + @Override + public void error(Throwable e) { + child.onError(e); + } + }); + } + } + + @Override + public void cancel() { + // nothing to cancel on a metadataPush + } + }); + } + + + /** + * Event subscription with an infinite multi-message response potentially + * terminated with an {@link Subscriber#onError(Throwable)}. + * + * @param payload + * @return + */ + public Publisher requestSubscription(final Payload payload) { + return startStream(nextStreamId(), FrameType.REQUEST_SUBSCRIPTION, payload); + } + + /** + * Request/Stream with a finite multi-message response followed by a + * terminal state {@link Subscriber#onComplete()} or + * {@link Subscriber#onError(Throwable)}. + * + * @param payloadStream + * @return + */ + public Publisher requestChannel(final Publisher payloadStream) { + return startChannel(nextStreamId(), FrameType.REQUEST_CHANNEL, payloadStream); + } + + private void assertStarted() { + if (!requesterStarted) { + throw new IllegalStateException("Requester not initialized. " + + "Please await 'start()' completion before submitting requests."); + } + } + + + /** + * Return availability of sending requests + * + * @return + */ + public double availability() { + if (!honorLease) { + return 1.0; + } + final long now = System.currentTimeMillis(); + double available = 0.0; + if (numberOfRemainingRequests > 0 && (now < ttlExpiration)) { + available = 1.0; + } + return available; + } + + /* + * Using payload/payloads with null check for efficiency so I don't have to + * allocate a Publisher for the most common case of single Payload + */ + private Publisher startStream(int streamId, FrameType type, Payload payload) { + assertStarted(); + return (Subscriber child) -> { + child.onSubscribe(new Subscription() { + + final AtomicBoolean started = new AtomicBoolean(false); + volatile StreamInputSubscriber streamInputSubscriber; + volatile UnicastSubject writer; + // TODO does this need to be atomic? Can request(n) come from any thread? + final AtomicLong requested = new AtomicLong(); + // TODO AtomicLong just so I can pass it around ... perf issue? or is there a thread-safety issue? + final AtomicLong outstanding = new AtomicLong(); + + @Override + public void request(long n) { + if(n <= 0) { + return; + } + BackpressureUtils.getAndAddRequest(requested, n); + if (started.compareAndSet(false, true)) { + // determine initial RequestN + long currentN = requested.get(); + long requestN = currentN < DEFAULT_BATCH ? currentN : DEFAULT_BATCH; + long threshold = + requestN == DEFAULT_BATCH ? REQUEST_THRESHOLD : requestN / 3; + + // declare output to transport + writer = UnicastSubject.create((w, rn) -> { + numberOfRemainingRequests--; + + // decrement as we request it + requested.addAndGet(-requestN); + // record how many we have requested + outstanding.addAndGet(requestN); + + // when transport connects we write the request frame for this stream + w.onNext(Frame.Request.from(streamId, type, payload, (int)requestN)); + }); + + // Response frames for this Stream + UnicastSubject transportInputSubject = UnicastSubject.create(); + synchronized(Requester.this) { + streamInputMap.put(streamId, transportInputSubject); + } + streamInputSubscriber = new StreamInputSubscriber( + streamId, + threshold, + outstanding, + requested, + writer, + child, + this::cancel + ); + transportInputSubject.subscribe(streamInputSubscriber); + + // connect to transport + connection.addOutput(writer, new Completable() { + @Override + public void success() { + // nothing to do onSuccess + } + + @Override + public void error(Throwable e) { + child.onError(e); + cancel(); + } + }); + } else { + // propagate further requestN frames + long currentN = requested.get(); + long requestThreshold = + REQUEST_THRESHOLD < currentN ? REQUEST_THRESHOLD : currentN / 3; + requestIfNecessary( + streamId, + requestThreshold, + currentN, + outstanding.get(), + writer, + requested, + outstanding + ); + } + + } + + @Override + public void cancel() { + synchronized(Requester.this) { + streamInputMap.remove(streamId); + } + if (!streamInputSubscriber.terminated.get()) { + writer.onNext(Frame.Cancel.from(streamId)); + } + streamInputSubscriber.parentSubscription.cancel(); + } + + }); + }; + } + + /* + * Using payload/payloads with null check for efficiency so I don't have to + * allocate a Publisher for the most common case of single Payload + */ + private Publisher startChannel( + int streamId, + FrameType type, + Publisher payloads + ) { + if (payloads == null) { + throw new IllegalStateException("Both payload and payloads can not be null"); + } + assertStarted(); + return (Subscriber child) -> { + child.onSubscribe(new Subscription() { + + AtomicBoolean started = new AtomicBoolean(false); + volatile StreamInputSubscriber streamInputSubscriber; + volatile UnicastSubject writer; + final AtomicReference payloadsSubscription = new AtomicReference<>(); + // TODO does this need to be atomic? Can request(n) come from any thread? + final AtomicLong requested = new AtomicLong(); + // TODO AtomicLong just so I can pass it around ... perf issue? or is there a thread-safety issue? + final AtomicLong outstanding = new AtomicLong(); + + @Override + public void request(long n) { + if(n <= 0) { + return; + } + BackpressureUtils.getAndAddRequest(requested, n); + if (started.compareAndSet(false, true)) { + // determine initial RequestN + long currentN = requested.get(); + final long requestN = currentN < DEFAULT_BATCH ? currentN : DEFAULT_BATCH; + // threshold + final long threshold = + requestN == DEFAULT_BATCH ? REQUEST_THRESHOLD : requestN / 3; + + // declare output to transport + writer = UnicastSubject.create((w, rn) -> { + numberOfRemainingRequests--; + // decrement as we request it + requested.addAndGet(-requestN); + // record how many we have requested + outstanding.addAndGet(requestN); + + connection.addOutput(new Publisher() { + @Override + public void subscribe(Subscriber transport) { + transport.onSubscribe(new Subscription() { + + final AtomicBoolean started = new AtomicBoolean(false); + @Override + public void request(long n) { + if(n <= 0) { + return; + } + if(started.compareAndSet(false, true)) { + payloads.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + if (!payloadsSubscription.compareAndSet(null, s)) { + // we are already unsubscribed + s.cancel(); + } else { + // we always start with 1 to initiate + // requestChannel, then wait for REQUEST_N + // from Responder to send more + s.request(1); + } + } + + // onNext is serialized by contract so this is + // okay as non-volatile primitive + boolean isInitialRequest = true; + + @Override + public void onNext(Payload p) { + if(isInitialRequest) { + isInitialRequest = false; + Frame f = Frame.Request.from( + streamId, type, p, (int)requestN); + transport.onNext(f); + } else { + Frame f = Frame.Request.from( + streamId, type, p, 0); + transport.onNext(f); + } + } + + @Override + public void onError(Throwable t) { + // TODO validate with unit tests + RuntimeException exc = new RuntimeException( + "Error received from request stream.", t); + transport.onError(exc); + child.onError(exc); + cancel(); + } + + @Override + public void onComplete() { + Frame f = Frame.Request.from( + streamId, + FrameType.REQUEST_CHANNEL, + RequestFrameFlyweight.FLAGS_REQUEST_CHANNEL_C + ); + transport.onNext(f); + transport.onComplete(); + } + + }); + } else { + // TODO we need to compose this requestN from + // transport with the remote REQUEST_N + } + + } + + @Override + public void cancel() {} + }); + } + }, new Completable() { + @Override + public void success() { + // nothing to do onSuccess + } + + @Override + public void error(Throwable e) { + child.onError(e); + cancel(); + } + }); + + }); + + // Response frames for this Stream + UnicastSubject transportInputSubject = UnicastSubject.create(); + synchronized(Requester.this) { + streamInputMap.put(streamId, transportInputSubject); + } + streamInputSubscriber = new StreamInputSubscriber( + streamId, + threshold, + outstanding, + requested, + writer, + child, + payloadsSubscription, + this::cancel + ); + transportInputSubject.subscribe(streamInputSubscriber); + + // connect to transport + connection.addOutput(writer, new Completable() { + @Override + public void success() { + // nothing to do onSuccess + } + + @Override + public void error(Throwable e) { + child.onError(e); + if (!(e instanceof Retryable)) { + cancel(); + } + } + }); + } else { + // propagate further requestN frames + long currentN = requested.get(); + long requestThreshold = + REQUEST_THRESHOLD < currentN ? REQUEST_THRESHOLD : currentN / 3; + requestIfNecessary( + streamId, + requestThreshold, + currentN, + outstanding.get(), + writer, + requested, + outstanding + ); + } + } + + @Override + public void cancel() { + synchronized(Requester.this) { + streamInputMap.remove(streamId); + } + if (!streamInputSubscriber.terminated.get()) { + writer.onNext(Frame.Cancel.from(streamId)); + } + streamInputSubscriber.parentSubscription.cancel(); + if (payloadsSubscription != null) { + if (!payloadsSubscription.compareAndSet(null, EmptySubscription.INSTANCE)) { + // unsubscribe it if it already exists + payloadsSubscription.get().cancel(); + } + } + } + + }); + }; + } + + /* + * Special-cased for performance reasons (achieved 20-30% throughput + * increase over using startStream for request/response) + */ + private Publisher startRequestResponse(int streamId, FrameType type, Payload payload) { + if (payload == null) { + throw new IllegalStateException("Both payload and payloads can not be null"); + } + assertStarted(); + return (Subscriber child) -> { + child.onSubscribe(new Subscription() { + + final AtomicBoolean started = new AtomicBoolean(false); + volatile StreamInputSubscriber streamInputSubscriber; + volatile UnicastSubject writer; + + @Override + public void request(long n) { + if (n > 0 && started.compareAndSet(false, true)) { + // Response frames for this Stream + UnicastSubject transportInputSubject = UnicastSubject.create(); + synchronized(Requester.this) { + streamInputMap.put(streamId, transportInputSubject); + } + streamInputSubscriber = new StreamInputSubscriber( + streamId, + 0, + null, + null, + writer, + child, + this::cancel + ); + transportInputSubject.subscribe(streamInputSubscriber); + + Frame requestFrame = Frame.Request.from(streamId, type, payload, 1); + // connect to transport + connection.addOutput(requestFrame, new Completable() { + @Override + public void success() { + // nothing to do onSuccess + } + + @Override + public void error(Throwable e) { + child.onError(e); + cancel(); + } + }); + } + } + + @Override + public void cancel() { + if (!streamInputSubscriber.terminated.get()) { + Frame cancelFrame = Frame.Cancel.from(streamId); + connection.addOutput(cancelFrame, new Completable() { + @Override + public void success() { + // nothing to do onSuccess + } + + @Override + public void error(Throwable e) { + child.onError(e); + } + }); + } + synchronized(Requester.this) { + streamInputMap.remove(streamId); + } + streamInputSubscriber.parentSubscription.cancel(); + } + }); + }; + } + + private final static class StreamInputSubscriber implements Subscriber { + final AtomicBoolean terminated = new AtomicBoolean(false); + volatile Subscription parentSubscription; + + private final int streamId; + private final long requestThreshold; + private final AtomicLong outstandingRequests; + private final AtomicLong requested; + private final UnicastSubject writer; + private final Subscriber child; + private final Runnable cancelAction; + private final AtomicReference requestStreamSubscription; + + public StreamInputSubscriber( + int streamId, + long threshold, + AtomicLong outstanding, + AtomicLong requested, + UnicastSubject writer, + Subscriber child, + Runnable cancelAction + ) { + this.streamId = streamId; + this.requestThreshold = threshold; + this.requested = requested; + this.outstandingRequests = outstanding; + this.writer = writer; + this.child = child; + this.cancelAction = cancelAction; + this.requestStreamSubscription = null; + } + + public StreamInputSubscriber( + int streamId, + long threshold, + AtomicLong outstanding, + AtomicLong requested, + UnicastSubject writer, + Subscriber child, + AtomicReference requestStreamSubscription, + Runnable cancelAction + ) { + this.streamId = streamId; + this.requestThreshold = threshold; + this.requested = requested; + this.outstandingRequests = outstanding; + this.writer = writer; + this.child = child; + this.cancelAction = cancelAction; + this.requestStreamSubscription = requestStreamSubscription; + } + + @Override + public void onSubscribe(Subscription s) { + this.parentSubscription = s; + // no backpressure to transport (we will only receive what we've asked for already) + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Frame frame) { + FrameType type = frame.getType(); + // convert ERROR messages into terminal events + if (type == FrameType.NEXT_COMPLETE) { + terminated.set(true); + child.onNext(frame); + onComplete(); + cancel(); + } else if (type == FrameType.NEXT) { + child.onNext(frame); + long currentOutstanding = outstandingRequests.decrementAndGet(); + requestIfNecessary(streamId, requestThreshold, requested.get(), + currentOutstanding, writer, requested, outstandingRequests); + } else if (type == FrameType.REQUEST_N) { + if(requestStreamSubscription != null) { + Subscription s = requestStreamSubscription.get(); + if(s != null) { + s.request(Frame.RequestN.requestN(frame)); + } else { + // TODO can this ever be null? + System.err.println( + "ReactiveSocket Requester DEBUG: requestStreamSubscription is null"); + } + return; + } + // TODO should we do anything if we don't find the stream? emitting an error + // is risky as the responder could have terminated and cleaned up already + } else if (type == FrameType.COMPLETE) { + terminated.set(true); + onComplete(); + cancel(); + } else if (type == FrameType.ERROR) { + terminated.set(true); + final ByteBuffer byteBuffer = frame.getData(); + String errorMessage = getByteBufferAsString(byteBuffer); + onError(new RuntimeException(errorMessage)); + cancel(); + } else { + onError(new RuntimeException("Unexpected FrameType: " + frame.getType())); + cancel(); + } + } + + @Override + public void onError(Throwable t) { + terminated.set(true); + child.onError(t); + } + + @Override + public void onComplete() { + terminated.set(true); + child.onComplete(); + } + + private void cancel() { + cancelAction.run(); + } + } + + private static void requestIfNecessary( + int streamId, + long requestThreshold, + long currentN, + long currentOutstanding, + UnicastSubject writer, + AtomicLong requested, + AtomicLong outstanding + ) { + if(currentOutstanding <= requestThreshold) { + long batchSize = DEFAULT_BATCH - currentOutstanding; + final long requestN = currentN < batchSize ? currentN : batchSize; + + if (requestN > 0) { + // decrement as we request it + requested.addAndGet(-requestN); + // record how many we have requested + outstanding.addAndGet(requestN); + + writer.onNext(Frame.RequestN.from(streamId, (int)requestN)); + } + } + } + + private int nextStreamId() { + return streamCount += 2; // go by two since server is odd, client is even + } + + private void start(Completable onComplete) { + AtomicReference connectionSubscription = new AtomicReference<>(); + // get input from responder->requestor for responses + connection.getInput().subscribe(new Observer() { + public void onSubscribe(Disposable d) { + if (connectionSubscription.compareAndSet(null, d)) { + if(isServer) { + requesterStarted = true; + onComplete.success(); + } else { + // now that we are connected, send SETUP frame + // (asynchronously, other messages can continue being written after this) + Frame setupFrame = Frame.Setup.from( + setupPayload.getFlags(), + KEEPALIVE_INTERVAL_MS, + 0, + setupPayload.metadataMimeType(), + setupPayload.dataMimeType(), + setupPayload + ); + connection.addOutput(setupFrame, + new Completable() { + @Override + public void success() { + requesterStarted = true; + onComplete.success(); + } + + @Override + public void error(Throwable e) { + onComplete.error(e); + tearDown(e); + } + }); + + Publisher keepaliveTicker = + PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS); + connection.addOutput(keepaliveTicker, + new Completable() { + public void success() {} + + public void error(Throwable e) { + onComplete.error(e); + tearDown(e); + } + } + ); + } + } else { + // means we already were cancelled + d.dispose(); + onComplete.error(new CancelException("Connection Is Already Cancelled")); + } + } + + private void tearDown(Throwable e) { + onError(e); + } + + public void onNext(Frame frame) { + int streamId = frame.getStreamId(); + if (streamId == 0) { + if (FrameType.ERROR.equals(frame.getType())) { + final Throwable throwable = Exceptions.from(frame); + onError(throwable); + } else if (FrameType.LEASE.equals(frame.getType()) && honorLease) { + numberOfRemainingRequests = Frame.Lease.numberOfRequests(frame); + final long now = System.currentTimeMillis(); + final int ttl = Frame.Lease.ttl(frame); + if (ttl == Integer.MAX_VALUE) { + // Integer.MAX_VALUE represents infinity + ttlExpiration = Long.MAX_VALUE; + } else { + ttlExpiration = now + ttl; + } + } else if (FrameType.KEEPALIVE.equals(frame.getType())) { + timeOfLastKeepalive = System.currentTimeMillis(); + } else { + onError(new RuntimeException( + "Received unexpected message type on stream 0: " + frame.getType().name())); + } + } else { + UnicastSubject streamSubject; + synchronized (Requester.this) { + streamSubject = streamInputMap.get(streamId); + } + if (streamSubject == null) { + if (streamId <= streamCount) { + // receiving a frame after a given stream has been cancelled/completed, + // so ignore (cancellation is async so there is a race condition) + return; + } else { + // message for stream that has never existed, we have a problem with + // the overall connection and must tear down + if (frame.getType() == FrameType.ERROR) { + String errorMessage = getByteBufferAsString(frame.getData()); + onError(new RuntimeException( + "Received error for non-existent stream: " + + streamId + " Message: " + errorMessage)); + } else { + onError(new RuntimeException( + "Received message for non-existent stream: " + streamId)); + } + } + } else { + streamSubject.onNext(frame); + } + } + } + + public void onError(Throwable t) { + Collection> subjects = null; + synchronized (Requester.this) { + subjects = streamInputMap.values(); + } + subjects.forEach(subject -> subject.onError(t)); + // TODO: iterate over responder side and destroy world + errorStream.accept(t); + cancel(); + } + + public void onComplete() { + Collection> subjects = null; + synchronized (Requester.this) { + subjects = streamInputMap.values(); + } + subjects.forEach(UnicastSubject::onComplete); + cancel(); + } + + public void cancel() { // TODO this isn't used ... is it supposed to be? + if (!connectionSubscription.compareAndSet(null, CANCELLED)) { + // cancel the one that was there if we failed to set the sentinel + connectionSubscription.get().dispose(); + try { + connection.close(); + } catch (IOException e) { + errorStream.accept(e); + } + } + } + }); + } + + private static String getByteBufferAsString(ByteBuffer bb) { + final byte[] bytes = new byte[bb.capacity()]; + bb.get(bytes); + return new String(bytes, Charset.forName("UTF-8")); + } } diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index ac9c5a100..dc0402242 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -192,8 +192,10 @@ public void onNext(Frame requestFrame) { final ConnectionSetupPayload connectionSetupPayload = ConnectionSetupPayload.create(requestFrame); try { - if (Frame.Setup.version(requestFrame) != SetupFrameFlyweight.CURRENT_VERSION) { - throw new SetupException("unsupported protocol version: " + Frame.Setup.version(requestFrame)); + int version = Frame.Setup.version(requestFrame); + if (version != SetupFrameFlyweight.CURRENT_VERSION) { + throw new SetupException("unsupported protocol version: " + + version); } // accept setup for ReactiveSocket/Requester usage @@ -203,7 +205,8 @@ public void onNext(Frame requestFrame) { } catch (SetupException setupException) { setupErrorAndTearDown(connection, setupException); } catch (Throwable e) { - setupErrorAndTearDown(connection, new InvalidSetupException(e.getMessage())); + InvalidSetupException exc = new InvalidSetupException(e.getMessage()); + setupErrorAndTearDown(connection, exc); } // the L bit set must wait until the application logic explicitly sends @@ -216,24 +219,32 @@ public void onNext(Frame requestFrame) { // TODO: handle keepalive logic here } else { - setupErrorAndTearDown(connection, new InvalidSetupException("Setup frame missing")); + setupErrorAndTearDown(connection, + new InvalidSetupException("Setup frame missing")); } } else { Publisher responsePublisher = null; if (leaseGovernor.accept(Responder.this, requestFrame)) { try { if (requestFrame.getType() == FrameType.REQUEST_RESPONSE) { - responsePublisher = handleRequestResponse(requestFrame, requestHandler, cancellationSubscriptions); + responsePublisher = handleRequestResponse( + requestFrame, requestHandler, cancellationSubscriptions); } else if (requestFrame.getType() == FrameType.REQUEST_STREAM) { - responsePublisher = handleRequestStream(requestFrame, requestHandler, cancellationSubscriptions, inFlight); + responsePublisher = handleRequestStream( + requestFrame, requestHandler, cancellationSubscriptions, inFlight); } else if (requestFrame.getType() == FrameType.FIRE_AND_FORGET) { - responsePublisher = handleFireAndForget(requestFrame, requestHandler); + responsePublisher = handleFireAndForget( + requestFrame, requestHandler); } else if (requestFrame.getType() == FrameType.REQUEST_SUBSCRIPTION) { - responsePublisher = handleRequestSubscription(requestFrame, requestHandler, cancellationSubscriptions, inFlight); + responsePublisher = handleRequestSubscription( + requestFrame, requestHandler, cancellationSubscriptions, inFlight); } else if (requestFrame.getType() == FrameType.REQUEST_CHANNEL) { - responsePublisher = handleRequestChannel(requestFrame, requestHandler, channels, cancellationSubscriptions, inFlight); + responsePublisher = handleRequestChannel( + requestFrame, requestHandler, channels, + cancellationSubscriptions, inFlight); } else if (requestFrame.getType() == FrameType.METADATA_PUSH) { - responsePublisher = handleMetadataPush(requestFrame, requestHandler); + responsePublisher = handleMetadataPush( + requestFrame, requestHandler); } else if (requestFrame.getType() == FrameType.CANCEL) { Subscription s = null; synchronized (Responder.this) { @@ -253,30 +264,39 @@ public void onNext(Frame requestFrame) { inFlightSubscription.addApplicationRequest(requestN); return; } - // TODO should we do anything if we don't find the stream? emitting an - // error is risky as the responder could have terminated and cleaned up already + // TODO should we do anything if we don't find the stream? + // emitting an error is risky as the responder could have + // terminated and cleaned up already } else if (requestFrame.getType() == FrameType.KEEPALIVE) { // this client is alive. timeOfLastKeepalive = System.nanoTime(); // echo back if flag set if (Frame.Keepalive.hasRespondFlag(requestFrame)) { - responsePublisher = PublisherUtils.just(Frame.Keepalive.from(requestFrame.getData(), false)); + Frame keepAliveFrame = Frame.Keepalive.from( + requestFrame.getData(), false); + responsePublisher = PublisherUtils.just(keepAliveFrame); } else { return; } } else if (requestFrame.getType() == FrameType.LEASE) { // LEASE only concerns the Requester } else { - responsePublisher = PublisherUtils.errorFrame(streamId, new IllegalStateException("Unexpected prefix: " + requestFrame.getType())); + IllegalStateException exc = new IllegalStateException( + "Unexpected prefix: " + requestFrame.getType()); + responsePublisher = PublisherUtils.errorFrame(streamId, exc); } } catch (Throwable e) { - // synchronous try/catch since we execute user functions in the handlers and they could throw - errorStream.accept(new RuntimeException("Error in request handling.", e)); + // synchronous try/catch since we execute user functions + // in the handlers and they could throw + errorStream.accept( + new RuntimeException("Error in request handling.", e)); // error message to user - responsePublisher = PublisherUtils.errorFrame(streamId, new RuntimeException("Unhandled error processing request")); - } - } else { - final RejectedException exception = new RejectedException("No associated lease"); + responsePublisher = PublisherUtils.errorFrame( + streamId, new RuntimeException( + "Unhandled error processing request")); + } + } else { + RejectedException exception = new RejectedException("No associated lease"); responsePublisher = PublisherUtils.errorFrame(streamId, exception); } @@ -291,18 +311,22 @@ public void success() { public void error(Throwable e) { // TODO validate with unit tests if (childTerminated.compareAndSet(false, true)) { - errorStream.accept(new RuntimeException("Error writing", e)); // TODO should we have typed RuntimeExceptions? + // TODO should we have typed RuntimeExceptions? + errorStream.accept(new RuntimeException("Error writing", e)); cancel(); } } - }); } } } - private void setupErrorAndTearDown(DuplexConnection connection, SetupException setupException) { - // pass the ErrorFrame output, subscribe to write it, await onComplete and then tear down + private void setupErrorAndTearDown( + DuplexConnection connection, + SetupException setupException + ) { + // pass the ErrorFrame output, subscribe to write it, await + // onComplete and then tear down final Frame frame = Frame.Error.from(0, setupException); connection.addOutput(PublisherUtils.just(frame), new Completable() { @@ -312,13 +336,16 @@ public void success() { } @Override public void error(Throwable e) { - tearDownWithError(new RuntimeException("Failure outputting SetupException", e)); + RuntimeException exc = new RuntimeException( + "Failure outputting SetupException", e); + tearDownWithError(exc); } }); } private void tearDownWithError(Throwable se) { - onError(new RuntimeException("Connection Setup Failure", se)); // TODO unit test that this actually shuts things down + // TODO unit test that this actually shuts things down + onError(new RuntimeException("Connection Setup Failure", se)); } @Override @@ -340,7 +367,8 @@ public void onComplete() { } private void cancel() { - // child has cancelled (shutdown the connection or server) // TODO validate with unit tests + // child has cancelled (shutdown the connection or server) + // TODO validate with unit tests if (!transportSubscription.compareAndSet(null, EmptyDisposable.EMPTY)) { // cancel the one that was there if we failed to set the sentinel transportSubscription.get().dispose(); @@ -371,7 +399,9 @@ public void request(long n) { if (n > 0 && started.compareAndSet(false, true)) { final int streamId = requestFrame.getStreamId(); - requestHandler.handleRequestResponse(requestFrame).subscribe(new Subscriber() { + Publisher responsePublisher = + requestHandler.handleRequestResponse(requestFrame); + responsePublisher.subscribe(new Subscriber() { // event emission is serialized so this doesn't need to be atomic int count = 0; @@ -379,7 +409,8 @@ public void request(long n) { @Override public void onSubscribe(Subscription s) { if (parent.compareAndSet(null, s)) { - s.request(Long.MAX_VALUE); // only expect 1 value so we don't need REQUEST_N + // only expect 1 value so we don't need REQUEST_N + s.request(Long.MAX_VALUE); } else { s.cancel(); cleanup(); @@ -389,10 +420,13 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Payload v) { if (++count > 1) { - onError(new IllegalStateException("RequestResponse expects a single onNext")); + IllegalStateException exc = new IllegalStateException( + "RequestResponse expects a single onNext"); + onError(exc); } else { - - child.onNext(Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, v)); + Frame nextCompleteFrame = Frame.Response.from( + streamId, FrameType.NEXT_COMPLETE, v); + child.onNext(nextCompleteFrame); } } @@ -405,7 +439,9 @@ public void onError(Throwable t) { @Override public void onComplete() { if (count != 1) { - onError(new IllegalStateException("RequestResponse expects a single onNext")); + IllegalStateException exc = new IllegalStateException( + "RequestResponse expects a single onNext"); + onError(exc); } else { child.onComplete(); cleanup(); @@ -438,17 +474,24 @@ private void cleanup() { }; } - private static BiFunction> requestSubscriptionHandler = - (RequestHandler handler, Payload requestPayload) -> handler.handleSubscription(requestPayload); - private static BiFunction> requestStreamHandler = - (RequestHandler handler, Payload requestPayload) -> handler.handleRequestStream(requestPayload); + private static BiFunction> + requestSubscriptionHandler = RequestHandler::handleSubscription; + private static BiFunction> + requestStreamHandler = RequestHandler::handleRequestStream; private Publisher handleRequestStream( Frame requestFrame, final RequestHandler requestHandler, final Int2ObjectHashMap cancellationSubscriptions, final Int2ObjectHashMap inFlight) { - return _handleRequestStream(requestStreamHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, true); + return _handleRequestStream( + requestStreamHandler, + requestFrame, + requestHandler, + cancellationSubscriptions, + inFlight, + true + ); } private Publisher handleRequestSubscription( @@ -456,7 +499,14 @@ private Publisher handleRequestSubscription( final RequestHandler requestHandler, final Int2ObjectHashMap cancellationSubscriptions, final Int2ObjectHashMap inFlight) { - return _handleRequestStream(requestSubscriptionHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, false); + return _handleRequestStream( + requestSubscriptionHandler, + requestFrame, + requestHandler, + cancellationSubscriptions, + inFlight, + false + ); } /** @@ -496,13 +546,16 @@ public void request(long n) { arbiter.addTransportRequest(n); final int streamId = requestFrame.getStreamId(); - handler.apply(requestHandler, requestFrame).subscribe(new Subscriber() { + Publisher responses = + handler.apply(requestHandler, requestFrame); + responses.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { if (parent.compareAndSet(null, s)) { inFlight.put(streamId, arbiter); - arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame)); + long n = Frame.Request.initialRequestN(requestFrame); + arbiter.addApplicationRequest(n); arbiter.addApplicationProducer(s); } else { s.cancel(); @@ -513,7 +566,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Payload v) { try { - child.onNext(Frame.Response.from(streamId, FrameType.NEXT, v)); + Frame nextFrame = Frame.Response.from( + streamId, FrameType.NEXT, v); + child.onNext(nextFrame); } catch (Throwable e) { onError(e); } @@ -529,15 +584,18 @@ public void onError(Throwable t) { @Override public void onComplete() { if (allowCompletion) { - child.onNext(Frame.Response.from(streamId, FrameType.COMPLETE)); + Frame completeFrame = Frame.Response.from( + streamId, FrameType.COMPLETE); + child.onNext(completeFrame); child.onComplete(); cleanup(); } else { - onError(new IllegalStateException("Unexpected onComplete occurred on 'requestSubscription'")); + IllegalStateException exc = new IllegalStateException( + "Unexpected onComplete occurred on " + + "'requestSubscription'"); + onError(exc); } - } - }); } else { arbiter.addTransportRequest(n); @@ -570,39 +628,56 @@ private void cleanup() { } - private Publisher handleFireAndForget(Frame requestFrame, final RequestHandler requestHandler) { + private Publisher handleFireAndForget( + Frame requestFrame, + final RequestHandler requestHandler + ) { try { requestHandler.handleFireAndForget(requestFrame).subscribe(completionSubscriber); } catch (Throwable e) { - // we catch these errors here as we don't want anything propagating back to the user on fireAndForget + // we catch these errors here as we don't want anything propagating + // back to the user on fireAndForget errorStream.accept(new RuntimeException("Error processing 'fireAndForget'", e)); } - return PublisherUtils.empty(); // we always treat this as if it immediately completes as we don't want errors passing back to the user + // we always treat this as if it immediately completes as we don't want + // errors passing back to the user + return PublisherUtils.empty(); } - private Publisher handleMetadataPush(Frame requestFrame, final RequestHandler requestHandler) { + private Publisher handleMetadataPush( + Frame requestFrame, + final RequestHandler requestHandler + ) { try { requestHandler.handleMetadataPush(requestFrame).subscribe(completionSubscriber); } catch (Throwable e) { - // we catch these errors here as we don't want anything propagating back to the user on metadataPush + // we catch these errors here as we don't want anything propagating + // back to the user on metadataPush errorStream.accept(new RuntimeException("Error processing 'metadataPush'", e)); } - return PublisherUtils.empty(); // we always treat this as if it immediately completes as we don't want errors passing back to the user + // we always treat this as if it immediately completes as we don't want + // errors passing back to the user + return PublisherUtils.empty(); } /** - * Reusable for each fireAndForget and metadataPush since no state is shared across invocations. It just passes through errors. + * Reusable for each fireAndForget and metadataPush since no state is shared + * across invocations. It just passes through errors. */ - private final Subscriber completionSubscriber = new Subscriber(){ + private final Subscriber completionSubscriber = new Subscriber(){ + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } - @Override public void onSubscribe(Subscription s){s.request(Long.MAX_VALUE);} + @Override + public void onNext(Void t) {} - @Override public void onNext(Void t){} - - @Override public void onError(Throwable t){errorStream.accept(t);} - - @Override public void onComplete(){} + @Override public void onError(Throwable t) { + errorStream.accept(t); + } + @Override public void onComplete() {} }; private Publisher handleRequestChannel(Frame requestFrame, @@ -636,20 +711,30 @@ public void request(long n) { final int streamId = requestFrame.getStreamId(); // first request on this channel - UnicastSubject channelRequests = UnicastSubject.create((s, rn) -> { - // after we are first subscribed to then send the initial frame - s.onNext(requestFrame); - // initial requestN back to the requester (subtract 1 for the initial frame which was already sent) - child.onNext(Frame.RequestN.from(streamId, rn.intValue() - 1)); - }, r -> { - // requested - child.onNext(Frame.RequestN.from(streamId, r.intValue())); - }); - synchronized(Responder.this) { + UnicastSubject channelRequests = + UnicastSubject.create((s, rn) -> { + // after we are first subscribed to then send + // the initial frame + s.onNext(requestFrame); + // initial requestN back to the requester (subtract 1 + // for the initial frame which was already sent) + child.onNext( + Frame.RequestN.from(streamId, rn.intValue() - 1)); + }, r -> { + // requested + child.onNext(Frame.RequestN.from(streamId, r.intValue())); + }); + synchronized(Responder.this) { if(channels.get(streamId) != null) { - // TODO validate that this correctly defends against this issue - // this means we received a followup request that raced and that the requester didn't correct wait for REQUEST_N before sending more frames - child.onNext(Frame.Error.from(streamId, new RuntimeException("Requester sent more than 1 requestChannel frame before permitted."))); + // TODO validate that this correctly defends + // against this issue, this means we received a + // followup request that raced and that the requester + // didn't correct wait for REQUEST_N before sending + // more frames + RuntimeException exc = new RuntimeException( + "Requester sent more than 1 requestChannel " + + "frame before permitted."); + child.onNext(Frame.Error.from(streamId, exc)); child.onComplete(); cleanup(); return; @@ -657,13 +742,14 @@ public void request(long n) { channels.put(streamId, channelRequests); } - requestHandler.handleChannel(requestFrame, channelRequests).subscribe(new Subscriber() { - + Publisher responses = requestHandler.handleChannel(channelRequests); + responses.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { if (parent.compareAndSet(null, s)) { inFlight.put(streamId, arbiter); - arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame)); + long n = Frame.Request.initialRequestN(requestFrame); + arbiter.addApplicationRequest(n); arbiter.addApplicationProducer(s); } else { s.cancel(); @@ -673,7 +759,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Payload v) { - child.onNext(Frame.Response.from(streamId, FrameType.NEXT, v)); + Frame nextFrame = Frame.Response.from( + streamId, FrameType.NEXT, v); + child.onNext(nextFrame); } @Override @@ -685,12 +773,12 @@ public void onError(Throwable t) { @Override public void onComplete() { - child.onNext(Frame.Response.from(streamId, FrameType.COMPLETE)); + Frame completeFrame = Frame.Response.from( + streamId, FrameType.COMPLETE); + child.onNext(completeFrame); child.onComplete(); cleanup(); - } - }); } else { arbiter.addTransportRequest(n); @@ -727,13 +815,20 @@ private void cleanup() { if(Frame.Request.isRequestChannelComplete(requestFrame)) { channelSubject.onComplete(); } else { - channelSubject.onNext(requestFrame); // TODO this is ignoring requestN flow control (need to validate that this is legit because REQUEST_N across the wire is controlling it on the Requester side) + // TODO this is ignoring requestN flow control (need to validate + // that this is legit because REQUEST_N across the wire is + // controlling it on the Requester side) + channelSubject.onNext(requestFrame); } - // TODO should at least have an error message of some kind if the Requester disregarded it + // TODO should at least have an error message of some kind if the + // Requester disregarded it return PublisherUtils.empty(); } else { - // TODO should we use a BufferUntilSubscriber solution instead to handle time-gap issues like this? - return PublisherUtils.errorFrame(requestFrame.getStreamId(), new RuntimeException("Channel unavailable")); // TODO validate with unit tests. + // TODO should we use a BufferUntilSubscriber solution instead to + // handle time-gap issues like this? + // TODO validate with unit tests. + return PublisherUtils.errorFrame( + requestFrame.getStreamId(), new RuntimeException("Channel unavailable")); } } } @@ -767,18 +862,16 @@ public void addTransportRequest(long n) { private void tryRequest() { long toRequest = 0; - Subscription s = null; synchronized(this) { if(applicationProducer == null) { return; } - s = applicationProducer; long minToRequest = Math.min(appRequested, transportRequested); toRequest = minToRequest - requestedToProducer; requestedToProducer += toRequest; } if(toRequest > 0) { - s.request(toRequest); + applicationProducer.request(toRequest); } } diff --git a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index c985cfef1..fed6667b7 100644 --- a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -122,7 +122,7 @@ public Publisher handleFireAndForget(Payload payload) { } @Override - public Publisher handleChannel(Payload initialPayload, Publisher payloads) { + public Publisher handleChannel(Publisher inputs) { return null; } diff --git a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java index 2cf44038a..1d05f2eec 100644 --- a/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java +++ b/src/perf/java/io/reactivesocket/perfutil/PerfTestConnection.java @@ -58,6 +58,12 @@ public void onComplete() { }); } + @Override + public void addOutput(Frame f, Completable callback) { + writeSubject.onNext(f); + callback.success(); + } + @Override public Observable getInput() { return toInput; diff --git a/src/test/java/io/reactivesocket/LeaseTest.java b/src/test/java/io/reactivesocket/LeaseTest.java index 075122d9f..66097f016 100644 --- a/src/test/java/io/reactivesocket/LeaseTest.java +++ b/src/test/java/io/reactivesocket/LeaseTest.java @@ -114,12 +114,10 @@ public Publisher handleFireAndForget(Payload payload) { */ @Override public Publisher handleChannel( - Payload initialPayload, - Publisher payloads + Publisher inputs ) { - return fromPublisher(payloads).map(p -> { - return utf8EncodedPayload(byteToString(p.getData()) + "_echo", null); - }); + return fromPublisher(inputs).map(p -> + utf8EncodedPayload(byteToString(p.getData()) + "_echo", null)); } @Override @@ -162,7 +160,6 @@ public void testWriteWithoutLease() throws InterruptedException { TestSubscriber ts0 = new TestSubscriber<>();; response0.subscribe(ts0); ts0.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); -// ts0.assertError(RuntimeException.class); // send a Lease(10 sec, 1 message), and wait for the availability on the client side leaseGovernor.distribute(10_000, 1); diff --git a/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/src/test/java/io/reactivesocket/ReactiveSocketTest.java index 5dbd3241b..11aedc3f0 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -34,11 +34,12 @@ import org.junit.runner.RunWith; import org.reactivestreams.Publisher; -import io.reactivesocket.internal.PublisherUtils; import io.reactivesocket.lease.FairLeaseGovernor; import io.reactivex.disposables.Disposable; import io.reactivex.observables.ConnectableObservable; import io.reactivex.subscribers.TestSubscriber; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; @RunWith(Theories.class) public class ReactiveSocketTest { @@ -124,13 +125,39 @@ public Publisher handleFireAndForget(Payload payload) { * Use Payload.metadata for routing */ @Override - public Publisher handleChannel(Payload initialPayload, Publisher payloads) { - String request = byteToString(initialPayload.getMetadata()); - if ("echo".equals(request)) { - return echoChannel(payloads); - } else { - return error(new RuntimeException("Not Found")); - } + public Publisher handleChannel(Publisher inputs) { + return new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + inputs.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(Payload input) { + String metadata = byteToString(input.getMetadata()); + String data = byteToString(input.getData()); + if ("echo".equals(metadata)) { + subscriber.onNext(utf8EncodedPayload(data + "_echo", null)); + } else { + onError(new RuntimeException("Not Found")); + } + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + } + }; } @Override @@ -369,10 +396,13 @@ public void testFireAndForgetServerSideErrorHandlerBlowup(int setupFlag) throws public void testRequestChannelEcho(int setupFlag) throws InterruptedException { startSockets(setupFlag); - Publisher requestStream = just(TestUtil.utf8EncodedPayload("1", "echo")).concatWith(just(TestUtil.utf8EncodedPayload("2", null))); - Publisher response = socketClient.requestChannel(requestStream); + Publisher inputs = just( + TestUtil.utf8EncodedPayload("1", "echo"), + TestUtil.utf8EncodedPayload("2", "echo") + ); + Publisher outputs = socketClient.requestChannel(inputs); TestSubscriber ts = new TestSubscriber<>(); - response.subscribe(ts); + outputs.subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); assertEquals(2, ts.values().size()); diff --git a/src/test/java/io/reactivesocket/TestConnection.java b/src/test/java/io/reactivesocket/TestConnection.java index 61e1ffc43..9f1c4e7b5 100644 --- a/src/test/java/io/reactivesocket/TestConnection.java +++ b/src/test/java/io/reactivesocket/TestConnection.java @@ -42,6 +42,12 @@ public void addOutput(Publisher o, Completable callback) { } , callback::error, callback::success); } + @Override + public void addOutput(Frame f, Completable callback) { + write.send(f); + callback.success(); + } + @Override public io.reactivesocket.rx.Observable getInput() { return new io.reactivesocket.rx.Observable() { @@ -80,12 +86,14 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l // client to server write.add(f -> { +// serverConnection.toInput.send(f); serverThread.schedule(() -> { serverConnection.toInput.send(f); }); }); // server to client serverConnection.write.add(f -> { +// toInput.send(f); clientThread.schedule(() -> { toInput.send(f); }); diff --git a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java index e70368a36..fc4f3595f 100644 --- a/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java +++ b/src/test/java/io/reactivesocket/TestConnectionWithControlledRequestN.java @@ -87,7 +87,13 @@ public void onComplete() { } }); + } + @Override + public void addOutput(Frame f, Completable callback) { + emitted.incrementAndGet(); + write.send(f); + callback.success(); } public boolean awaitSubscription(int timeInMillis) { diff --git a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java index 41eef96d2..078cbc13a 100644 --- a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java +++ b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java @@ -18,6 +18,7 @@ import static io.reactivesocket.ConnectionSetupPayload.*; import static io.reactivesocket.TestUtil.*; import static io.reactivex.Observable.*; +import static io.reactivex.Observable.fromPublisher; import static org.junit.Assert.*; import java.util.concurrent.CountDownLatch; @@ -25,11 +26,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import io.reactivex.Observable; +import org.junit.*; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -153,14 +153,12 @@ public void testRequestSubscription_batches() throws InterruptedException { * Test that downstream is governed by request(n) * @throws InterruptedException */ - @Test(timeout=2000) + @Test(timeout=200000) public void testRequestChannel_batches_downstream() throws InterruptedException { ControlledSubscriber s = new ControlledSubscriber(); socketClient.requestChannel( - range(1, 10) - .map(i -> { - return utf8EncodedPayload(String.valueOf(i), "1000"); - })).subscribe(s); + range(1, 10).map(i -> utf8EncodedPayload(String.valueOf(i), "1000")) + ).subscribe(s); // if flatMap is being used, then each of the 10 streams will emit at least 128 (default) @@ -188,17 +186,33 @@ public void testRequestChannel_batches_downstream() throws InterruptedException * Test that the upstream is governed by request(n) * @throws InterruptedException */ - @Test(timeout=2000) + @Test(timeout=200000) public void testRequestChannel_batches_upstream_echo() throws InterruptedException { + setup(requestStreamHandler, requestSubscriptionHandler, inputs -> { + return outputSubscriber -> + fromPublisher(inputs) + .doOnRequest(n -> System.out.println("requested in echo responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnError(t -> System.out.println("Error in 'echo' handler: " + t.getMessage())) + .doOnNext(i -> emitted.incrementAndGet()) + .map(input -> { + String metadata = byteToString(input.getMetadata()); + String data = byteToString(input.getData()); + System.out.println("responder echo handleChannel received payload: " + + data + ":" + metadata); + return utf8EncodedPayload(String.valueOf(data) + "_echo", null); + }).subscribe(outputSubscriber); + }); + ControlledSubscriber s = new ControlledSubscriber(); AtomicInteger emittedClient = new AtomicInteger(); socketClient.requestChannel( range(1, 10000) .doOnNext(n -> emittedClient.incrementAndGet()) .doOnRequest(r -> System.out.println("CLIENT REQUESTS requestN: " + r)) - .map(i -> { - return utf8EncodedPayload(String.valueOf(i), "echo"); // metadata to route us to the echo behavior (only actually need this in the first payload) - })).subscribe(s); + .map(i -> utf8EncodedPayload(String.valueOf(i), "echo")) + ).subscribe(s); assertEquals(0, s.received.get()); assertEquals(0, emitted.get()); @@ -213,7 +227,7 @@ public void testRequestChannel_batches_upstream_echo() throws InterruptedExcepti assertEquals(210, s.received.get()); Thread.sleep(100); assertFalse(s.error.get()); - + System.out.println(">>> Client sent " + emittedClient.get() + " requests and received " + s.received.get() + " responses"); } @@ -221,8 +235,96 @@ public void testRequestChannel_batches_upstream_echo() throws InterruptedExcepti * Test that the upstream is governed by request(n) * @throws InterruptedException */ - @Test(timeout=2000) + @Test(timeout=2_000_000) public void testRequestChannel_batches_upstream_decoupled() throws InterruptedException { + setup(requestStreamHandler, requestSubscriptionHandler, new Function, Publisher>() { + @Override + public Publisher apply(Publisher inputs) { + /* + * Consume 300 from request and then stop requesting more (but no cancel from responder side) + */ + inputs.subscribe(new Subscriber() { + int count = 0; + Subscription s; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + s.request(50L); + } + + @Override + public void onNext(Payload input) { + String data = byteToString(input.getData()); + System.out.println("DECOUPLED side-effect of request: " + data); + count++; + if (count == 50) { + s.request(250L); + } + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + return range(1, 1000) + .doOnNext(n -> System.out.println("RESPONDER sending value: " + n)) + .doOnRequest(n -> System.out.println(">>> requested in decoupled responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnError(t -> System.out.println("Error in 'decoupled' handler: " + t.getMessage())) + .doOnNext(i -> emitted.incrementAndGet()) + .map(i -> utf8EncodedPayload(String.valueOf(i) + "_decoupled", null)); + } + }); + +// .subscribe(new Subscriber() { +// int count = 0; +// Subscription s; +// +// @Override +// public void onError(Throwable e) { +// } +// +// @Override +// public void onNext(Payload t) { +// count++; +// if (count == 50) { +// s.request(250); +// } +// } +// +// @Override +// public void onSubscribe(Subscription s) { +// this.s = s; +// // start with 50 +// s.request(50); +// } +// +// @Override +// public void onComplete() { +// } +// }); + + +// .flatMap(input -> { +// return range(1, 1000) +// .doOnNext(n -> System.out.println("RESPONDER sending value: " + n)) +// .doOnRequest(n -> System.out.println(">>> requested in decoupled responder: " + n)) +// .doOnRequest(r -> requested.addAndGet(r)) +// .doOnRequest(r -> numRequests.incrementAndGet()) +// .doOnError(t -> System.out.println("Error in 'decoupled' handler: " + t.getMessage())) +// .doOnNext(i -> emitted.incrementAndGet()) +// .map(i -> utf8EncodedPayload(String.valueOf(i) + "_decoupled", null)); +// }); +// } +// }); + ControlledSubscriber s = new ControlledSubscriber(); AtomicInteger emittedClient = new AtomicInteger(); socketClient.requestChannel( @@ -257,7 +359,6 @@ private void waitForAsyncValue(AtomicInteger value, int n) throws InterruptedExc } private static class ControlledSubscriber implements Subscriber { - AtomicInteger received = new AtomicInteger(); Subscription subscription; CountDownLatch terminated = new CountDownLatch(1); @@ -286,136 +387,86 @@ public void onComplete() { completed.set(true); terminated.countDown(); } - } - private static TestConnection serverConnection; - private static TestConnection clientConnection; - private static ReactiveSocket socketServer; - private static ReactiveSocket socketClient; - private static AtomicInteger emitted = new AtomicInteger(); - private static AtomicInteger numRequests = new AtomicInteger(); - private static AtomicLong requested = new AtomicLong(); + private TestConnection serverConnection; + private TestConnection clientConnection; + private ReactiveSocket socketServer; + private ReactiveSocket socketClient; + private AtomicInteger emitted = new AtomicInteger(); + private AtomicInteger numRequests = new AtomicInteger(); + private AtomicLong requested = new AtomicLong(); + private Function> requestStreamHandler = payload -> { + String request = byteToString(payload.getData()); + System.out.println("responder received requestStream: " + request); + return range(0, Integer.parseInt(request)) + .doOnRequest(n -> System.out.println("requested in responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnNext(i -> emitted.incrementAndGet()) + .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + }; + private Function> requestSubscriptionHandler = payload -> { + return range(0, Integer.MAX_VALUE) + .doOnRequest(n -> System.out.println("requested in responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnNext(i -> emitted.incrementAndGet()) + .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + }; + private Function, Publisher> requestChannelHandler = inputs -> { + return outputSubscriber -> + fromPublisher(inputs).flatMap(input -> { + String requestMetadata = byteToString(input.getMetadata()); + String payloadData = byteToString(input.getData()); + System.out.println("responder handleChannel received payload: " + + payloadData + ":" + requestMetadata); + + return range(0, Integer.parseInt(requestMetadata)) + .doOnRequest(n -> System.out.println("requested in responder: " + n)) + .doOnRequest(r -> requested.addAndGet(r)) + .doOnRequest(r -> numRequests.incrementAndGet()) + .doOnNext(i -> emitted.incrementAndGet()) + .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + }).subscribe(outputSubscriber); + }; @Before - public void init() { + public void init() throws InterruptedException { emitted.set(0); requested.set(0); numRequests.set(0); + setup(); } - @BeforeClass - public static void setup() throws InterruptedException { + private void setup() throws InterruptedException { + setup(requestStreamHandler, requestSubscriptionHandler, requestChannelHandler); + } + + private void setup( + Function> requestStreamHandler, + Function> requestSubscriptionHandler, + Function, Publisher> requestChannelHandler + ) throws InterruptedException { serverConnection = new TestConnection(); clientConnection = new TestConnection(); clientConnection.connectToServerConnection(serverConnection, false); - socketServer = ReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestStream(Payload payload) { - String request = byteToString(payload.getData()); - System.out.println("responder received requestStream: " + request); - return range(0, Integer.parseInt(request)) - .doOnRequest(n -> System.out.println("requested in responder: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + return requestStreamHandler.apply(payload); } @Override public Publisher handleSubscription(Payload payload) { - return range(0, Integer.MAX_VALUE) - .doOnRequest(n -> System.out.println("requested in responder: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); + return requestSubscriptionHandler.apply(payload); } - /** - * Use Payload.metadata for routing - */ @Override - public Publisher handleChannel(Payload initialPayload, Publisher payloads) { - String requestMetadata = byteToString(initialPayload.getMetadata()); - System.out.println("responder received requestChannel: " + requestMetadata); - - if(requestMetadata.equals("echo")) { - return fromPublisher(payloads).map(payload -> { // TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug - String payloadData = byteToString(payload.getData()); - return utf8EncodedPayload(String.valueOf(payloadData) + "_echo", null); - }).doOnRequest(n -> System.out.println(">>> requested in echo responder: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnError(t -> System.out.println("Error in 'echo' handler: " + t.getMessage())) - .doOnNext(i -> emitted.incrementAndGet()); - } else if (requestMetadata.equals("decoupled")) { - /* - * Consume 300 from request and then stop requesting more (but no cancel from responder side) - */ - fromPublisher(payloads) - .doOnNext(payload -> { - String payloadData = byteToString(payload.getData()); - System.out.println("DECOUPLED side-effect of request: " + payloadData); - }).subscribe(new Subscriber() { - - int count=0; - Subscription s; - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(Payload t) { - count++; - if(count == 50) { - s.request(250); - } - } - - @Override - public void onSubscribe(Subscription s) { - this.s = s; - // start with 50 - s.request(50); - } - - @Override - public void onComplete() { - // TODO Auto-generated method stub - - } - - - }); - - return range(1, 1000) - .doOnNext(n -> System.out.println("RESPONDER sending value: " + n)) - .map(i -> { - return utf8EncodedPayload(String.valueOf(i) + "_decoupled", null); - }) - .doOnRequest(n -> System.out.println(">>> requested in decoupled responder: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnError(t -> System.out.println("Error in 'decoupled' handler: " + t.getMessage())) - .doOnNext(i -> emitted.incrementAndGet()); - } else { - return fromPublisher(payloads).flatMap(payload -> { // TODO I want this to be concatMap instead of flatMap but apparently concatMap has a bug - String payloadData = byteToString(payload.getData()); - System.out.println("responder handleChannel received payload: " + payloadData); - return range(0, Integer.parseInt(requestMetadata)) - .doOnRequest(n -> System.out.println("requested in responder [" + payloadData + "]: " + n)) - .doOnRequest(r -> requested.addAndGet(r)) - .doOnRequest(r -> numRequests.incrementAndGet()) - .doOnNext(i -> emitted.incrementAndGet()) - .map(i -> utf8EncodedPayload(String.valueOf(i), null)); - }).doOnRequest(n -> System.out.println(">>> response stream request(n) in responder: " + n)); - } + public Publisher handleChannel(Publisher inputs) { + return requestChannelHandler.apply(inputs); } @Override @@ -446,8 +497,8 @@ public Publisher handleMetadataPush(Payload payload) } } - @AfterClass - public static void shutdown() { + @After + public void shutdown() { socketServer.shutdown(); socketClient.shutdown(); } diff --git a/src/test/java/io/reactivesocket/TestTransportRequestN.java b/src/test/java/io/reactivesocket/TestTransportRequestN.java index bb3d0b785..693fb97f6 100644 --- a/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -200,7 +200,7 @@ public Publisher handleFireAndForget(Payload payload) { * Use Payload.metadata for routing */ @Override - public Publisher handleChannel(Payload initialPayload, Publisher payloads) { + public Publisher handleChannel(Publisher inputs) { return range(0, 10000).map(i -> "channel_response_" + i).map(n -> utf8EncodedPayload(n, null)); }