Skip to content

Commit

Permalink
Merge pull request #62 from ossgang/update-operations
Browse files Browse the repository at this point in the history
Update operations
  • Loading branch information
kaifox authored May 18, 2021
2 parents 4f220aa + 971090f commit ebfc1c8
Show file tree
Hide file tree
Showing 61 changed files with 694 additions and 311 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/ossgang/commons/awaitables/BaseAwaitable.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.ossgang.commons.awaitables;

import org.ossgang.commons.awaitables.exceptions.AwaitRetryCountException;
import org.ossgang.commons.awaitables.exceptions.AwaitTimeoutException;
import static java.time.Duration.ZERO;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.ossgang.commons.utils.NamedDaemonThreadFactory.daemonThreadFactoryWithPrefix;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -14,11 +17,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.time.Duration.ZERO;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.ossgang.commons.utils.NamedDaemonThreadFactory.daemonThreadFactoryWithPrefix;
import org.ossgang.commons.awaitables.exceptions.AwaitRetryCountException;
import org.ossgang.commons.awaitables.exceptions.AwaitTimeoutException;

/**
* Base class for {@link Retry} and {@link Await} providing the common parts of the await DSL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
import static org.ossgang.commons.mapbackeds.MapbackedInternals.toStringMethod;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.MethodType;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -25,7 +26,7 @@
* <li>delegate calls to default methods of the proxied interface to the real implementations</li>
* </ul>
*/
class MapbackedObject implements InvocationHandler {
final class MapbackedObject implements InvocationHandler {

private final Class<?> intfc;
private final Map<String, Object> fieldValues;
Expand Down Expand Up @@ -100,10 +101,13 @@ public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {

Optional<MapbackedObject> handler = Mapbackeds.handlerFrom(obj);
if(!handler.isPresent()) {
return false;
}
MapbackedObject other = (MapbackedObject) obj;

MapbackedObject other = handler.get();
return Objects.equals(fieldMethods, other.fieldMethods) && Objects.equals(fieldValues, other.fieldValues)
&& Objects.equals(intfc, other.intfc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static boolean isMapbacked(Object object) {
return handlerFrom(object).isPresent();
}

private static Optional<MapbackedObject> handlerFrom(Object object) {
static Optional<MapbackedObject> handlerFrom(Object object) {
if (!(object instanceof Proxy)) {
return Optional.empty();
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/ossgang/commons/monads/AsyncMaybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@

package org.ossgang.commons.monads;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
Expand All @@ -35,6 +27,14 @@
import static org.ossgang.commons.utils.Uncheckeds.uncheckedRunnable;
import static org.ossgang.commons.utils.Uncheckeds.uncheckedSupplier;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* This utility class implements the concept of a "Maybe" or "Try" {@link Optional} that can be resolved at some point
* in the future. It allows to run code asynchronously and then fetch back or react on the result.
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/ossgang/commons/monads/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@

package org.ossgang.commons.monads;

import static java.util.Objects.requireNonNull;
import static org.ossgang.commons.utils.Uncheckeds.asUnchecked;

import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNull;
import static org.ossgang.commons.utils.Uncheckeds.asUnchecked;

/**
* This utility class implements the concept of a "Maybe" or "Try" {@link Optional}. A Maybe&lt;T&gt; either carries a
* T or an exception that occurred when producing it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.ossgang.commons.observables;

import org.ossgang.commons.observables.exceptions.UnhandledException;

import static org.ossgang.commons.observables.ExceptionHandlers.dispatchToUncaughtExceptionHandler;

import org.ossgang.commons.observables.exceptions.UnhandledException;

/**
* An immutable implementation of {@link ObservableValue} which always returns null on get(),
* and immediately dispatches a constant exception as a single update on subscribe().
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.ossgang.commons.observables;

import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;

import static org.ossgang.commons.observables.ExceptionHandlers.dispatchToUncaughtExceptionHandler;

import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;

/**
* A fixed-value, immutable implementation of {@link ObservableValue}. Such an observable always returns the same
* constant value on get(), and immediately dispatches one single update on subscribe().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

package org.ossgang.commons.observables;

import org.ossgang.commons.observables.exceptions.UnhandledException;
import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;
import static java.util.Collections.newSetFromMap;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.ossgang.commons.observables.ExceptionHandlers.dispatchToUncaughtExceptionHandler;
import static org.ossgang.commons.utils.NamedDaemonThreadFactory.daemonThreadFactoryWithPrefix;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -37,10 +39,8 @@
import java.util.function.Consumer;
import java.util.function.Predicate;

import static java.util.Collections.newSetFromMap;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.ossgang.commons.observables.ExceptionHandlers.dispatchToUncaughtExceptionHandler;
import static org.ossgang.commons.utils.NamedDaemonThreadFactory.daemonThreadFactoryWithPrefix;
import org.ossgang.commons.observables.exceptions.UnhandledException;
import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;

/**
* A basic implementation of {@link Observable} managing a set of listeners, and dispatching updates to them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@

package org.ossgang.commons.observables;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;
import static org.ossgang.commons.observables.SubscriptionOptions.FIRST_UPDATE;
import static org.ossgang.commons.observables.SubscriptionOptions.ON_CHANGE;
import static org.ossgang.commons.utils.Uncheckeds.uncheckedConsumer;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;

/**
* A basic implementation of {@link ObservableValue}, based on {@link DispatchingObservable} to handle the update
* dispatching, and keeping track of the latest value in a thread safe way.
Expand All @@ -51,18 +56,44 @@ public Subscription subscribe(Observer<? super T> listener, SubscriptionOption..
Set<SubscriptionOption> optionSet = new HashSet<>(Arrays.asList(options));
Subscription subscription = super.subscribe(listener, options);
if (optionSet.contains(FIRST_UPDATE)) {
Optional.ofNullable(lastValue.get()).ifPresent(uncheckedConsumer(value -> dispatch(listener::onValue, value).get()));
Optional.ofNullable(lastValue.get())
.ifPresent(uncheckedConsumer(value -> dispatch(listener::onValue, value).get()));
}
return subscription;
}

@Override
protected void dispatchValue(T newValue) {
requireNonNull(newValue, "null value not allowed");
if (Objects.equals(lastValue.getAndSet(newValue), newValue)) {
super.dispatchValue(newValue, s -> !s.contains(ON_CHANGE));
accumulate(newValue, (old, update) -> update);
}

/**
* This is the most generic way to update the internal reference: It uses and accumulator function to transit from
* the current value to a new one. The new value will be the result of the accumulator function with the current
* value as first parameter and the update value as second parameter.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return a transition object, containing both, the original value and the updated (new) value
*/
protected Transition<T> accumulate(T x, BinaryOperator<T> accumulatorFunction) {
AtomicReference<T> newValue = new AtomicReference<>();
T oldValue = lastValue.getAndAccumulate(x, (old, update) -> {
T v = accumulatorFunction.apply(old, update);
requireNonNull(v, "updated value must not be null.");
newValue.set(v);
return v;
});
Transition<T> transition = Transition.fromTo(oldValue, newValue.get());
dispatch(transition);
return transition;
}

private void dispatch(Transition<T> transition) {
if (Objects.equals(transition.oldValue(), transition.newValue())) {
super.dispatchValue(transition.newValue(), s -> !s.contains(ON_CHANGE));
} else {
super.dispatchValue(newValue);
super.dispatchValue(transition.newValue());
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/ossgang/commons/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
package org.ossgang.commons.observables;


import org.ossgang.commons.monads.Maybe;
import org.ossgang.commons.observables.operators.BlockingOperators;
import org.ossgang.commons.observables.operators.DerivedObservableValue;

import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

import org.ossgang.commons.monads.Maybe;
import org.ossgang.commons.observables.operators.BlockingOperators;
import org.ossgang.commons.observables.operators.DerivedObservableValue;

/**
* An stream of objects of type T, which can be subscribed to by interested consumers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

package org.ossgang.commons.observables;

import org.ossgang.commons.observables.operators.DerivedObservableValue;

import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

import org.ossgang.commons.observables.operators.DerivedObservableValue;

/**
* An observable of type T which has an actual value.
*
Expand Down
29 changes: 19 additions & 10 deletions src/main/java/org/ossgang/commons/observables/Observables.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,25 @@

package org.ossgang.commons.observables;

import org.ossgang.commons.monads.*;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.ossgang.commons.monads.Consumer3;
import org.ossgang.commons.monads.Consumer4;
import org.ossgang.commons.monads.Consumer5;
import org.ossgang.commons.monads.Function3;
import org.ossgang.commons.monads.Function4;
import org.ossgang.commons.monads.Function5;
import org.ossgang.commons.observables.exceptions.UnhandledException;
import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;
import org.ossgang.commons.observables.operators.CombinationOperators;
Expand All @@ -30,15 +48,6 @@
import org.ossgang.commons.observables.operators.connectors.ConnectorObservables;
import org.ossgang.commons.observables.operators.connectors.DynamicConnectorObservableValue;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.*;

/**
* Static support class for dealing with {@link Observable} and {@link ObservableValue}.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/ossgang/commons/observables/Observers.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.ossgang.commons.observables;

import org.ossgang.commons.monads.Maybe;

import java.util.function.Consumer;

import org.ossgang.commons.monads.Maybe;

/**
* Utility class to create {@link Observer} instances.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.ossgang.commons.utils.NamedDaemonThreadFactory;

/**
* Creates an observable value that emits periodically at the given rate. The value emitted is the current time (as
* instant). Emitting is done on a single thread.
Expand Down
Loading

0 comments on commit ebfc1c8

Please sign in to comment.