Skip to content

Commit

Permalink
Don't use Scala Futures in Java APIs
Browse files Browse the repository at this point in the history
Sketching out apache#1417 - incomplete and notably not bothering
with binary compatibility yet, just to illustrate the idea.
  • Loading branch information
raboof committed Jul 29, 2024
1 parent 684fec9 commit 9519bc9
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.pekko.testkit.PekkoSpec;

@SuppressWarnings("deprecation")
public class JavaFutureTests extends JUnitSuite {

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
Expand Down Expand Up @@ -317,19 +318,21 @@ public void testCSRetry() throws Exception {
}

@Test(expected = IllegalStateException.class)
public void testAfterFailedCallable() throws Exception {
Callable<Future<String>> failedCallable =
() -> Futures.failed(new IllegalStateException("Illegal!"));
public void testAfterFailedCallable() throws Throwable {
Callable<CompletionStage<String>> failedCallable =
() -> Futures.failedCompletionStage(new IllegalStateException("Illegal!"));

Future<String> delayedFuture =
CompletionStage<String> delayedFuture =
Patterns.after(
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
Duration.ofMillis(200),
system,
failedCallable);

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS));
try {
delayedFuture.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test(expected = IllegalStateException.class)
Expand All @@ -340,7 +343,7 @@ public void testAfterFailedFuture() throws Exception {
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.failed(new IllegalStateException("Illegal!")));
() -> FutureConverters.asScala(Futures.failedCompletionStage(new IllegalStateException("Illegal!"))));

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
Expand Down Expand Up @@ -391,7 +394,7 @@ public void testAfterFiniteDuration() throws Exception {
ec,
() -> Futures.successful("world"));

Future<String> immediateFuture = Futures.future(() -> expected, ec);
Future<String> immediateFuture = FutureConverters.asScala(CompletableFuture.completedFuture(expected));

Future<String> resultFuture =
Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec);
Expand Down
5 changes: 5 additions & 0 deletions actor/src/main/scala/org/apache/pekko/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ object Futures {
* @param executor the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)

/**
Expand All @@ -142,6 +143,7 @@ object Futures {
/**
* creates an already completed Promise with the specified exception
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def failed[T](exception: Throwable): Future[T] = Future.failed(exception)

/**
Expand All @@ -151,6 +153,8 @@ object Futures {

/**
* Creates an already completed CompletionStage with the specified exception
*
* Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards
*/
def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
Expand All @@ -172,6 +176,7 @@ object Futures {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
//@deprecated("Use CompletableFuture.anyOf instead", "1.1.0")
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(futures.asScala)(executor)

Expand Down
5 changes: 3 additions & 2 deletions docs/src/test/java/jdocs/future/ActorWithFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

// #context-dispatcher
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.dispatch.Futures;

import java.util.concurrent.CompletableFuture;

public class ActorWithFuture extends AbstractActor {
ActorWithFuture() {
Futures.future(() -> "hello", getContext().dispatcher());
CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher());
}

@Override
Expand Down
34 changes: 14 additions & 20 deletions docs/src/test/java/jdocs/future/FutureDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@
package jdocs.future;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import jdocs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.*;

import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped;
import static org.apache.pekko.dispatch.Futures.future;
// #imports

// #imports
Expand All @@ -50,7 +41,7 @@ public class FutureDocTest extends AbstractJavaTest {

@Test(expected = java.util.concurrent.CompletionException.class)
public void useAfter() throws Exception {
final ExecutionContext ec = system.executionContext();
final Executor ex = system.executionContext();
// #after
CompletionStage<String> failWithException =
CompletableFuture.supplyAsync(
Expand All @@ -60,18 +51,21 @@ public void useAfter() throws Exception {
CompletionStage<String> delayed =
Patterns.after(Duration.ofMillis(200), system, () -> failWithException);
// #after
Future<String> future =
future(
CompletionStage<String> completionStage =
CompletableFuture.supplyAsync(
() -> {
Thread.sleep(1000);
return "foo";
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "foo";
},
ec);
Future<String> result =
Futures.firstCompletedOf(
Arrays.<Future<String>>asList(future, FutureConverters.asScala(delayed)), ec);
ex);
CompletableFuture<Object> result =
CompletableFuture.anyOf(completionStage.toCompletableFuture(), delayed.toCompletableFuture());
Timeout timeout = Timeout.create(Duration.ofSeconds(2));
Await.result(result, timeout.duration());
result.toCompletableFuture().get(2, SECONDS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.junit.runner.RunWith;
import org.scalatestplus.junit.JUnitRunner;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.iq80.leveldb.util.FileUtils;
import java.util.Optional;
Expand Down Expand Up @@ -81,45 +84,45 @@ public Receive createReceive() {

class MySnapshotStore extends SnapshotStore {
@Override
public Future<Optional<SelectedSnapshot>> doLoadAsync(
public CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return null;
}

@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
public CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}

@Override
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return CompletableFuture.completedFuture(null);
}
}

class MyAsyncJournal extends AsyncWriteJournal {
// #sync-journal-plugin-api
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
public CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(
Iterable<AtomicWrite> messages) {
try {
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
// blocking call here...
// result.add(..)
return Futures.successful(result);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return Futures.failed(e);
return Futures.failedCompletionStage(e);
}
}
// #sync-journal-plugin-api

@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
public CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}

Expand Down
5 changes: 2 additions & 3 deletions docs/src/test/java/jdocs/stream/FlowDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
Expand Down Expand Up @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception {
list.add(3);
Source.from(list);

// Create a source form a Future
Source.future(Futures.successful("Hello Streams!"));
// Create a source form a CompletionStage
Source.completionStage(CompletableFuture.completedFuture("Hello Streams!"));

// Create a source from a single element
Source.single("only one element");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
package org.apache.pekko.persistence.journal.japi;

import java.util.Optional;

import scala.concurrent.Future;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.persistence.*;

Expand Down Expand Up @@ -73,7 +72,7 @@ interface AsyncWritePlugin {
*
* <p>This call is protected with a circuit-breaker.
*/
Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages);
CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages);

/**
* Java API, Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`.
Expand All @@ -82,6 +81,6 @@ interface AsyncWritePlugin {
*
* @see AsyncRecoveryPlugin
*/
Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr);
CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr);
// #async-write-plugin-api
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import scala.concurrent.Future;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

interface SnapshotStorePlugin {
// #snapshot-store-plugin-api
Expand All @@ -28,7 +29,7 @@ interface SnapshotStorePlugin {
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for loading.
*/
Future<Optional<SelectedSnapshot>> doLoadAsync(
CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria);

/**
Expand All @@ -37,21 +38,21 @@ Future<Optional<SelectedSnapshot>> doLoadAsync(
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);

/**
* Java API, Plugin API: deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
Future<Void> doDeleteAsync(SnapshotMetadata metadata);
CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata);

/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting.
*/
Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria);
CompletionStage<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria);
// #snapshot-store-plugin-api
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.Try
import org.apache.pekko
import pekko.persistence._
import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal }
import pekko.util.FutureConverters._
import pekko.util.ccompat._
import pekko.util.ccompat.JavaConverters._

Expand All @@ -33,7 +34,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
import context.dispatcher

final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
doAsyncWriteMessages(messages.asJava).map { results =>
doAsyncWriteMessages(messages.asJava).asScala.map { results =>
results.asScala.iterator
.map { r =>
if (r.isPresent) Failure(r.get)
Expand All @@ -42,6 +43,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
.to(immutable.IndexedSeq)
}

final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) =
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(_ => ())
final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(_ => ())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi
import scala.concurrent.Future

import org.apache.pekko
import pekko.util.FutureConverters._
import pekko.japi.Util._
import pekko.persistence._
import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore }
Expand All @@ -29,15 +30,15 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
override final def loadAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
doLoadAsync(persistenceId, criteria).map(option)
doLoadAsync(persistenceId, criteria).asScala.map(option)

override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
doSaveAsync(metadata, snapshot).map(_ => ())
doSaveAsync(metadata, snapshot).asScala.map(_ => ())

override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
doDeleteAsync(metadata).map(_ => ())
doDeleteAsync(metadata).asScala.map(_ => ())

override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ => ())
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(_ => ())

}

0 comments on commit 9519bc9

Please sign in to comment.