Skip to content

Commit

Permalink
minimal impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 committed Oct 23, 2020
1 parent 3927148 commit 1c3ad2b
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@
import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;

import io.micrometer.core.instrument.MeterRegistry;

final class LegacyCentralDogma extends AbstractCentralDogma {

private final CentralDogmaService.AsyncIface client;

LegacyCentralDogma(ScheduledExecutorService executor, CentralDogmaService.AsyncIface client) {
super(executor);
LegacyCentralDogma(ScheduledExecutorService executor, CentralDogmaService.AsyncIface client,
MeterRegistry meterRegistry) {
super(executor, meterRegistry);
this.client = requireNonNull(client, "client");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CentralDogma build() throws UnknownHostException {

final EventLoopGroup executor = clientFactory().eventLoopGroup();
final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag();
final CentralDogma dogma = new LegacyCentralDogma(executor, builder.build(AsyncIface.class));
final CentralDogma dogma = new LegacyCentralDogma(executor, builder.build(AsyncIface.class), meterRegistry());
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand All @@ -78,7 +78,7 @@ executor, dogma, maxRetriesOnReplicationLag, retryIntervalOnReplicationLagMillis
// in Armeria: https://github.com/line/armeria/issues/760
final ClientRequestContext ctx = ClientRequestContext.currentOrNull();
return ctx != null ? ctx.remoteAddress() : null;
});
}, meterRegistry());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableSet;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.RepositoryInfo;
import com.linecorp.centraldogma.client.armeria.legacy.ThriftTypes.TAuthor;
Expand Down Expand Up @@ -80,7 +81,7 @@ class LegacyCentralDogmaTest {

@BeforeEach
void setUp() {
client = new LegacyCentralDogma(CommonPools.workerGroup(), iface);
client = new LegacyCentralDogma(CommonPools.workerGroup(), iface, NoopMeterRegistry.get());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;

import io.micrometer.core.instrument.MeterRegistry;

final class ArmeriaCentralDogma extends AbstractCentralDogma {

private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -134,8 +136,9 @@ final class ArmeriaCentralDogma extends AbstractCentralDogma {
private final WebClient client;
private final String authorization;

ArmeriaCentralDogma(ScheduledExecutorService executor, WebClient client, String accessToken) {
super(executor);
ArmeriaCentralDogma(ScheduledExecutorService executor, WebClient client, String accessToken,
MeterRegistry meterRegistry) {
super(executor, meterRegistry);
this.client = requireNonNull(client, "client");
authorization = "Bearer " + requireNonNull(accessToken, "accessToken");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public CentralDogma build() throws UnknownHostException {
final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag();
final CentralDogma dogma = new ArmeriaCentralDogma(executor,
builder.build(WebClient.class),
accessToken());
accessToken(), meterRegistry());
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand All @@ -58,7 +58,7 @@ executor, dogma, maxRetriesOnReplicationLag, retryIntervalOnReplicationLagMillis
// in Armeria: https://github.com/line/armeria/issues/760
final ClientRequestContext ctx = ClientRequestContext.currentOrNull();
return ctx != null ? ctx.remoteAddress() : null;
});
}, meterRegistry());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder;
import com.linecorp.centraldogma.client.armeria.ArmeriaClientConfigurator;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Spring bean configuration for {@link CentralDogma} client.
*/
Expand Down Expand Up @@ -87,7 +89,8 @@ public CentralDogma dogmaClient(
Environment env,
CentralDogmaSettings settings,
@ForCentralDogma ClientFactory clientFactory,
Optional<ArmeriaClientConfigurator> armeriaClientConfigurator) throws UnknownHostException {
Optional<ArmeriaClientConfigurator> armeriaClientConfigurator,
Optional<MeterRegistry> meterRegistry) throws UnknownHostException {

final ArmeriaCentralDogmaBuilder builder = new ArmeriaCentralDogmaBuilder();

Expand Down Expand Up @@ -150,6 +153,10 @@ public CentralDogma dogmaClient(
builder.retryIntervalOnReplicationLagMillis(retryIntervalOnReplicationLagMillis);
}

if (meterRegistry.isPresent()) {
builder.meterRegistry(meterRegistry.get());
}

return builder.build();
}

Expand Down
1 change: 1 addition & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dependencies {
implementation 'org.javassist:javassist'
implementation 'io.micrometer:micrometer-core'
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,25 @@
import com.linecorp.centraldogma.internal.client.FileWatcher;
import com.linecorp.centraldogma.internal.client.RepositoryWatcher;

import io.micrometer.core.instrument.MeterRegistry;

/**
* A skeletal {@link CentralDogma} implementation.
*/
public abstract class AbstractCentralDogma implements CentralDogma {

private final ScheduledExecutorService executor;
private final MeterRegistry meterRegistry;

/**
* Creates a new instance.
*
* @param executor the {@link ScheduledExecutorService} which will be used for scheduling the tasks
* related with automatic retries.
*/
protected AbstractCentralDogma(ScheduledExecutorService executor) {
protected AbstractCentralDogma(ScheduledExecutorService executor, MeterRegistry meterRegistry) {
this.executor = requireNonNull(executor, "executor");
this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
}

/**
Expand Down Expand Up @@ -201,4 +205,9 @@ protected final CompletableFuture<Revision> maybeNormalizeRevision(
return CompletableFuture.completedFuture(revision);
}
}

@Override
public MeterRegistry meterRegistry() {
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import com.linecorp.centraldogma.common.RevisionNotFoundException;
import com.linecorp.centraldogma.internal.CsrfToken;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;

/**
* Builds a {@link CentralDogma} client.
*/
Expand All @@ -70,6 +73,7 @@ public abstract class AbstractCentralDogmaBuilder<B extends AbstractCentralDogma
private int maxNumRetriesOnReplicationLag = DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG;
private long retryIntervalOnReplicationLagMillis =
TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS);
private MeterRegistry meterRegistry = Metrics.globalRegistry;

/**
* Returns {@code this}.
Expand Down Expand Up @@ -415,4 +419,13 @@ public final B retryIntervalOnReplicationLagMillis(long retryIntervalOnReplicati
protected long retryIntervalOnReplicationLagMillis() {
return retryIntervalOnReplicationLagMillis;
}

public final B meterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
return self();
}

protected MeterRegistry meterRegistry() {
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.RevisionNotFoundException;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Central Dogma client.
*/
Expand Down Expand Up @@ -546,4 +548,6 @@ default Watcher<Revision> repositoryWatcher(String projectName, String repositor
*/
<T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function);

MeterRegistry meterRegistry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Latest;
import com.linecorp.centraldogma.client.Watcher;
Expand All @@ -46,6 +49,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.ShuttingDownException;

import io.micrometer.core.instrument.Tag;

abstract class AbstractWatcher<T> implements Watcher<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
Expand Down Expand Up @@ -110,6 +115,7 @@ private enum State {
private final List<BiConsumer<? super Revision, ? super T>> updateListeners;
private final AtomicReference<State> state;
private final CompletableFuture<Latest<T>> initialValueFuture;
private final AtomicLong revisionGauge;

private volatile Latest<T> latest;
private volatile ScheduledFuture<?> currentScheduleFuture;
Expand All @@ -123,6 +129,10 @@ protected AbstractWatcher(CentralDogma client, ScheduledExecutorService executor
this.repositoryName = requireNonNull(repositoryName, "repositoryName");
this.pathPattern = requireNonNull(pathPattern, "pathPattern");

final Iterable<Tag> tags = ImmutableList.of(
Tag.of("project", projectName), Tag.of("repository", repositoryName), Tag.of("path", pathPattern));
revisionGauge = client.meterRegistry().gauge("centraldogma.client.watcher.revision", tags, new AtomicLong());

updateListeners = new CopyOnWriteArrayList<>();
state = new AtomicReference<>(State.INIT);
initialValueFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -235,6 +245,7 @@ private void doWatch(int numAttemptsSoFar) {
if (oldLatest == null) {
initialValueFuture.complete(newLatest);
}
revisionGauge.set(latest.revision().major());
}

// Watch again for the next change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.RevisionNotFoundException;

import io.micrometer.core.instrument.MeterRegistry;

/**
* A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
* was raised but it is certain that a given {@link Revision} exists.
Expand All @@ -85,8 +87,8 @@ protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {

public ReplicationLagTolerantCentralDogma(ScheduledExecutorService executor, CentralDogma delegate,
int maxRetries, long retryIntervalMillis,
Supplier<?> currentReplicaHintSupplier) {
super(executor);
Supplier<?> currentReplicaHintSupplier, MeterRegistry meterRegistry) {
super(executor, meterRegistry);

requireNonNull(delegate, "delegate");
checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.RepositoryInfo;
import com.linecorp.centraldogma.common.Change;
Expand All @@ -66,7 +67,7 @@ class ReplicationLagTolerantCentralDogmaTest {
@BeforeEach
void setUp() {
dogma = new ReplicationLagTolerantCentralDogma(executor, delegate, 3, 0,
currentReplicaHintSupplier);
currentReplicaHintSupplier, NoopMeterRegistry.get());
}

@AfterAll
Expand Down

0 comments on commit 1c3ad2b

Please sign in to comment.