Skip to content

Commit

Permalink
Don't use Scala Futures in Java APIs
Browse files Browse the repository at this point in the history
Sketching out apache#1417 - incomplete and notably not bothering
with binary compatibility yet, just to illustrate the idea.
  • Loading branch information
raboof committed Aug 2, 2024
1 parent 684fec9 commit 9f74250
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.pekko.testkit.PekkoSpec;

@SuppressWarnings("deprecation")
public class JavaFutureTests extends JUnitSuite {

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
Expand Down Expand Up @@ -219,15 +220,15 @@ public void testAskWithReplyToTimeout() throws Exception {
@Test
public void usePipe() throws Exception {
TestProbe probe = new TestProbe(system);
pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref());
pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref());
probe.expectMsg("ho!");
}

@Test
public void usePipeWithActorSelection() throws Exception {
TestProbe probe = new TestProbe(system);
ActorSelection selection = system.actorSelection(probe.ref().path());
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection);
pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection);
probe.expectMsg("hi!");
}

Expand Down Expand Up @@ -291,15 +292,11 @@ public void testRetryCompletionStageRandomDelay() throws Exception {
public void testRetry() throws Exception {
final String expected = "hello";

Future<String> retriedFuture =
CompletionStage<String> retriedFuture =
Patterns.retry(
() -> Futures.successful(expected),
3,
scala.concurrent.duration.Duration.apply(200, "millis"),
system.scheduler(),
ec);
() -> CompletableFuture.completedFuture(expected), 3, Duration.ofMillis(200), system);

String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS));
String actual = retriedFuture.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}

Expand All @@ -317,21 +314,21 @@ public void testCSRetry() throws Exception {
}

@Test(expected = IllegalStateException.class)
public void testAfterFailedCallable() throws Exception {
Callable<Future<String>> failedCallable =
() -> Futures.failed(new IllegalStateException("Illegal!"));
public void testAfterFailedCallable() throws Throwable {
Callable<CompletionStage<String>> failedCallable =
() -> Futures.failedCompletionStage(new IllegalStateException("Illegal!"));

Future<String> delayedFuture =
Patterns.after(
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
failedCallable);
CompletionStage<String> delayedFuture =
Patterns.after(Duration.ofMillis(200), system, failedCallable);

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS));
try {
delayedFuture.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@SuppressWarnings("deprecation")
@Test(expected = IllegalStateException.class)
public void testAfterFailedFuture() throws Exception {

Expand All @@ -340,7 +337,9 @@ public void testAfterFailedFuture() throws Exception {
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.failed(new IllegalStateException("Illegal!")));
() ->
FutureConverters.asScala(
Futures.failedCompletionStage(new IllegalStateException("Illegal!"))));

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
Expand All @@ -350,19 +349,16 @@ public void testAfterFailedFuture() throws Exception {
public void testAfterSuccessfulCallable() throws Exception {
final String expected = "Hello";

Future<String> delayedFuture =
CompletionStage<String> delayedFuture =
Patterns.after(
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.successful(expected));
Duration.ofMillis(200), system, () -> CompletableFuture.completedFuture(expected));

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
String actual = delayedFuture.toCompletableFuture().get(3, SECONDS);

assertEquals(expected, actual);
}

@SuppressWarnings("deprecation")
@Test
public void testAfterSuccessfulFuture() throws Exception {
final String expected = "Hello";
Expand All @@ -380,6 +376,7 @@ public void testAfterSuccessfulFuture() throws Exception {
assertEquals(expected, actual);
}

@SuppressWarnings("deprecation")
@Test
public void testAfterFiniteDuration() throws Exception {
final String expected = "Hello";
Expand All @@ -391,7 +388,8 @@ public void testAfterFiniteDuration() throws Exception {
ec,
() -> Futures.successful("world"));

Future<String> immediateFuture = Futures.future(() -> expected, ec);
Future<String> immediateFuture =
FutureConverters.asScala(CompletableFuture.completedFuture(expected));

Future<String> resultFuture =
Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec);
Expand Down
7 changes: 7 additions & 0 deletions actor/src/main/scala/org/apache/pekko/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,27 +130,33 @@ object Futures {
* @param executor the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)

/**
* Creates a promise object which can be completed with a value.
*
* @return the newly created `Promise` object
*/
@deprecated("Use CompletableFuture instead", "1.1.0")
def promise[T](): Promise[T] = Promise[T]()

/**
* creates an already completed Promise with the specified exception
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def failed[T](exception: Throwable): Future[T] = Future.failed(exception)

/**
* Creates an already completed Promise with the specified result
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def successful[T](result: T): Future[T] = Future.successful(result)

/**
* Creates an already completed CompletionStage with the specified exception
*
* Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards
*/
def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
Expand All @@ -172,6 +178,7 @@ object Futures {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
@deprecated("Use CompletableFuture.anyOf instead", "1.1.0")
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(futures.asScala)(executor)

Expand Down
5 changes: 3 additions & 2 deletions docs/src/test/java/jdocs/future/ActorWithFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

// #context-dispatcher
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.dispatch.Futures;

import java.util.concurrent.CompletableFuture;

public class ActorWithFuture extends AbstractActor {
ActorWithFuture() {
Futures.future(() -> "hello", getContext().dispatcher());
CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher());
}

@Override
Expand Down
42 changes: 20 additions & 22 deletions docs/src/test/java/jdocs/future/FutureDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@
package jdocs.future;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import jdocs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.*;

import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped;
import static org.apache.pekko.dispatch.Futures.future;
// #imports

// #imports
Expand All @@ -48,9 +39,9 @@ public class FutureDocTest extends AbstractJavaTest {

private final ActorSystem<Void> system = toTyped(actorSystemResource.getSystem());

@Test(expected = java.util.concurrent.CompletionException.class)
public void useAfter() throws Exception {
final ExecutionContext ec = system.executionContext();
@Test(expected = IllegalStateException.class)
public void useAfter() throws Throwable {
final Executor ex = system.executionContext();
// #after
CompletionStage<String> failWithException =
CompletableFuture.supplyAsync(
Expand All @@ -60,18 +51,25 @@ public void useAfter() throws Exception {
CompletionStage<String> delayed =
Patterns.after(Duration.ofMillis(200), system, () -> failWithException);
// #after
Future<String> future =
future(
CompletionStage<String> completionStage =
CompletableFuture.supplyAsync(
() -> {
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "foo";
},
ec);
Future<String> result =
Futures.firstCompletedOf(
Arrays.<Future<String>>asList(future, FutureConverters.asScala(delayed)), ec);
Timeout timeout = Timeout.create(Duration.ofSeconds(2));
Await.result(result, timeout.duration());
ex);
CompletableFuture<Object> result =
CompletableFuture.anyOf(
completionStage.toCompletableFuture(), delayed.toCompletableFuture());
try {
result.toCompletableFuture().get(2, SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.junit.runner.RunWith;
import org.scalatestplus.junit.JUnitRunner;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.iq80.leveldb.util.FileUtils;
import java.util.Optional;
Expand Down Expand Up @@ -81,45 +84,46 @@ public Receive createReceive() {

class MySnapshotStore extends SnapshotStore {
@Override
public Future<Optional<SelectedSnapshot>> doLoadAsync(
public CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return null;
}

@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
public CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}

@Override
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return CompletableFuture.completedFuture(null);
}
}

class MyAsyncJournal extends AsyncWriteJournal {
// #sync-journal-plugin-api
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
public CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(
Iterable<AtomicWrite> messages) {
try {
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
// blocking call here...
// result.add(..)
return Futures.successful(result);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return Futures.failed(e);
return Futures.failedCompletionStage(e);
}
}
// #sync-journal-plugin-api

@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
public CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}

Expand Down
5 changes: 2 additions & 3 deletions docs/src/test/java/jdocs/stream/FlowDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
Expand Down Expand Up @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception {
list.add(3);
Source.from(list);

// Create a source form a Future
Source.future(Futures.successful("Hello Streams!"));
// Create a source form a CompletionStage
Source.completionStage(CompletableFuture.completedFuture("Hello Streams!"));

// Create a source from a single element
Source.single("only one element");
Expand Down
Loading

0 comments on commit 9f74250

Please sign in to comment.