Skip to content

Commit

Permalink
Adding Metric for Status Code 429 (#1058)
Browse files Browse the repository at this point in the history
* Adding Metric for Status Code 429

* Adding Metric for Status Code 429
  • Loading branch information
guljain authored Oct 26, 2023
1 parent 283e8cf commit 48891bf
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_CLOSE_OPERATIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_EXCEPTIONS;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_OPERATIONS;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatusStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatusStatistics;
import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.common.flogger.GoogleLogger;
import java.io.Closeable;
import java.net.URI;
Expand Down Expand Up @@ -60,14 +64,18 @@
* Instrumentation of GCS.
*
* <p>Counters and metrics are generally addressed in code by their name or {@link GhfsStatistic}
* key. There <i>may</i> be some Statistics which do not have an entry here. To avoid attempts to
* access such counters failing, the operations to increment/query metric values are designed to
* handle lookup failures.
* and {@link GoogleCloudStorageStatusStatistics} key. There <i>may</i> be some Statistics which do
* not have an entry here. To avoid attempts to access such counters failing, the operations to
* increment/query metric values are designed to handle lookup failures.
*
* <p>GoogleHadoopFileSystem StorageStatistics are dynamically derived from the IOStatistics.
*/
public class GhfsInstrumentation
implements Closeable, MetricsSource, IOStatisticsSource, DurationTrackerFactory {
implements Closeable,
MetricsSource,
IOStatisticsSource,
DurationTrackerFactory,
GcsClientStatisticInterface {

private static final String METRICS_SOURCE_BASENAME = "GCSMetrics";

Expand Down Expand Up @@ -200,6 +208,19 @@ public void incrementCounter(GhfsStatistic op, long count) {
instanceIOStatistics.incrementCounter(name, count);
}

/**
* Increments a mutable counter and the matching instance IOStatistics counter for metrics in
* GoogleCloudStorageStatusStatistics.
*
* @param op operation
*/
private void incrementCounter(GoogleCloudStorageStatusStatistics op) {

String name = op.getSymbol();
incrementMutableCounter(name, 1);
instanceIOStatistics.incrementCounter(name, 1);
}

/**
* Get the metrics system.
*
Expand Down Expand Up @@ -227,7 +248,7 @@ protected final MutableCounterLong counter(String name, String desc) {
}

/**
* Create a counter in the registry.
* Create a counter in the registry for metrics in GhfsStatistic.
*
* @param op statistic to count
* @return a new counter
Expand All @@ -236,6 +257,16 @@ protected final MutableCounterLong counter(GhfsStatistic op) {
return counter(op.getSymbol(), op.getDescription());
}

/**
* Create a counter in the registry for metrics in GoogleCloudStorageStatusStatistics.
*
* @param op statistic to count
* @return a new counter
*/
private final MutableCounterLong counter(GoogleCloudStorageStatusStatistics op) {
return counter(op.getSymbol(), op.getDescription());
}

/**
* Registering a duration adds the success and failure counters.
*
Expand Down Expand Up @@ -322,6 +353,20 @@ private void incrementMutableCounter(String name, long count) {
}
}

/**
* Counter Metrics updation based on the Http response
*
* @param statusCode of ther Http response
*/
@Override
public void statusMetricsUpdation(int statusCode) {
switch (statusCode) {
case 429:
incrementCounter(GCS_CLIENT_RATE_LIMIT_COUNT);
break;
}
}

/**
* A duration tracker which updates a mutable counter with a metric. The metric is updated with
* the count on start; after a failure the failures count is incremented by one.
Expand Down Expand Up @@ -947,20 +992,28 @@ private IOStatisticsStoreBuilder createStoreBuilder() {
.forEach(
stat -> {
// declare all counter statistics
if (stat.getType() == GhfsStatisticTypeEnum.TYPE_COUNTER) {
if (stat.getType() == StatisticTypeEnum.TYPE_COUNTER) {
counter(stat);
storeBuilder.withCounters(stat.getSymbol());
// declare all gauge statistics
} else if (stat.getType() == GhfsStatisticTypeEnum.TYPE_GAUGE) {
} else if (stat.getType() == StatisticTypeEnum.TYPE_GAUGE) {
gauge(stat);
storeBuilder.withGauges(stat.getSymbol());
// and durations
} else if (stat.getType() == GhfsStatisticTypeEnum.TYPE_DURATION) {
} else if (stat.getType() == StatisticTypeEnum.TYPE_DURATION) {
duration(stat);
storeBuilder.withDurationTracking(stat.getSymbol());
}
});

// registering metrics of GoogleCloudStorageStatusStatistics which are all counters
EnumSet.allOf(GoogleCloudStorageStatusStatistics.class)
.forEach(
stat -> {
counter(stat);
storeBuilder.withCounters(stat.getSymbol());
});

return storeBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.fs.gcs.GhfsStatisticTypeEnum.TYPE_COUNTER;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatisticTypeEnum.TYPE_DURATION;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_COUNTER;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION;

import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -191,7 +192,7 @@ public enum GhfsStatistic {
* @param description description.
* @param type type
*/
GhfsStatistic(String symbol, String description, GhfsStatisticTypeEnum type) {
GhfsStatistic(String symbol, String description, StatisticTypeEnum type) {
this.symbol = symbol;
this.description = description;
this.type = type;
Expand All @@ -204,7 +205,7 @@ public enum GhfsStatistic {
private final String description;

/** Statistic type. */
private final GhfsStatisticTypeEnum type;
private final StatisticTypeEnum type;

/** the name of the statistic */
public String getSymbol() {
Expand Down Expand Up @@ -241,7 +242,7 @@ public String toString() {
*
* @return the type.
*/
public GhfsStatisticTypeEnum getType() {
public StatisticTypeEnum getType() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,13 @@ public void initialize(URI path, Configuration config) throws IOException {
initializeFsRoot();
initializeWorkingDirectory(config);
initializeDelegationTokenSupport(config);
initializeGcsFs(config);

instrumentation = new GhfsInstrumentation(initUri);
storageStatistics =
(GhfsStorageStatistics)
GlobalStorageStatistics.INSTANCE.put(
GhfsStorageStatistics.NAME,
() -> new GhfsStorageStatistics(instrumentation.getIOStatistics()));
initializeGcsFs(config);
}

private void initializeFsRoot() {
Expand Down Expand Up @@ -349,9 +348,10 @@ private GoogleCloudStorageFileSystem createGcsFs(Configuration config) throws IO
? new GoogleCloudStorageFileSystemImpl(
/* credentials= */ null,
accessBoundaries -> accessTokenProvider.getAccessToken(accessBoundaries).getToken(),
gcsFsOptions)
gcsFsOptions,
getInstrumentation())
: new GoogleCloudStorageFileSystemImpl(
credentials, /* downscopedAccessTokenFn= */ null, gcsFsOptions);
credentials, /* downscopedAccessTokenFn= */ null, gcsFsOptions, getInstrumentation());
}

private GoogleCredentials getCredentials(Configuration config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatusStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.emptyResponse;
import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.mockTransport;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpStatusCodes;
import com.google.cloud.hadoop.util.RetryHttpInitializer;
import com.google.cloud.hadoop.util.RetryHttpInitializerOptions;
import com.google.cloud.hadoop.util.interceptors.InvocationIdInterceptor;
import com.google.common.collect.ImmutableMap;
import java.net.URI;
import java.time.Duration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link GoogleCloudStorageStatusStatistics} class. */
@RunWith(JUnit4.class)
public class GoogleHadoopStatusMetricsTest {

@Test
public void gcs_client_429_status_metrics_STORAGE_CLIENT() throws Exception {

URI initUri = new Path("gs://test/").toUri();
GhfsInstrumentation ghfsInstrumentation = new GhfsInstrumentation(initUri);

String authHeaderValue = "Bearer: y2.WAKiHahzxGS_a1bd40RjNUF";

RetryHttpInitializer retryHttpInitializer =
new RetryHttpInitializer(
null,
RetryHttpInitializerOptions.builder()
.setDefaultUserAgent("foo-user-agent")
.setHttpHeaders(ImmutableMap.of("header-key", "header-value"))
.setMaxRequestRetries(5)
.setConnectTimeout(Duration.ofSeconds(5))
.setReadTimeout(Duration.ofSeconds(5))
.build(),
ghfsInstrumentation);

HttpRequestFactory requestFactory =
mockTransport(emptyResponse(429), emptyResponse(429), emptyResponse(200))
.createRequestFactory(retryHttpInitializer);

HttpRequest req = requestFactory.buildGetRequest(new GenericUrl("http://fake-url.com"));

HttpResponse res = req.execute();

assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT))
.contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX);
assertThat(res).isNotNull();
assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK);
assertThat(
((GhfsInstrumentation) ghfsInstrumentation)
.getIOStatistics()
.counters()
.get(GCS_CLIENT_RATE_LIMIT_COUNT.getSymbol()))
.isNotEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.hadoop.util.AccessBoundary;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ErrorTypeExtractor;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
import com.google.cloud.storage.BlobWriteSessionConfig;
import com.google.cloud.storage.BlobWriteSessionConfigs;
Expand Down Expand Up @@ -85,7 +86,8 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
@Nullable HttpTransport httpTransport,
@Nullable HttpRequestInitializer httpRequestInitializer,
@Nullable ImmutableList<ClientInterceptor> gRPCInterceptors,
@Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn)
@Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
@Nullable GcsClientStatisticInterface gcsClientStatisticInterface)
throws IOException {
super(
GoogleCloudStorageImpl.builder()
Expand All @@ -94,7 +96,9 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
.setHttpTransport(httpTransport)
.setHttpRequestInitializer(httpRequestInitializer)
.setDownscopedAccessTokenFn(downscopedAccessTokenFn)
.setGcsClientStatisticInterface(gcsClientStatisticInterface)
.build());

this.storageOptions = options;
this.storage =
clientLibraryStorage == null
Expand Down Expand Up @@ -280,6 +284,9 @@ public abstract Builder setDownscopedAccessTokenFn(
public abstract Builder setGRPCInterceptors(
@Nullable ImmutableList<ClientInterceptor> gRPCInterceptors);

public abstract Builder setGcsClientStatisticInterface(
@Nullable GcsClientStatisticInterface gcsClientStatisticInterface);

@VisibleForTesting
public abstract Builder setClientLibraryStorage(@Nullable Storage clientLibraryStorage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage.ListPage;
import com.google.cloud.hadoop.util.AccessBoundary;
import com.google.cloud.hadoop.util.CheckedFunction;
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.LazyExecutorService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -118,21 +119,25 @@ public class GoogleCloudStorageFileSystemImpl implements GoogleCloudStorageFileS
private static GoogleCloudStorage createCloudStorage(
GoogleCloudStorageFileSystemOptions options,
Credentials credentials,
Function<List<AccessBoundary>, String> downscopedAccessTokenFn)
Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
GcsClientStatisticInterface gcsClientStatisticInterface)
throws IOException {
checkNotNull(options, "options must not be null");

switch (options.getClientType()) {
case STORAGE_CLIENT:
return GoogleCloudStorageClientImpl.builder()
.setOptions(options.getCloudStorageOptions())
.setCredentials(credentials)
.setDownscopedAccessTokenFn(downscopedAccessTokenFn)
.setGcsClientStatisticInterface(gcsClientStatisticInterface)
.build();
default:
return GoogleCloudStorageImpl.builder()
.setOptions(options.getCloudStorageOptions())
.setCredentials(credentials)
.setDownscopedAccessTokenFn(downscopedAccessTokenFn)
.setGcsClientStatisticInterface(gcsClientStatisticInterface)
.build();
}
}
Expand All @@ -146,7 +151,13 @@ private static GoogleCloudStorage createCloudStorage(
*/
public GoogleCloudStorageFileSystemImpl(
Credentials credentials, GoogleCloudStorageFileSystemOptions options) throws IOException {
this(createCloudStorage(options, credentials, /* downscopedAccessTokenFn= */ null), options);
this(
createCloudStorage(
options,
credentials,
/* downscopedAccessTokenFn= */ null, /* gcsClientStatisticInterface */
null),
options);
logger.atFiner().log("GoogleCloudStorageFileSystem(options: %s)", options);
}

Expand All @@ -157,13 +168,18 @@ public GoogleCloudStorageFileSystemImpl(
* @param downscopedAccessTokenFn Function that generates downscoped access token.
* @param options Options for how this filesystem should operate and configure its underlying
* storage.
* @param gcsClientStatisticInterface for backporting ghfsInstrumentation
*/
public GoogleCloudStorageFileSystemImpl(
Credentials credentials,
Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
GoogleCloudStorageFileSystemOptions options)
GoogleCloudStorageFileSystemOptions options,
GcsClientStatisticInterface gcsClientStatisticInterface)
throws IOException {
this(createCloudStorage(options, credentials, downscopedAccessTokenFn), options);
this(
createCloudStorage(
options, credentials, downscopedAccessTokenFn, gcsClientStatisticInterface),
options);
logger.atFiner().log("GoogleCloudStorageFileSystem(options: %s)", options);
}

Expand Down
Loading

0 comments on commit 48891bf

Please sign in to comment.