From 7fe0c445e189cb04e51596b0f2d984cdf3b9e4ff Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Mon, 9 Jan 2017 14:11:19 -0800 Subject: [PATCH] Event publishing for `LoadBalancer` (#218) * Event publishing for `LoadBalancer` __Problem__ No events are published for `LoadBalancer` __Modification__ Publishing events for `LoadBalancer` __Result__ More events, more insight! --- .../reactivesocket/client/LoadBalancer.java | 242 ++++++++++++++---- .../client/LoadBalancerInitializer.java | 20 +- .../client/LoadBalancerSocketMetrics.java | 64 +++++ .../client/LoadBalancingClient.java | 1 + .../events/LoadBalancingClientListener.java | 23 +- .../LoggingLoadBalancingClientListener.java | 70 +++++ 6 files changed, 361 insertions(+), 59 deletions(-) create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerSocketMetrics.java create mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoggingLoadBalancingClientListener.java diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index 0491b2777..f313a1f9c 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -15,11 +15,17 @@ */ package io.reactivesocket.client; +import io.reactivesocket.Availability; import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.client.events.LoadBalancingClientListener; +import io.reactivesocket.events.ClientEventListener; +import io.reactivesocket.events.EventSource; import io.reactivesocket.exceptions.NoAvailableReactiveSocketException; import io.reactivesocket.exceptions.TimeoutException; import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.internal.DisabledEventPublisher; +import io.reactivesocket.internal.EventPublisher; import io.reactivesocket.reactivestreams.extensions.Px; import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject; import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription; @@ -40,7 +46,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -84,8 +89,8 @@ public class LoadBalancer implements ReactiveSocket { private Runnable readyCallback; private int pendingSockets; - private final List activeSockets; - private final List activeFactories; + private final ActiveList activeSockets; + private final ActiveList activeFactories; private final FactoriesRefresher factoryRefresher; private final Ewma pendings; @@ -95,6 +100,9 @@ public class LoadBalancer implements ReactiveSocket { private volatile long lastRefresh; private final EmptySubject closeSubject = new EmptySubject(); + private final LoadBalancingClientListener eventListener; + private final EventPublisher eventPublisher; + /** * * @param factories the source (factories) of ReactiveSocket @@ -124,14 +132,23 @@ public LoadBalancer( double maxPendings, int minAperture, int maxAperture, - long maxRefreshPeriodMs + long maxRefreshPeriodMs, + EventPublisher eventPublisher ) { this.expFactor = expFactor; this.lowerQuantile = new FrugalQuantile(lowQuantile); this.higherQuantile = new FrugalQuantile(highQuantile); + this.eventPublisher = eventPublisher; - this.activeSockets = new ArrayList<>(128); - this.activeFactories = new ArrayList<>(128); + if (eventPublisher.isEventPublishingEnabled() + && eventPublisher.getEventListener() instanceof LoadBalancingClientListener) { + eventListener = (LoadBalancingClientListener) eventPublisher.getEventListener(); + } else { + eventListener = null; + } + + this.activeSockets = new ActiveList<>(eventListener, false); + this.activeFactories = new ActiveList<>(eventListener, true); this.pendingSockets = 0; this.factoryRefresher = new FactoriesRefresher(); @@ -147,7 +164,6 @@ public LoadBalancer( this.lastApertureRefresh = Clock.now(); this.refreshPeriod = Clock.unit().convert(15L, TimeUnit.SECONDS); this.lastRefresh = Clock.now(); - factories.subscribe(factoryRefresher); } @@ -157,17 +173,20 @@ public LoadBalancer(Publisher> factor DEFAULT_LOWER_QUANTILE, DEFAULT_HIGHER_QUANTILE, DEFAULT_MIN_PENDING, DEFAULT_MAX_PENDING, DEFAULT_MIN_APERTURE, DEFAULT_MAX_APERTURE, - DEFAULT_MAX_REFRESH_PERIOD_MS + DEFAULT_MAX_REFRESH_PERIOD_MS, + new DisabledEventPublisher<>() ); } - LoadBalancer(Publisher> factories, Runnable readyCallback) { + LoadBalancer(Publisher> factories, Runnable readyCallback, + EventPublisher eventPublisher) { this(factories, DEFAULT_EXP_FACTOR, DEFAULT_LOWER_QUANTILE, DEFAULT_HIGHER_QUANTILE, DEFAULT_MIN_PENDING, DEFAULT_MAX_PENDING, DEFAULT_MIN_APERTURE, DEFAULT_MAX_APERTURE, - DEFAULT_MAX_REFRESH_PERIOD_MS + DEFAULT_MAX_REFRESH_PERIOD_MS, + eventPublisher ); this.readyCallback = readyCallback; } @@ -214,7 +233,7 @@ private synchronized void addSockets(int numberOfNewSocket) { while (n > 0) { int size = activeFactories.size(); if (size == 1) { - ReactiveSocketClient factory = activeFactories.get(0); + ReactiveSocketClient factory = activeFactories.holder.get(0); if (factory.availability() > 0.0) { activeFactories.remove(0); pendingSockets++; @@ -232,8 +251,8 @@ private synchronized void addSockets(int numberOfNewSocket) { if (i1 >= i0) { i1++; } - factory0 = activeFactories.get(i0); - factory1 = activeFactories.get(i1); + factory0 = activeFactories.holder.get(i0); + factory1 = activeFactories.holder.get(i1); if (factory0.availability() > 0.0 && factory1.availability() > 0.0) { break; } @@ -245,7 +264,7 @@ private synchronized void addSockets(int numberOfNewSocket) { // cheaper to permute activeFactories.get(i1) with the last item and remove the last // rather than doing a activeFactories.remove(i1) if (i1 < size - 1) { - activeFactories.set(i1, activeFactories.get(size - 1)); + activeFactories.set(i1, activeFactories.holder.get(size - 1)); } activeFactories.remove(size - 1); factory1.connect().subscribe(new SocketAdder(factory1)); @@ -254,7 +273,7 @@ private synchronized void addSockets(int numberOfNewSocket) { pendingSockets++; // c.f. above if (i0 < size - 1) { - activeFactories.set(i0, activeFactories.get(size - 1)); + activeFactories.set(i0, activeFactories.holder.get(size - 1)); } activeFactories.remove(size - 1); factory0.connect().subscribe(new SocketAdder(factory0)); @@ -269,7 +288,7 @@ private synchronized void refreshAperture() { } double p = 0.0; - for (WeightedSocket wrs: activeSockets) { + for (WeightedSocket wrs: activeSockets.holder) { p += wrs.getPending(); } p /= n + pendingSockets; @@ -300,6 +319,9 @@ private void updateAperture(int newValue, long now) { pendings.reset((minPendings + maxPendings)/2); if (targetAperture != previous) { + if (eventListener != null) { + eventListener.apertureChanged(previous, targetAperture); + } logger.debug("Current pending={}, new target={}, previous target={}", pendings.value(), targetAperture, previous); } @@ -313,9 +335,8 @@ private void updateAperture(int newValue, long now) { */ private synchronized void refreshSockets() { refreshAperture(); - int n = pendingSockets + activeSockets.size(); - if (n < targetAperture && !activeFactories.isEmpty()) { + if (n < targetAperture && !activeFactories.holder.isEmpty()) { logger.debug("aperture {} is below target {}, adding {} sockets", n, targetAperture, targetAperture - n); addSockets(targetAperture - n); @@ -326,15 +347,22 @@ private synchronized void refreshSockets() { } long now = Clock.now(); - if (now - lastRefresh < refreshPeriod) { - return; + if (now - lastRefresh >= refreshPeriod) { + if (eventListener != null) { + eventListener.socketsRefreshStart(); + } + long prev = refreshPeriod; + refreshPeriod = (long) Math.min(refreshPeriod * 1.5, maxRefreshPeriod); + logger.debug("Bumping refresh period, {}->{}", prev / 1000, refreshPeriod / 1000); + if (prev != refreshPeriod && eventListener != null) { + eventListener.socketRefreshPeriodChanged(prev, refreshPeriod, Clock.unit()); + } + lastRefresh = now; + addSockets(1); + if (eventListener != null) { + eventListener.socketsRefreshCompleted(Clock.elapsedSince(now), Clock.unit()); + } } - - long prev = refreshPeriod; - refreshPeriod = (long) Math.min(refreshPeriod * 1.5, maxRefreshPeriod); - logger.debug("Bumping refresh period, {}->{}", prev/1000, refreshPeriod/1000); - lastRefresh = now; - addSockets(1); } private synchronized void quickSlowestRS() { @@ -344,7 +372,7 @@ private synchronized void quickSlowestRS() { WeightedSocket slowest = null; double lowestAvailability = Double.MAX_VALUE; - for (WeightedSocket socket: activeSockets) { + for (WeightedSocket socket: activeSockets.holder) { double load = socket.availability(); if (load == 0.0) { slowest = socket; @@ -378,8 +406,8 @@ private synchronized void removeSocket(WeightedSocket socket) { @Override public synchronized double availability() { double currentAvailability = 0.0; - if (!activeSockets.isEmpty()) { - for (WeightedSocket rs : activeSockets) { + if (!activeSockets.holder.isEmpty()) { + for (WeightedSocket rs : activeSockets.holder) { currentAvailability += rs.availability(); } currentAvailability /= activeSockets.size(); @@ -389,14 +417,14 @@ public synchronized double availability() { } private synchronized ReactiveSocket select() { - if (activeSockets.isEmpty()) { + if (activeSockets.holder.isEmpty()) { return FAILING_REACTIVE_SOCKET; } refreshSockets(); int size = activeSockets.size(); if (size == 1) { - return activeSockets.get(0); + return activeSockets.holder.get(0); } WeightedSocket rsc1 = null; @@ -409,12 +437,12 @@ private synchronized ReactiveSocket select() { if (i2 >= i1) { i2++; } - rsc1 = activeSockets.get(i1); - rsc2 = activeSockets.get(i2); + rsc1 = activeSockets.holder.get(i1); + rsc2 = activeSockets.holder.get(i2); if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) { break; } - if (i+1 == EFFORT && !activeFactories.isEmpty()) { + if (i+1 == EFFORT && !activeFactories.holder.isEmpty()) { addSockets(1); } } @@ -474,7 +502,7 @@ public Publisher close() { activeFactories.clear(); AtomicInteger n = new AtomicInteger(activeSockets.size()); - activeSockets.forEach(rs -> { + activeSockets.holder.forEach(rs -> { rs.close().subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { @@ -527,8 +555,8 @@ public void onNext(Collection newFactories) { Set current = new HashSet<>(activeFactories.size() + activeSockets.size()); - current.addAll(activeFactories); - for (WeightedSocket socket: activeSockets) { + current.addAll(activeFactories.holder); + for (WeightedSocket socket: activeSockets.holder) { ReactiveSocketClient factory = socket.getFactory(); current.add(factory); } @@ -540,11 +568,12 @@ public void onNext(Collection newFactories) { added.removeAll(current); boolean changed = false; - Iterator it0 = activeSockets.iterator(); + Iterator it0 = activeSockets.holder.iterator(); while (it0.hasNext()) { WeightedSocket socket = it0.next(); if (removed.contains(socket.getFactory())) { it0.remove(); + activeSockets.publishRemoveEvent(socket); try { changed = true; socket.close(); @@ -553,11 +582,12 @@ public void onNext(Collection newFactories) { } } } - Iterator it1 = activeFactories.iterator(); + Iterator it1 = activeFactories.holder.iterator(); while (it1.hasNext()) { ReactiveSocketClient factory = it1.next(); if (removed.contains(factory)) { it1.remove(); + activeFactories.publishRemoveEvent(factory); changed = true; } } @@ -567,11 +597,11 @@ public void onNext(Collection newFactories) { if (changed && logger.isDebugEnabled()) { StringBuilder msgBuilder = new StringBuilder(); msgBuilder.append("\nUpdated active factories (size: " + activeFactories.size() + ")\n"); - for (ReactiveSocketClient f : activeFactories) { + for (ReactiveSocketClient f : activeFactories.holder) { msgBuilder.append(" + ").append(f).append('\n'); } msgBuilder.append("Active sockets:\n"); - for (WeightedSocket socket: activeSockets) { + for (WeightedSocket socket: activeSockets.holder) { msgBuilder.append(" + ").append(socket).append('\n'); } logger.debug(msgBuilder.toString()); @@ -600,7 +630,7 @@ void close() { private class SocketAdder implements Subscriber { private final ReactiveSocketClient factory; - private int errors = 0; + private int errors; private SocketAdder(ReactiveSocketClient factory) { this.factory = factory; @@ -622,6 +652,9 @@ public void onNext(ReactiveSocket rs) { logger.debug("Adding new WeightedSocket {}", weightedSocket); activeSockets.add(weightedSocket); + if (eventListener != null) { + eventListener.socketAdded(weightedSocket); + } if (readyCallback != null) { readyCallback.run(); } @@ -712,7 +745,8 @@ public Publisher onClose() { * Wrapper of a ReactiveSocket, it computes statistics about the req/resp calls and * update availability accordingly. */ - private class WeightedSocket extends ReactiveSocketProxy { + private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancerSocketMetrics { + private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12; private final ReactiveSocket child; @@ -887,6 +921,36 @@ public String toString() { + ")->" + child; } + @Override + public double medianLatency() { + return median.estimation(); + } + + @Override + public double lowerQuantileLatency() { + return lowerQuantile.estimation(); + } + + @Override + public double higherQuantileLatency() { + return higherQuantile.estimation(); + } + + @Override + public double interArrivalTime() { + return interArrivalTime.value(); + } + + @Override + public int pending() { + return pending; + } + + @Override + public long lastTimeUsedMillis() { + return stamp0; + } + /** * Subscriber wrapper used for request/response interaction model, measure and collect * latency information. @@ -990,4 +1054,96 @@ public void onComplete() { } } } + + private class ActiveList { + + private final ArrayList holder; + private final LoadBalancingClientListener listener; + private final boolean server; + + public ActiveList(LoadBalancingClientListener listener, boolean server) { + this.listener = listener; + this.server = server; + holder = new ArrayList(128); + } + + public void add(T item) { + holder.add(item); + publishAddEvent(item); + } + + public T remove(int index) { + T item = holder.remove(index); + if (item != null) { + publishRemoveEvent(item); + } + return item; + } + + public boolean remove(T item) { + boolean removed = holder.remove(item); + if (removed) { + publishRemoveEvent(item); + } + return removed; + } + + public T set(int index, T item) { + T prev = holder.set(index, item); + if (prev != null) { + publishRemoveEvent(prev); + } + publishAddEvent(item); + return prev; + } + + public void addAll(Collection toAdd) { + holder.addAll(toAdd); + if (listener != null) { + for (T t : toAdd) { + publishAddEvent(t); + } + } + } + + public void clear() { + if (listener != null) { + for (T t : holder) { + publishRemoveEvent(t); + } + } + holder.clear(); + } + + public int size() { + return holder.size(); + } + + private void publishRemoveEvent(T item) { + if (listener == null) { + return; + } + if (server) { + listener.serverRemoved(item); + } else { + listener.socketRemoved(item); + } + } + + private void publishAddEvent(T item) { + if (server && eventPublisher.isEventPublishingEnabled()) { + @SuppressWarnings("unchecked") + EventSource src = (EventSource) item; + src.subscribe(eventPublisher.getEventListener()); + } + if (listener == null) { + return; + } + if (server) { + listener.serverAdded(item); + } else { + listener.socketAdded(item); + } + } + } } diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerInitializer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerInitializer.java index 0ac2c60ed..274a7add0 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerInitializer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerInitializer.java @@ -17,6 +17,8 @@ package io.reactivesocket.client; import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.events.AbstractEventSource; +import io.reactivesocket.events.ClientEventListener; import io.reactivesocket.reactivestreams.extensions.Px; import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription; import org.reactivestreams.Publisher; @@ -31,21 +33,30 @@ * This is a temporary class to provide a {@link LoadBalancingClient#connect()} implementation when {@link LoadBalancer} * does not support it. */ -final class LoadBalancerInitializer implements Runnable { +final class LoadBalancerInitializer extends AbstractEventSource implements Runnable { private volatile LoadBalancer loadBalancer; private final Publisher emitSource; private boolean ready; // Guarded by this. + private boolean created; // Guarded by this. private final List> earlySubscribers = new CopyOnWriteArrayList<>(); - private LoadBalancerInitializer() { + private LoadBalancerInitializer(Publisher> factories) { emitSource = s -> { final boolean _emit; + final boolean _create; synchronized (this) { + _create = !created; _emit = ready; if (!_emit) { earlySubscribers.add(s); } + if (!created) { + created = true; + } + } + if (_create) { + loadBalancer = new LoadBalancer(factories, this, this); } if (_emit) { s.onSubscribe(ValidatingSubscription.empty(s)); @@ -56,10 +67,7 @@ private LoadBalancerInitializer() { } static LoadBalancerInitializer create(Publisher> factories) { - final LoadBalancerInitializer initializer = new LoadBalancerInitializer(); - final LoadBalancer loadBalancer = new LoadBalancer(factories, initializer); - initializer.loadBalancer = loadBalancer; - return initializer; + return new LoadBalancerInitializer(factories); } Publisher connect() { diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerSocketMetrics.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerSocketMetrics.java new file mode 100644 index 000000000..0201959d2 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerSocketMetrics.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.client; + +import io.reactivesocket.Availability; + +/** + * A contract for the metrics managed by {@link LoadBalancer} per socket. + */ +public interface LoadBalancerSocketMetrics extends Availability { + + /** + * Median value of latency as per last calculation. This is not calculated per invocation. + * + * @return Median latency. + */ + double medianLatency(); + + /** + * Lower quantile of latency as per last calculation. This is not calculated per invocation. + * + * @return Median latency. + */ + double lowerQuantileLatency(); + + /** + * Higher quantile value of latency as per last calculation. This is not calculated per invocation. + * + * @return Median latency. + */ + double higherQuantileLatency(); + + /** + * An exponentially weighted moving average value of the time between two requests. + * + * @return Inter arrival time. + */ + double interArrivalTime(); + + /** + * Number of pending requests at this moment. + * + * @return Number of pending requests at this moment. + */ + int pending(); + + /** + * Last time this socket was used i.e. either a request was sent or a response was received. + * + * @return Last time used in millis since epoch. + */ + long lastTimeUsedMillis(); +} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java index 90f0113c1..823bc9818 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java @@ -36,6 +36,7 @@ public class LoadBalancingClient extends AbstractReactiveSocketClient { private final LoadBalancerInitializer initializer; public LoadBalancingClient(LoadBalancerInitializer initializer) { + super(initializer); this.initializer = initializer; } diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoadBalancingClientListener.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoadBalancingClientListener.java index 3ebe31cac..0d0946593 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoadBalancingClientListener.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoadBalancingClientListener.java @@ -13,6 +13,7 @@ package io.reactivesocket.client.events; +import io.reactivesocket.Availability; import io.reactivesocket.client.LoadBalancingClient; import io.reactivesocket.events.ClientEventListener; @@ -27,45 +28,47 @@ public interface LoadBalancingClientListener extends ClientEventListener { /** * Event when a new socket is added to the load balancer. * - * @param socketAddress Address for the socket. + * @param availability Availability for the added socket. */ - default void socketAdded(SocketAddress socketAddress) {} + default void socketAdded(Availability availability) {} /** * Event when a socket is removed from the load balancer. * - * @param socketAddress Address for the socket. + * @param availability Availability for the removed socket. */ - default void socketRemoved(SocketAddress socketAddress) {} + default void socketRemoved(Availability availability) {} /** * An event when a server is added to the load balancer. * - * @param socketAddress Address for the server. + * @param availability Availability of the added server. */ - default void serverAdded(SocketAddress socketAddress) {} + default void serverAdded(Availability availability) {} /** * An event when a server is removed from the load balancer. * - * @param socketAddress Address for the server. + * @param availability Availability of the removed server. */ - default void serverRemoved(SocketAddress socketAddress) {} + default void serverRemoved(Availability availability) {} /** * An event when the expected number of active sockets held by the load balancer changes. * + * @param oldAperture Old aperture size, i.e. expected number of active sockets. * @param newAperture New aperture size, i.e. expected number of active sockets. */ - default void apertureChanged(int newAperture) {} + default void apertureChanged(int oldAperture, int newAperture) {} /** * An event when the expected time period for refreshing active sockets in the load balancer changes. * + * @param oldPeriod Old refresh period. * @param newPeriod New refresh period. * @param periodUnit {@link TimeUnit} for the refresh period. */ - default void socketRefreshPeriodChanged(long newPeriod, TimeUnit periodUnit) {} + default void socketRefreshPeriodChanged(long oldPeriod, long newPeriod, TimeUnit periodUnit) {} /** * An event to mark the start of the socket refresh cycle. diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoggingLoadBalancingClientListener.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoggingLoadBalancingClientListener.java new file mode 100644 index 000000000..fea51cea4 --- /dev/null +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/events/LoggingLoadBalancingClientListener.java @@ -0,0 +1,70 @@ +/* + * Copyright 2017 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.client.events; + +import io.reactivesocket.Availability; +import io.reactivesocket.events.LoggingClientEventListener; +import org.slf4j.event.Level; + +import java.util.concurrent.TimeUnit; + +public class LoggingLoadBalancingClientListener extends LoggingClientEventListener implements LoadBalancingClientListener { + + public LoggingLoadBalancingClientListener(String name, Level logLevel) { + super(name, logLevel); + } + + @Override + public void socketAdded(Availability availability) { + logIfEnabled(() -> name + ": socketAdded " + "availability = [" + availability + ']'); + } + + @Override + public void socketRemoved(Availability availability) { + logIfEnabled(() -> name + ": socketRemoved " + "availability = [" + availability + ']'); + } + + @Override + public void serverAdded(Availability availability) { + logIfEnabled(() -> name + ": serverAdded " + "availability = [" + availability + ']'); + } + + @Override + public void serverRemoved(Availability availability) { + logIfEnabled(() -> name + ": serverRemoved " + "availability = [" + availability + ']'); + } + + @Override + public void apertureChanged(int oldAperture, int newAperture) { + logIfEnabled(() -> name + ": apertureChanged " + "oldAperture = [" + oldAperture + "newAperture = [" + + newAperture + ']'); + } + + @Override + public void socketRefreshPeriodChanged(long oldPeriod, long newPeriod, TimeUnit periodUnit) { + logIfEnabled(() -> name + ": socketRefreshPeriodChanged " + "newPeriod = [" + newPeriod + "], periodUnit = [" + + periodUnit + ']'); + } + + @Override + public void socketsRefreshStart() { + logIfEnabled(() -> name + ": socketsRefreshStart"); + } + + @Override + public void socketsRefreshCompleted(long duration, TimeUnit durationUnit) { + logIfEnabled(() -> name + ": socketsRefreshCompleted " + "duration = [" + duration + + "], durationUnit = [" + durationUnit + ']'); + } +}