diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java deleted file mode 100644 index e1e79d9d9..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.internal; - -import io.netty.util.ReferenceCountUtil; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.function.BiFunction; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Operators; -import reactor.core.publisher.UnicastProcessor; - -public final class SwitchTransform extends Flux { - - final Publisher source; - final BiFunction, Publisher> transformer; - - public SwitchTransform( - Publisher source, BiFunction, Publisher> transformer) { - this.source = Objects.requireNonNull(source, "source"); - this.transformer = Objects.requireNonNull(transformer, "transformer"); - } - - @Override - public void subscribe(CoreSubscriber actual) { - source.subscribe(new SwitchTransformSubscriber<>(actual, transformer)); - } - - static final class SwitchTransformSubscriber implements CoreSubscriber { - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once"); - - final CoreSubscriber actual; - final BiFunction, Publisher> transformer; - final UnicastProcessor processor = UnicastProcessor.create(); - Subscription s; - volatile int once; - - SwitchTransformSubscriber( - CoreSubscriber actual, - BiFunction, Publisher> transformer) { - this.actual = actual; - this.transformer = transformer; - } - - @Override - public void onSubscribe(Subscription s) { - if (Operators.validate(this.s, s)) { - this.s = s; - processor.onSubscribe(s); - } - } - - @Override - public void onNext(T t) { - if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { - try { - Publisher result = - Objects.requireNonNull( - transformer.apply(t, processor), "The transformer returned a null value"); - result.subscribe(actual); - } catch (Throwable e) { - onError(Operators.onOperatorError(s, e, t, actual.currentContext())); - ReferenceCountUtil.safeRelease(t); - return; - } - } - processor.onNext(t); - } - - @Override - public void onError(Throwable t) { - processor.onError(t); - } - - @Override - public void onComplete() { - processor.onComplete(); - } - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java new file mode 100644 index 000000000..226a9b78c --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -0,0 +1,249 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.internal; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.BiFunction; + +import io.netty.util.ReferenceCountUtil; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Operators; +import reactor.util.annotation.Nullable; + +public final class SwitchTransformFlux extends Flux { + + final Publisher source; + final BiFunction, Publisher> transformer; + + public SwitchTransformFlux( + Publisher source, BiFunction, Publisher> transformer) { + this.source = Objects.requireNonNull(source, "source"); + this.transformer = Objects.requireNonNull(transformer, "transformer"); + } + + @Override + public int getPrefetch() { + return 1; + } + + @Override + public void subscribe(CoreSubscriber actual) { + source.subscribe(new SwitchTransformMain<>(actual, transformer)); + } + + static final class SwitchTransformMain implements CoreSubscriber, Scannable { + + final CoreSubscriber actual; + final BiFunction, Publisher> transformer; + final SwitchTransformInner inner; + + Subscription s; + + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformMain.class, "once"); + + SwitchTransformMain( + CoreSubscriber actual, + BiFunction, Publisher> transformer + ) { + this.actual = actual; + this.transformer = transformer; + this.inner = new SwitchTransformInner<>(this); + } + + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); + if (key == Attr.PREFETCH) return 1; + + return null; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + s.request(1); + } + } + + @Override + public void onNext(T t) { + if (isCanceled()) { + return; + } + + if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + try { + inner.first = t; + Publisher result = + Objects.requireNonNull(transformer.apply(t, inner), "The transformer returned a null value"); + result.subscribe(actual); + return; + } catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, actual.currentContext())); + ReferenceCountUtil.safeRelease(t); + return; + } + } + + inner.onNext(t); + } + + @Override + public void onError(Throwable t) { + if (isCanceled()) { + return; + } + + if (once != 0) { + inner.onError(t); + } else { + actual.onSubscribe(Operators.emptySubscription()); + actual.onError(t); + } + } + + @Override + public void onComplete() { + if (isCanceled()) { + return; + } + + if (once != 0) { + inner.onComplete(); + } else { + actual.onSubscribe(Operators.emptySubscription()); + actual.onComplete(); + } + } + + boolean isCanceled() { + return s == Operators.cancelledSubscription(); + } + + void cancel() { + s.cancel(); + s = Operators.cancelledSubscription(); + } + } + + static final class SwitchTransformInner extends Flux + implements Scannable, Subscription { + + final SwitchTransformMain parent; + + volatile CoreSubscriber actual; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater ACTUAL = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, CoreSubscriber.class, "actual"); + + volatile V first; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater FIRST = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, Object.class, "first"); + + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformInner.class, "once"); + + SwitchTransformInner(SwitchTransformMain parent) { + this.parent = parent; + } + + public void onNext(V t) { + CoreSubscriber a = actual; + + if (a != null) { + a.onNext(t); + } + } + + public void onError(Throwable t) { + CoreSubscriber a = actual; + + if (a != null) { + a.onError(t); + } + } + + public void onComplete() { + CoreSubscriber a = actual; + + if (a != null) { + a.onComplete(); + } + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + ACTUAL.lazySet(this, actual); + actual.onSubscribe(this); + } + else { + actual.onError(new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + } + + @Override + public void request(long n) { + V f = first; + + if (f != null && FIRST.compareAndSet(this, f, null)) { + actual.onNext(f); + + long r = Operators.addCap(n, -1); + if (r > 0) { + parent.s.request(r); + } + } else { + parent.s.request(n); + } + } + + @Override + public void cancel() { + actual = null; + first = null; + parent.cancel(); + } + + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) return parent; + if (key == Attr.ACTUAL) return actual(); + + return null; + } + + public CoreSubscriber actual() { + return actual; + } + } +} \ No newline at end of file diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java new file mode 100644 index 000000000..aaa1fa4f0 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -0,0 +1,109 @@ +package io.rsocket.internal; + +import java.time.Duration; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; + +public class SwitchTransformFluxTest { + + @Test + public void backpressureTest() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher + .flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + )); + + publisher.next(1L); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + + @Test + public void shouldErrorOnOverflowTest() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher + .flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + )); + + publisher.next(1L); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .then(() -> publisher.next(2L)) + .expectError() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + + @Test + public void shouldPropagateonCompleteCorrectly() { + Flux switchTransformed = Flux.empty() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + )); + + StepVerifier.create(switchTransformed) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + + @Test + public void shouldPropagateErrorCorrectly() { + Flux switchTransformed = Flux.error(new RuntimeException("hello")) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + )); + + StepVerifier.create(switchTransformed) + .expectErrorMessage("hello") + .verify(Duration.ofSeconds(10)); + } + + @Test + public void shouldBeAbleToBeCancelledProperly() { + TestPublisher publisher = TestPublisher.createCold(); + Flux switchTransformed = publisher + .flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + )); + + publisher.emit(1, 2, 3, 4, 5); + + StepVerifier.create(switchTransformed, 0) + .thenCancel() + .verify(Duration.ofSeconds(10)); + + publisher.assertCancelled(); + publisher.assertWasRequested(); + + } +} \ No newline at end of file