Skip to content

Commit

Permalink
Add EventStreams.ticks(Duration) factory method.
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasMikula committed Jun 3, 2014
1 parent e754843 commit 7753d84
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 11 deletions.
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ default <U> AwaitingEventStream<U> reduceSuccessions(
Executor eventThreadExecutor) {

Function<Runnable, Timer> timerFactory =
action -> new ScheduledExecutorServiceTimer(
action -> ScheduledExecutorServiceTimer.create(
timeout, action, scheduler, eventThreadExecutor);
return new SuccessionReducingStream<T, U>(
this, initialTransformation, reduction, timerFactory);
Expand Down
46 changes: 46 additions & 0 deletions reactfx/src/main/java/org/reactfx/EventStreams.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.reactfx;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand All @@ -18,6 +21,9 @@
import javafx.event.EventType;
import javafx.scene.Node;

import org.reactfx.util.FxTimer;
import org.reactfx.util.Timer;

public class EventStreams {

/**
Expand Down Expand Up @@ -132,6 +138,46 @@ protected Subscription subscribeToInputs() {
};
}

/**
* Returns an event stream that emits periodic <i>ticks</i>. The returned
* stream may only be used on the JavaFX application thread.
*/
public static EventStream<?> ticks(Duration interval) {
return new LazilyBoundStream<Void>() {
private final Timer timer = FxTimer.createPeriodic(
interval, () -> emit(null));

@Override
protected Subscription subscribeToInputs() {
timer.restart();
return timer::stop;
}
};
}

/**
* Returns an event stream that emits periodic <i>ticks</i> on the given
* {@code eventThreadExecutor}. The returned stream may only be used from
* that executor's thread.
* @param scheduler scheduler used to schedule periodic emissions.
* @param eventThreadExecutor single-thread executor used to emit the ticks.
*/
public static EventStream<?> ticks(
Duration interval,
ScheduledExecutorService scheduler,
Executor eventThreadExecutor) {
return new LazilyBoundStream<Void>() {
private final Timer timer = ScheduledExecutorServiceTimer.createPeriodic(
interval, () -> emit(null), scheduler, eventThreadExecutor);

@Override
protected Subscription subscribeToInputs() {
timer.restart();
return timer::stop;
}
};
}

/**
* Returns an event stream that emits all the events emitted from any of
* the {@code inputs}. The event type of the returned stream is the nearest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,45 @@
import java.util.concurrent.TimeUnit;

import org.reactfx.util.Timer;
import org.reactfx.util.TriFunction;

class ScheduledExecutorServiceTimer implements Timer {

public static Timer create(
java.time.Duration timeout,
Runnable action,
ScheduledExecutorService scheduler,
Executor eventThreadExecutor) {
return new ScheduledExecutorServiceTimer(
timeout, action,
(delay, unit, cmd) -> scheduler.schedule(cmd, delay, unit),
eventThreadExecutor);
}

public static Timer createPeriodic(
java.time.Duration timeout,
Runnable action,
ScheduledExecutorService scheduler,
Executor eventThreadExecutor) {
return new ScheduledExecutorServiceTimer(
timeout, action,
(delay, unit, cmd) -> scheduler.scheduleAtFixedRate(cmd, delay, delay, unit),
eventThreadExecutor);
}

private final long timeout;
private final TimeUnit unit;
private final Runnable action;
private final ScheduledExecutorService scheduler;
private final TriFunction<Long, TimeUnit, Runnable, ScheduledFuture<?>> scheduler;
private final Executor eventThreadExecutor;

private ScheduledFuture<?> pendingTimer = null;
private long seq = 0;

public ScheduledExecutorServiceTimer(
private ScheduledExecutorServiceTimer(
java.time.Duration timeout,
Runnable action,
ScheduledExecutorService scheduler,
TriFunction<Long, TimeUnit, Runnable, ScheduledFuture<?>> scheduler,
Executor eventThreadExecutor) {

this.timeout = timeout.toNanos();
Expand All @@ -34,13 +58,13 @@ public ScheduledExecutorServiceTimer(
public final void restart() {
stop();
long expected = seq;
pendingTimer = scheduler.schedule(
() -> eventThreadExecutor.execute(() -> {
if(seq == expected) {
action.run();
}
}),
timeout, unit);
pendingTimer = scheduler.apply(timeout, unit, () -> {
eventThreadExecutor.execute(() -> {
if(seq == expected) {
action.run();
}
});
});
}

@Override
Expand Down
22 changes: 22 additions & 0 deletions reactfx/src/test/java/org/reactfx/EventCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.reactfx;

import java.util.function.Consumer;

class EventCounter implements Consumer<Object> {
private int count = 0;

@Override
public void accept(Object o) {
++count;
}

public int get() {
return count;
}

public int getAndReset() {
int res = count;
count = 0;
return res;
}
}
64 changes: 64 additions & 0 deletions reactfx/src/test/java/org/reactfx/TicksTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.reactfx;

import static org.junit.Assert.*;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import javafx.application.Platform;
import javafx.embed.swing.JFXPanel;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactfx.util.FxTimer;


public class TicksTest {
private ScheduledExecutorService scheduler;

@Before
public void setUp() throws Exception {
scheduler = Executors.newSingleThreadScheduledExecutor();
new JFXPanel(); // initializes JavaFX toolkit
}

@After
public void tearDown() throws Exception {
scheduler.shutdown();
}

@Test
public void fxTest() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> nTicks = new CompletableFuture<>();
Platform.runLater(() -> {
EventCounter counter = new EventCounter();
Subscription sub = EventStreams.ticks(Duration.ofMillis(100)).subscribe(counter);
FxTimer.runLater(Duration.ofMillis(350), sub::unsubscribe); // stop after 3 ticks
// wait a little more to test that no more than 3 ticks arrive anyway
FxTimer.runLater(Duration.ofMillis(550), () -> nTicks.complete(counter.get()));
});
assertEquals(3, nTicks.get().intValue());
}

@Test
public void executorTest() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();

CompletableFuture<Integer> nTicks = new CompletableFuture<>();
executor.execute(() -> {
EventCounter counter = new EventCounter();
Subscription sub = EventStreams.ticks(Duration.ofMillis(100), scheduler, executor).subscribe(counter);
ScheduledExecutorServiceTimer.create(Duration.ofMillis(350), sub::unsubscribe, scheduler, executor).restart(); // stop after 3 ticks
// wait a little more to test that no more than 3 ticks arrive anyway
ScheduledExecutorServiceTimer.create(Duration.ofMillis(550), () -> nTicks.complete(counter.get()), scheduler, executor).restart();
});
assertEquals(3, nTicks.get().intValue());

executor.shutdown();
}
}

0 comments on commit 7753d84

Please sign in to comment.