Skip to content

Commit

Permalink
Don't use Scala Futures in Java APIs
Browse files Browse the repository at this point in the history
Sketching out apache#1417 - incomplete and notably not bothering
with binary compatibility yet, just to illustrate the idea.
  • Loading branch information
raboof committed Jul 29, 2024
1 parent 684fec9 commit 59d0af4
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 50 deletions.
5 changes: 5 additions & 0 deletions actor/src/main/scala/org/apache/pekko/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ object Futures {
* @param executor the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
//@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)

/**
Expand All @@ -142,6 +143,7 @@ object Futures {
/**
* creates an already completed Promise with the specified exception
*/
//@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def failed[T](exception: Throwable): Future[T] = Future.failed(exception)

/**
Expand All @@ -151,6 +153,8 @@ object Futures {

/**
* Creates an already completed CompletionStage with the specified exception
*
* Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards
*/
def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
Expand All @@ -172,6 +176,7 @@ object Futures {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
//@deprecated("Use CompletableFuture.anyOf instead", "1.1.0")
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(futures.asScala)(executor)

Expand Down
5 changes: 3 additions & 2 deletions docs/src/test/java/jdocs/future/ActorWithFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

// #context-dispatcher
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.dispatch.Futures;

import java.util.concurrent.CompletableFuture;

public class ActorWithFuture extends AbstractActor {
ActorWithFuture() {
Futures.future(() -> "hello", getContext().dispatcher());
CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher());
}

@Override
Expand Down
34 changes: 14 additions & 20 deletions docs/src/test/java/jdocs/future/FutureDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@
package jdocs.future;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import jdocs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.*;

import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped;
import static org.apache.pekko.dispatch.Futures.future;
// #imports

// #imports
Expand All @@ -50,7 +41,7 @@ public class FutureDocTest extends AbstractJavaTest {

@Test(expected = java.util.concurrent.CompletionException.class)
public void useAfter() throws Exception {
final ExecutionContext ec = system.executionContext();
final Executor ex = system.executionContext();
// #after
CompletionStage<String> failWithException =
CompletableFuture.supplyAsync(
Expand All @@ -60,18 +51,21 @@ public void useAfter() throws Exception {
CompletionStage<String> delayed =
Patterns.after(Duration.ofMillis(200), system, () -> failWithException);
// #after
Future<String> future =
future(
CompletionStage<String> completionStage =
CompletableFuture.supplyAsync(
() -> {
Thread.sleep(1000);
return "foo";
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "foo";
},
ec);
Future<String> result =
Futures.firstCompletedOf(
Arrays.<Future<String>>asList(future, FutureConverters.asScala(delayed)), ec);
ex);
CompletableFuture<Object> result =
CompletableFuture.anyOf(completionStage.toCompletableFuture(), delayed.toCompletableFuture());
Timeout timeout = Timeout.create(Duration.ofSeconds(2));
Await.result(result, timeout.duration());
result.toCompletableFuture().get(2, SECONDS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.junit.runner.RunWith;
import org.scalatestplus.junit.JUnitRunner;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.iq80.leveldb.util.FileUtils;
import java.util.Optional;
Expand Down Expand Up @@ -81,45 +84,45 @@ public Receive createReceive() {

class MySnapshotStore extends SnapshotStore {
@Override
public Future<Optional<SelectedSnapshot>> doLoadAsync(
public CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return null;
}

@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
public CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}

@Override
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return CompletableFuture.completedFuture(null);
}
}

class MyAsyncJournal extends AsyncWriteJournal {
// #sync-journal-plugin-api
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
public CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(
Iterable<AtomicWrite> messages) {
try {
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
// blocking call here...
// result.add(..)
return Futures.successful(result);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return Futures.failed(e);
return Futures.failedCompletionStage(e);
}
}
// #sync-journal-plugin-api

@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
public CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}

Expand Down
5 changes: 2 additions & 3 deletions docs/src/test/java/jdocs/stream/FlowDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
Expand Down Expand Up @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception {
list.add(3);
Source.from(list);

// Create a source form a Future
Source.future(Futures.successful("Hello Streams!"));
// Create a source form a CompletionStage
Source.completionStage(CompletableFuture.completedFuture("Hello Streams!"));

// Create a source from a single element
Source.single("only one element");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
package org.apache.pekko.persistence.journal.japi;

import java.util.Optional;

import scala.concurrent.Future;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.persistence.*;

Expand Down Expand Up @@ -73,7 +72,7 @@ interface AsyncWritePlugin {
*
* <p>This call is protected with a circuit-breaker.
*/
Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages);
CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages);

/**
* Java API, Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`.
Expand All @@ -82,6 +81,6 @@ interface AsyncWritePlugin {
*
* @see AsyncRecoveryPlugin
*/
Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr);
CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr);
// #async-write-plugin-api
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import scala.concurrent.Future;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

interface SnapshotStorePlugin {
// #snapshot-store-plugin-api
Expand All @@ -28,7 +29,7 @@ interface SnapshotStorePlugin {
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for loading.
*/
Future<Optional<SelectedSnapshot>> doLoadAsync(
CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria);

/**
Expand All @@ -37,21 +38,21 @@ Future<Optional<SelectedSnapshot>> doLoadAsync(
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);

/**
* Java API, Plugin API: deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
Future<Void> doDeleteAsync(SnapshotMetadata metadata);
CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata);

/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting.
*/
Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria);
CompletionStage<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria);
// #snapshot-store-plugin-api
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.Try
import org.apache.pekko
import pekko.persistence._
import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal }
import pekko.util.FutureConverters._
import pekko.util.ccompat._
import pekko.util.ccompat.JavaConverters._

Expand All @@ -33,7 +34,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
import context.dispatcher

final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
doAsyncWriteMessages(messages.asJava).map { results =>
doAsyncWriteMessages(messages.asJava).asScala.map { results =>
results.asScala.iterator
.map { r =>
if (r.isPresent) Failure(r.get)
Expand All @@ -42,6 +43,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
.to(immutable.IndexedSeq)
}

final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) =
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(_ => ())
final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(_ => ())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi
import scala.concurrent.Future

import org.apache.pekko
import pekko.util.FutureConverters._
import pekko.japi.Util._
import pekko.persistence._
import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore }
Expand All @@ -29,15 +30,15 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
override final def loadAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
doLoadAsync(persistenceId, criteria).map(option)
doLoadAsync(persistenceId, criteria).asScala.map(option)

override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
doSaveAsync(metadata, snapshot).map(_ => ())
doSaveAsync(metadata, snapshot).asScala.map(_ => ())

override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
doDeleteAsync(metadata).map(_ => ())
doDeleteAsync(metadata).asScala.map(_ => ())

override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ => ())
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(_ => ())

}

0 comments on commit 59d0af4

Please sign in to comment.