From 7753d849ce4e37ca82184aefdf10af884f788a3f Mon Sep 17 00:00:00 2001 From: Tomas Mikula Date: Tue, 3 Jun 2014 20:59:06 +0200 Subject: [PATCH] Add EventStreams.ticks(Duration) factory method. --- .../main/java/org/reactfx/EventStream.java | 2 +- .../main/java/org/reactfx/EventStreams.java | 46 +++++++++++++ .../ScheduledExecutorServiceTimer.java | 44 ++++++++++--- .../test/java/org/reactfx/EventCounter.java | 22 +++++++ .../src/test/java/org/reactfx/TicksTest.java | 64 +++++++++++++++++++ 5 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 reactfx/src/test/java/org/reactfx/EventCounter.java create mode 100644 reactfx/src/test/java/org/reactfx/TicksTest.java diff --git a/reactfx/src/main/java/org/reactfx/EventStream.java b/reactfx/src/main/java/org/reactfx/EventStream.java index ae40c11..cae3e40 100644 --- a/reactfx/src/main/java/org/reactfx/EventStream.java +++ b/reactfx/src/main/java/org/reactfx/EventStream.java @@ -419,7 +419,7 @@ default AwaitingEventStream reduceSuccessions( Executor eventThreadExecutor) { Function timerFactory = - action -> new ScheduledExecutorServiceTimer( + action -> ScheduledExecutorServiceTimer.create( timeout, action, scheduler, eventThreadExecutor); return new SuccessionReducingStream( this, initialTransformation, reduction, timerFactory); diff --git a/reactfx/src/main/java/org/reactfx/EventStreams.java b/reactfx/src/main/java/org/reactfx/EventStreams.java index 660b583..e1276ec 100644 --- a/reactfx/src/main/java/org/reactfx/EventStreams.java +++ b/reactfx/src/main/java/org/reactfx/EventStreams.java @@ -1,5 +1,8 @@ package org.reactfx; +import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import java.util.stream.Stream; @@ -18,6 +21,9 @@ import javafx.event.EventType; import javafx.scene.Node; +import org.reactfx.util.FxTimer; +import org.reactfx.util.Timer; + public class EventStreams { /** @@ -132,6 +138,46 @@ protected Subscription subscribeToInputs() { }; } + /** + * Returns an event stream that emits periodic ticks. The returned + * stream may only be used on the JavaFX application thread. + */ + public static EventStream ticks(Duration interval) { + return new LazilyBoundStream() { + private final Timer timer = FxTimer.createPeriodic( + interval, () -> emit(null)); + + @Override + protected Subscription subscribeToInputs() { + timer.restart(); + return timer::stop; + } + }; + } + + /** + * Returns an event stream that emits periodic ticks on the given + * {@code eventThreadExecutor}. The returned stream may only be used from + * that executor's thread. + * @param scheduler scheduler used to schedule periodic emissions. + * @param eventThreadExecutor single-thread executor used to emit the ticks. + */ + public static EventStream ticks( + Duration interval, + ScheduledExecutorService scheduler, + Executor eventThreadExecutor) { + return new LazilyBoundStream() { + private final Timer timer = ScheduledExecutorServiceTimer.createPeriodic( + interval, () -> emit(null), scheduler, eventThreadExecutor); + + @Override + protected Subscription subscribeToInputs() { + timer.restart(); + return timer::stop; + } + }; + } + /** * Returns an event stream that emits all the events emitted from any of * the {@code inputs}. The event type of the returned stream is the nearest diff --git a/reactfx/src/main/java/org/reactfx/ScheduledExecutorServiceTimer.java b/reactfx/src/main/java/org/reactfx/ScheduledExecutorServiceTimer.java index bcfbc8b..1c702d8 100644 --- a/reactfx/src/main/java/org/reactfx/ScheduledExecutorServiceTimer.java +++ b/reactfx/src/main/java/org/reactfx/ScheduledExecutorServiceTimer.java @@ -6,21 +6,45 @@ import java.util.concurrent.TimeUnit; import org.reactfx.util.Timer; +import org.reactfx.util.TriFunction; class ScheduledExecutorServiceTimer implements Timer { + + public static Timer create( + java.time.Duration timeout, + Runnable action, + ScheduledExecutorService scheduler, + Executor eventThreadExecutor) { + return new ScheduledExecutorServiceTimer( + timeout, action, + (delay, unit, cmd) -> scheduler.schedule(cmd, delay, unit), + eventThreadExecutor); + } + + public static Timer createPeriodic( + java.time.Duration timeout, + Runnable action, + ScheduledExecutorService scheduler, + Executor eventThreadExecutor) { + return new ScheduledExecutorServiceTimer( + timeout, action, + (delay, unit, cmd) -> scheduler.scheduleAtFixedRate(cmd, delay, delay, unit), + eventThreadExecutor); + } + private final long timeout; private final TimeUnit unit; private final Runnable action; - private final ScheduledExecutorService scheduler; + private final TriFunction> scheduler; private final Executor eventThreadExecutor; private ScheduledFuture pendingTimer = null; private long seq = 0; - public ScheduledExecutorServiceTimer( + private ScheduledExecutorServiceTimer( java.time.Duration timeout, Runnable action, - ScheduledExecutorService scheduler, + TriFunction> scheduler, Executor eventThreadExecutor) { this.timeout = timeout.toNanos(); @@ -34,13 +58,13 @@ public ScheduledExecutorServiceTimer( public final void restart() { stop(); long expected = seq; - pendingTimer = scheduler.schedule( - () -> eventThreadExecutor.execute(() -> { - if(seq == expected) { - action.run(); - } - }), - timeout, unit); + pendingTimer = scheduler.apply(timeout, unit, () -> { + eventThreadExecutor.execute(() -> { + if(seq == expected) { + action.run(); + } + }); + }); } @Override diff --git a/reactfx/src/test/java/org/reactfx/EventCounter.java b/reactfx/src/test/java/org/reactfx/EventCounter.java new file mode 100644 index 0000000..ad388ba --- /dev/null +++ b/reactfx/src/test/java/org/reactfx/EventCounter.java @@ -0,0 +1,22 @@ +package org.reactfx; + +import java.util.function.Consumer; + +class EventCounter implements Consumer { + private int count = 0; + + @Override + public void accept(Object o) { + ++count; + } + + public int get() { + return count; + } + + public int getAndReset() { + int res = count; + count = 0; + return res; + } +} diff --git a/reactfx/src/test/java/org/reactfx/TicksTest.java b/reactfx/src/test/java/org/reactfx/TicksTest.java new file mode 100644 index 0000000..f5f5297 --- /dev/null +++ b/reactfx/src/test/java/org/reactfx/TicksTest.java @@ -0,0 +1,64 @@ +package org.reactfx; + +import static org.junit.Assert.*; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import javafx.application.Platform; +import javafx.embed.swing.JFXPanel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.reactfx.util.FxTimer; + + +public class TicksTest { + private ScheduledExecutorService scheduler; + + @Before + public void setUp() throws Exception { + scheduler = Executors.newSingleThreadScheduledExecutor(); + new JFXPanel(); // initializes JavaFX toolkit + } + + @After + public void tearDown() throws Exception { + scheduler.shutdown(); + } + + @Test + public void fxTest() throws InterruptedException, ExecutionException { + CompletableFuture nTicks = new CompletableFuture<>(); + Platform.runLater(() -> { + EventCounter counter = new EventCounter(); + Subscription sub = EventStreams.ticks(Duration.ofMillis(100)).subscribe(counter); + FxTimer.runLater(Duration.ofMillis(350), sub::unsubscribe); // stop after 3 ticks + // wait a little more to test that no more than 3 ticks arrive anyway + FxTimer.runLater(Duration.ofMillis(550), () -> nTicks.complete(counter.get())); + }); + assertEquals(3, nTicks.get().intValue()); + } + + @Test + public void executorTest() throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + + CompletableFuture nTicks = new CompletableFuture<>(); + executor.execute(() -> { + EventCounter counter = new EventCounter(); + Subscription sub = EventStreams.ticks(Duration.ofMillis(100), scheduler, executor).subscribe(counter); + ScheduledExecutorServiceTimer.create(Duration.ofMillis(350), sub::unsubscribe, scheduler, executor).restart(); // stop after 3 ticks + // wait a little more to test that no more than 3 ticks arrive anyway + ScheduledExecutorServiceTimer.create(Duration.ofMillis(550), () -> nTicks.complete(counter.get()), scheduler, executor).restart(); + }); + assertEquals(3, nTicks.get().intValue()); + + executor.shutdown(); + } +}