Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce client-side micrometer, start with exposing watcher revision metrics #542

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@
import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;

import io.micrometer.core.instrument.MeterRegistry;

final class LegacyCentralDogma extends AbstractCentralDogma {

private final CentralDogmaService.AsyncIface client;

LegacyCentralDogma(ScheduledExecutorService blockingTaskExecutor, CentralDogmaService.AsyncIface client) {
super(blockingTaskExecutor);
LegacyCentralDogma(ScheduledExecutorService blockingTaskExecutor, CentralDogmaService.AsyncIface client,
MeterRegistry meterRegistry) {
super(blockingTaskExecutor, meterRegistry);
this.client = requireNonNull(client, "client");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientBuilder;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.encoding.DecodingClient;
Expand All @@ -30,13 +33,17 @@
import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma;
import com.linecorp.centraldogma.internal.thrift.CentralDogmaService.AsyncIface;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Builds a legacy {@link CentralDogma} client based on Thrift.
*
* @deprecated Use {@link ArmeriaCentralDogmaBuilder}.
*/
@Deprecated
public class LegacyCentralDogmaBuilder extends AbstractArmeriaCentralDogmaBuilder<LegacyCentralDogmaBuilder> {
private static final Logger logger = LoggerFactory.getLogger(LegacyCentralDogmaBuilder.class);

/**
* Returns a newly-created {@link CentralDogma} instance.
*
Expand Down Expand Up @@ -67,8 +74,14 @@ public CentralDogma build() throws UnknownHostException {
final ScheduledExecutorService blockingTaskExecutor = blockingTaskExecutor();

final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag();

final MeterRegistry meterRegistry = meterRegistry().orElse(clientFactory().meterRegistry());
if (meterRegistry != clientFactory().meterRegistry()) {
logger.warn("The specified meterRegistry differs from the meterRegistry from clientFactory.");
}

final CentralDogma dogma = new LegacyCentralDogma(blockingTaskExecutor,
builder.build(AsyncIface.class));
builder.build(AsyncIface.class), meterRegistry);
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand All @@ -80,7 +93,7 @@ public CentralDogma build() throws UnknownHostException {
// in Armeria: https://github.com/line/armeria/issues/760
final ClientRequestContext ctx = ClientRequestContext.currentOrNull();
return ctx != null ? ctx.remoteAddress() : null;
});
}, meterRegistry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableSet;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.RepositoryInfo;
import com.linecorp.centraldogma.client.armeria.legacy.ThriftTypes.TAuthor;
Expand Down Expand Up @@ -80,7 +81,7 @@ class LegacyCentralDogmaTest {

@BeforeEach
void setUp() {
client = new LegacyCentralDogma(CommonPools.blockingTaskExecutor(), iface);
client = new LegacyCentralDogma(CommonPools.blockingTaskExecutor(), iface, NoopMeterRegistry.get());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;

import io.micrometer.core.instrument.MeterRegistry;

final class ArmeriaCentralDogma extends AbstractCentralDogma {

private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -134,8 +136,9 @@ final class ArmeriaCentralDogma extends AbstractCentralDogma {
private final WebClient client;
private final String authorization;

ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) {
super(blockingTaskExecutor);
ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken,
MeterRegistry meterRegistry) {
super(blockingTaskExecutor, meterRegistry);
this.client = requireNonNull(client, "client");
authorization = "Bearer " + requireNonNull(accessToken, "accessToken");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientBuilder;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.WebClient;
Expand All @@ -26,12 +29,16 @@
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Builds a {@link CentralDogma} client based on an <a href="https://line.github.io/armeria/">Armeria</a>
* HTTP client.
*/
public final class ArmeriaCentralDogmaBuilder
extends AbstractArmeriaCentralDogmaBuilder<ArmeriaCentralDogmaBuilder> {
private static final Logger logger = LoggerFactory.getLogger(ArmeriaCentralDogmaBuilder.class);

/**
* Returns a newly-created {@link CentralDogma} instance.
*
Expand All @@ -44,13 +51,17 @@ public CentralDogma build() throws UnknownHostException {
newClientBuilder(scheme, endpointGroup, cb -> cb.decorator(DecodingClient.newDecorator()), "/");
final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag();

final MeterRegistry meterRegistry = meterRegistry().orElse(clientFactory().meterRegistry());
if (meterRegistry != clientFactory().meterRegistry()) {
logger.warn("The specified meterRegistry differs from the meterRegistry from clientFactory.");
}
// TODO(ikhoon): Apply ExecutorServiceMetrics for the 'blockingTaskExecutor' once
// https://github.com/line/centraldogma/pull/542 is merged.
final ScheduledExecutorService blockingTaskExecutor = blockingTaskExecutor();

final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor,
builder.build(WebClient.class),
accessToken());
accessToken(), meterRegistry);
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand All @@ -62,7 +73,7 @@ public CentralDogma build() throws UnknownHostException {
// in Armeria: https://github.com/line/armeria/issues/760
final ClientRequestContext ctx = ClientRequestContext.currentOrNull();
return ctx != null ? ctx.remoteAddress() : null;
});
}, meterRegistry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.linecorp.centraldogma.client.armeria.ArmeriaClientConfigurator;
import com.linecorp.centraldogma.client.armeria.DnsAddressEndpointGroupConfigurator;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Spring bean configuration for {@link CentralDogma} client.
*/
Expand All @@ -55,8 +57,8 @@ public CentralDogma dogmaClient(
CentralDogmaSettings settings,
Optional<List<CentralDogmaClientFactoryConfigurator>> factoryConfigurators,
Optional<ArmeriaClientConfigurator> armeriaClientConfigurator,
Optional<DnsAddressEndpointGroupConfigurator> dnsAddressEndpointGroupConfigurator)
throws UnknownHostException {
Optional<DnsAddressEndpointGroupConfigurator> dnsAddressEndpointGroupConfigurator,
Optional<MeterRegistry> meterRegistry) throws UnknownHostException {

final ArmeriaCentralDogmaBuilder builder = new ArmeriaCentralDogmaBuilder();

Expand Down Expand Up @@ -125,6 +127,10 @@ public CentralDogma dogmaClient(
builder.retryIntervalOnReplicationLagMillis(retryIntervalOnReplicationLagMillis);
}

if (meterRegistry.isPresent()) {
builder.meterRegistry(meterRegistry.get());
}

return builder.build();
}
}
1 change: 1 addition & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dependencies {
implementation 'org.javassist:javassist'
implementation 'io.micrometer:micrometer-core'
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@
import com.linecorp.centraldogma.internal.client.FileWatcher;
import com.linecorp.centraldogma.internal.client.RepositoryWatcher;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;

/**
* A skeletal {@link CentralDogma} implementation.
*/
public abstract class AbstractCentralDogma implements CentralDogma {

private final ScheduledExecutorService blockingTaskExecutor;
private final MeterRegistry meterRegistry;

/**
* Creates a new instance.
Expand All @@ -52,7 +56,22 @@ public abstract class AbstractCentralDogma implements CentralDogma {
* watched changes.
*/
protected AbstractCentralDogma(ScheduledExecutorService blockingTaskExecutor) {
this(blockingTaskExecutor, Metrics.globalRegistry);
}

/**
* Creates a new instance.
*
* @param blockingTaskExecutor the {@link ScheduledExecutorService} which will be used for scheduling the
* tasks related with automatic retries and invoking the callbacks for
* watched changes.
* @param meterRegistry the {@link MeterRegistry} which collects metrics for
* this {@link CentralDogma} instance.
*/
protected AbstractCentralDogma(ScheduledExecutorService blockingTaskExecutor,
MeterRegistry meterRegistry) {
this.blockingTaskExecutor = requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
}

/**
Expand Down Expand Up @@ -215,4 +234,9 @@ protected final CompletableFuture<Revision> maybeNormalizeRevision(
return CompletableFuture.completedFuture(revision);
}
}

@Override
public final MeterRegistry meterRegistry() {
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand All @@ -46,6 +47,8 @@
import com.linecorp.centraldogma.common.RevisionNotFoundException;
import com.linecorp.centraldogma.internal.CsrfToken;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Builds a {@link CentralDogma} client.
*/
Expand All @@ -70,6 +73,7 @@ public abstract class AbstractCentralDogmaBuilder<B extends AbstractCentralDogma
private int maxNumRetriesOnReplicationLag = DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG;
private long retryIntervalOnReplicationLagMillis =
TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS);
private Optional<MeterRegistry> meterRegistry = Optional.empty();

/**
* Returns {@code this}.
Expand Down Expand Up @@ -415,4 +419,16 @@ public final B retryIntervalOnReplicationLagMillis(long retryIntervalOnReplicati
protected long retryIntervalOnReplicationLagMillis() {
return retryIntervalOnReplicationLagMillis;
}

/**
* Sets the {@link MeterRegistry} used to collect metrics.
*/
public final B meterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = Optional.of(requireNonNull(meterRegistry, "meterRegistry"));
return self();
}

protected Optional<MeterRegistry> meterRegistry() {
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.RevisionNotFoundException;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Central Dogma client.
*/
Expand Down Expand Up @@ -591,4 +593,11 @@ <T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, Stri
*/
<T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function, Executor executor);

/**
* The {@link MeterRegistry} which collects metrics for this {@link CentralDogma} instance.
*
* @return the {@link MeterRegistry}
*/
MeterRegistry meterRegistry();
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -39,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.AtomicDouble;

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Latest;
import com.linecorp.centraldogma.client.Watcher;
Expand All @@ -49,6 +52,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.ShuttingDownException;

import io.micrometer.core.instrument.Gauge;

abstract class AbstractWatcher<T> implements Watcher<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
Expand Down Expand Up @@ -114,6 +119,7 @@ private enum State {
private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners;
private final AtomicReference<State> state;
private final CompletableFuture<Latest<T>> initialValueFuture;
private final AtomicDouble latestNotifiedRevision = new AtomicDouble();

private volatile Latest<T> latest;
private volatile ScheduledFuture<?> currentScheduleFuture;
Expand All @@ -127,6 +133,13 @@ protected AbstractWatcher(CentralDogma client, ScheduledExecutorService watchSch
this.repositoryName = requireNonNull(repositoryName, "repositoryName");
this.pathPattern = requireNonNull(pathPattern, "pathPattern");

Gauge.builder("centraldogma.client.watcher.revision",
this, watcher -> watcher.latestNotifiedRevision.get())
.tag("project", projectName)
.tag("repository", repositoryName)
.tag("path", pathPattern)
.register(client.meterRegistry());

updateListeners = new CopyOnWriteArrayList<>();
state = new AtomicReference<>(State.INIT);
initialValueFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -296,18 +309,28 @@ private void notifyListeners() {
}

final Latest<T> latest = this.latest;
final List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
final Executor executor = entry.getValue();
executor.execute(() -> {
final CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
listener.accept(latest.revision(), latest.value());
return true;
} catch (Exception e) {
logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
projectName, repositoryName, pathPattern, latest.revision(), e);
return false;
}
});
}, executor);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenAccept(ignored -> {
final boolean result = futures.stream().allMatch(CompletableFuture::join);
if (result) {
latestNotifiedRevision.set(latest.revision().major());
}
});
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

private void handleExecutorShutdown(Executor executor, RejectedExecutionException e) {
Expand Down
Loading