Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master: Cherry pick #1211 and #1227 : Adding total time statics, refactoring event bus and adding bifurcation of status metrics, Fixing bug : Avoid registering subscriber class multiple times #1240

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@
package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_BAD_REQUEST_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_GONE_RESPONSE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_RATE_LIMIT_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_REQUEST_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_BAD_GATEWAY_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_INTERNAL_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_TIMEOUT_COUNT;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus.StatisticsType;
import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.common.base.Stopwatch;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -70,6 +77,7 @@ public class GhfsGlobalStorageStatistics extends StorageStatistics {
private final Map<String, AtomicLong> minimums = new HashMap<>();
private final Map<String, AtomicLong> maximums = new HashMap<>();
private final Map<String, MeanStatistic> means = new HashMap<>();
private final Map<String, AtomicDouble> total = new HashMap<>();

public GhfsGlobalStorageStatistics() {
super(NAME);
Expand All @@ -83,10 +91,14 @@ public GhfsGlobalStorageStatistics() {
String symbol = opType.getSymbol();
opsCount.put(symbol, new AtomicLong(0));

if (opType.getType() == TYPE_DURATION) {
if (opType.getType() == StatisticTypeEnum.TYPE_DURATION
|| opType.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
minimums.put(getMinKey(symbol), null);
maximums.put(getMaxKey(symbol), new AtomicLong(0));
means.put(getMeanKey(symbol), new MeanStatistic());
if (opType.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
total.put(getTimeKey(symbol), new AtomicDouble(0.0));
}
}
}
}
Expand Down Expand Up @@ -141,8 +153,9 @@ void incrementCounter(GoogleCloudStorageStatistics op, long count) {

@Override
public void reset() {
resetMetrics(opsCount);
resetMetrics(maximums);
resetLongMetrics(opsCount);
resetLongMetrics(maximums);
resetDoubleMetrics(total);

for (String ms : means.keySet()) {
means.get(ms).reset();
Expand All @@ -153,12 +166,18 @@ public void reset() {
}
}

private void resetMetrics(Map<String, AtomicLong> metrics) {
private void resetLongMetrics(Map<String, AtomicLong> metrics) {
for (AtomicLong value : metrics.values()) {
value.set(0);
}
}

private void resetDoubleMetrics(Map<String, AtomicDouble> metrics) {
for (AtomicDouble value : metrics.values()) {
value.set(0.0);
}
}

void updateStats(GhfsStatistic statistic, long durationMs, Object context) {
checkArgument(
statistic.getType() == TYPE_DURATION,
Expand All @@ -175,6 +194,18 @@ private void addMeanStatistic(GhfsStatistic statistic, long totalDurationMs, int
}
}

protected void addTotalTimeStatistic(String statistic) {
assert (statistic.contains("_duration"));
String parentCounterKey = statistic.replace("_duration", "");
String parentMeanKey = getMeanKey(parentCounterKey);

assert (means.containsKey(parentMeanKey) && opsCount.containsKey(parentCounterKey));
double meanValue = means.get(parentMeanKey).getValue();
long operationValue = opsCount.get(parentCounterKey).get();

total.get(statistic).set(1.0 * meanValue * operationValue);
}

void updateStats(
GhfsStatistic statistic,
long minLatency,
Expand Down Expand Up @@ -219,148 +250,68 @@ private void updateMinMaxStats(
}
}

/**
* Updating the required gcs specific statistics based on httpresponse.
*
* @param statusCode
*/
private void updateGcsIOSpecificStatistics(int statusCode) {

if (statusCode >= 400 && statusCode < 500) {
incrementGcsClientSideCounter();

if (statusCode == 429) {
incrementRateLimitingCounter();
}
}
void incrementGcsExceptionCount() {
increment(EXCEPTION_COUNT);
}

if (statusCode >= 500 && statusCode < 600) {
incrementGcsServerSideCounter();
}
void incrementGcsTotalRequestCount() {
increment(GCS_API_REQUEST_COUNT);
}

private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) {
// using code.proto as reference
// https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
switch (grpcStatusCode.getCode()) {
case OK:
return 200;
case CANCELLED:
return 499;
case INVALID_ARGUMENT:
case FAILED_PRECONDITION:
case OUT_OF_RANGE:
return 400;
case DEADLINE_EXCEEDED:
return 504;
case NOT_FOUND:
return 404;
case ALREADY_EXISTS:
case ABORTED:
return 409;
case PERMISSION_DENIED:
return 403;
case RESOURCE_EXHAUSTED:
return 429;
case UNIMPLEMENTED:
return 501;
case UNAVAILABLE:
return 503;
case UNAUTHENTICATED:
return 401;
case UNKNOWN:
case INTERNAL:
case DATA_LOSS:
default:
return 500;
}
void incrementRateLimitingCounter() {
increment(GCS_API_CLIENT_RATE_LIMIT_COUNT);
}

/**
* Updating the required gcs specific statistics based on HttpResponseException.
*
* @param responseException contains statusCode based on which metrics are updated
*/
@Subscribe
private void subscriberOnHttpResponseException(@Nonnull HttpResponseException responseException) {
updateGcsIOSpecificStatistics(responseException.getStatusCode());
void incrementGcsClientSideCounter() {
increment(GCS_API_CLIENT_SIDE_ERROR_COUNT);
}

/**
* Updating the required gcs specific statistics based on GoogleJsonResponseException.
*
* @param responseException contains statusCode based on which metrics are updated
*/
@Subscribe
private void subscriberOnGoogleJsonResponseException(
@Nonnull GoogleJsonResponseException responseException) {
updateGcsIOSpecificStatistics(responseException.getStatusCode());
void incrementGcsServerSideCounter() {
increment(GCS_API_SERVER_SIDE_ERROR_COUNT);
}

/**
* Updating the required gcs specific statistics based on HttpResponse.
*
* @param responseStatus responseStatus status code from HTTP response
*/
@Subscribe
private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) {
updateGcsIOSpecificStatistics(responseStatus);
void incrementGcsClientBadRequestCount() {
increment(GCS_API_CLIENT_BAD_REQUEST_COUNT);
}

@Subscribe
private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) {
incrementGcsTotalRequestCount();
void incrementGcsClientUnauthorizedResponseCount() {
increment(GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT);
}

@Subscribe
private void subscriberOnGrpcStatus(@Nonnull Status status) {
updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status));
void incrementGcsClientNotFoundResponseCount() {
increment(GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT);
}

/**
* Updating the EXCEPTION_COUNT
*
* @param exception
*/
@Subscribe
private void subscriberOnException(IOException exception) {
incrementGcsExceptionCount();
void incrementGcsClientRequestTimeoutCount() {
increment(GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT);
}

/**
* Updating the corresponding statistics
*
* @param strType
*/
@Subscribe
private void subscriberOnStatisticsType(StatisticsType strType) {
if (strType == StatisticsType.DIRECTORIES_DELETED) {
incrementDirectoriesDeleted();
}
void incrementGcsClientGoneResponseCount() {
increment(GCS_API_CLIENT_GONE_RESPONSE_COUNT);
}

private void incrementDirectoriesDeleted() {
increment(GhfsStatistic.DIRECTORIES_DELETED);
void incrementGcsClientPreconditionFailedResponseCount() {
increment(GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT);
}

private void incrementGcsExceptionCount() {
increment(EXCEPTION_COUNT);
void incrementGcsClientRequestedRangeNotSatisfiableCount() {
increment(GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT);
}

private void incrementGcsTotalRequestCount() {
increment(GCS_REQUEST_COUNT);
void incrementGcsServerInternalErrorCount() {
increment(GCS_API_SERVER_INTERNAL_ERROR_COUNT);
}

private void incrementRateLimitingCounter() {
increment(GCS_CLIENT_RATE_LIMIT_COUNT);
void incrementGcsServerBadGatewayCount() {
increment(GCS_API_SERVER_BAD_GATEWAY_COUNT);
}

private void incrementGcsClientSideCounter() {
increment(GCS_CLIENT_SIDE_ERROR_COUNT);
void incrementGcsServerServiceUnavailableCount() {
increment(GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT);
}

private void incrementGcsServerSideCounter() {
increment(GCS_SERVER_SIDE_ERROR_COUNT);
void incrementGcsServerTimeoutCount() {
increment(GCS_API_SERVER_TIMEOUT_COUNT);
}

void streamReadBytes(int bytesRead) {
Expand Down Expand Up @@ -401,6 +352,11 @@ private Iterator<String> getMetricNames() {
metrics.addAll(minimums.keySet());
metrics.addAll(maximums.keySet());
metrics.addAll(means.keySet());
for (String statistic : total.keySet()) {
addTotalTimeStatistic(statistic);
}

metrics.addAll(total.keySet());

return metrics.iterator();
}
Expand Down Expand Up @@ -443,6 +399,10 @@ private long getValue(String key) {
return Math.round(means.get(key).getValue());
}

if (total.containsKey(key)) {
return total.get(key).longValue();
}

return 0L;
}

Expand All @@ -461,7 +421,8 @@ public boolean isTracked(String key) {
return opsCount.containsKey(key)
|| maximums.containsKey(key)
|| minimums.containsKey(key)
|| means.containsKey(key);
|| means.containsKey(key)
|| total.containsKey(key);
}

/**
Expand Down Expand Up @@ -491,6 +452,10 @@ private String getMeanKey(String symbol) {
return symbol + "_mean";
}

private String getTimeKey(String symbol) {
return symbol + "_duration";
}

/**
* To get the maximum value which is stored with MAXIMUM extension
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,9 @@ private IOStatisticsStoreBuilder createStoreBuilder() {
} else if (stat.getType() == StatisticTypeEnum.TYPE_DURATION) {
duration(stat);
storeBuilder.withDurationTracking(stat.getSymbol());
} else if (stat.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
duration(stat);
storeBuilder.withDurationTracking(stat.getSymbol());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_COUNTER;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION_TOTAL;

import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -55,11 +56,6 @@ public enum GhfsStatistic {
"Total number of directories created through the object store.",
TYPE_COUNTER),

DIRECTORIES_DELETED(
"directories_deleted",
"Total number of directories deleted through the object store.",
TYPE_COUNTER),

FILES_CREATED(
"files_created", "Total number of files created through the object store.", TYPE_COUNTER),
FILES_DELETED(
Expand Down Expand Up @@ -106,9 +102,6 @@ public enum GhfsStatistic {
"Calls of read stream close()",
TYPE_DURATION),

STREAM_READ_OPERATIONS(
StreamStatisticNames.STREAM_READ_OPERATIONS, "Calls of read()", TYPE_DURATION),

STREAM_READ_VECTORED_OPERATIONS(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
"Calls of readVectored()",
Expand All @@ -126,6 +119,7 @@ public enum GhfsStatistic {
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
"size of fileRanges requested in readVectoredRequest",
TYPE_COUNTER),
STREAM_READ_OPERATIONS("stream_read_operations", "Calls of read()", TYPE_DURATION_TOTAL),

STREAM_READ_VECTORED_READ_COMBINED_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
Expand Down Expand Up @@ -161,7 +155,7 @@ public enum GhfsStatistic {
TYPE_COUNTER),
STREAM_WRITE_CLOSE_OPERATIONS(
"stream_write_close_operations", "Calls of write stream close()", TYPE_DURATION),
STREAM_WRITE_OPERATIONS("stream_write_operations", "Calls of write()", TYPE_DURATION),
STREAM_WRITE_OPERATIONS("stream_write_operations", "Calls of write()", TYPE_DURATION_TOTAL),

/** The XAttr API statistics */
INVOCATION_XATTR_GET_MAP(
Expand Down
Loading