Update BigQuerySinkMetrics for StreamingInserts. (#30320)
* Update BigQuerySinkMetrics for StreamingInserts.

* Spotless/add additional comments

* Address comments

* Address comments and some minor optimizations

Revert changes to histogram bucket widths

* Add functionality to completely disable new streaming inserts metrics

* Remove unnecessary qualifiers in StreamingInsertsMetrics

* Fix unit tests
JayajP authored Mar 21, 2024
1 parent e8e272b commit 32e7a6a
Showing 7 changed files with 674 additions and 17 deletions.
StreamingDataflowWorker.java
@@ -579,13 +579,10 @@ public static void main(String[] args) throws Exception {
     // metrics.
     MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
 
-    // When enabled, the Pipeline will record Per-Worker metrics that will be piped to DFE.
-    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
-        options.isEnableStreamingEngine()
-            && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics"));
-    // StreamingStepMetricsContainer automatically deletes perWorkerCounters if they are zero-valued
-    // for longer than 5 minutes.
-    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    if (options.isEnableStreamingEngine()
+        && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics")) {
+      enableBigQueryMetrics();
+    }
 
     JvmInitializers.runBeforeProcessing(options);
     worker.startStatusPages();
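
For context: with this change, the per-worker BigQuery sink metrics stay off unless the job runs on Streaming Engine with the matching experiment. A simplified launch sketch, assuming the option and experiment names from the check above (the pipeline itself is omitted):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Both conditions in the check above must hold for enableBigQueryMetrics()
// to run on the worker.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(
            "--runner=DataflowRunner",
            "--enableStreamingEngine",
            "--experiments=enable_per_worker_metrics")
        .as(DataflowPipelineOptions.class);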
@@ -672,6 +669,16 @@ private static long chooseMaxBytesOutstanding(DataflowWorkerHarnessOptions options) {
     return maxMem > 0 ? maxMem : (Runtime.getRuntime().maxMemory() / 2);
   }
 
+  private static void enableBigQueryMetrics() {
+    // When enabled, the Pipeline will record Per-Worker metrics that will be piped to DFE.
+    StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
+    // StreamingStepMetricsContainer automatically deletes perWorkerCounters if they are zero-valued
+    // for longer than 5 minutes.
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+    // Support metrics for BigQuery's Streaming Inserts write method.
+    BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+  }
+
   void addStateNameMappings(Map<String, String> nameMap) {
     stateNameMap.putAll(nameMap);
   }
BigQueryServicesImpl.java
@@ -95,6 +95,7 @@
 import io.grpc.Status.Code;
 import io.grpc.protobuf.ProtoUtils;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -937,6 +938,7 @@ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
     private final List<TableDataInsertAllRequest.Rows> rows;
     private final AtomicLong maxThrottlingMsec;
     private final Sleeper sleeper;
+    private final StreamingInsertsMetrics result;
 
     InsertBatchofRowsCallable(
         TableReference ref,
@@ -946,7 +948,8 @@ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
         FluentBackoff rateLimitBackoffFactory,
         List<TableDataInsertAllRequest.Rows> rows,
         AtomicLong maxThrottlingMsec,
-        Sleeper sleeper) {
+        Sleeper sleeper,
+        StreamingInsertsMetrics result) {
       this.ref = ref;
       this.skipInvalidRows = skipInvalidRows;
       this.ignoreUnkownValues = ignoreUnknownValues;
@@ -955,6 +958,7 @@ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
       this.rows = rows;
       this.maxThrottlingMsec = maxThrottlingMsec;
       this.sleeper = sleeper;
+      this.result = result;
     }
 
     @Override
@@ -975,6 +979,7 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
       long totalBackoffMillis = 0L;
       while (true) {
         ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+        Instant start = Instant.now();
         try {
           List<TableDataInsertAllResponse.InsertErrors> response =
               insert.execute().getInsertErrors();
@@ -987,14 +992,18 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
               }
             }
           }
+          result.updateSuccessfulRpcMetrics(start, Instant.now());
           return response;
         } catch (IOException e) {
           GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
           if (errorInfo == null) {
             serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+            result.updateFailedRpcMetrics(start, start, BigQuerySinkMetrics.UNKNOWN);
             throw e;
           }
-          serviceCallMetric.call(errorInfo.getReason());
+          String errorReason = errorInfo.getReason();
+          serviceCallMetric.call(errorReason);
+          result.updateFailedRpcMetrics(start, Instant.now(), errorReason);
           /**
            * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
            * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
@@ -1031,6 +1040,7 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
             totalBackoffMillis += nextBackOffMillis;
             final long totalBackoffMillisSoFar = totalBackoffMillis;
             maxThrottlingMsec.getAndUpdate(current -> Math.max(current, totalBackoffMillisSoFar));
+            result.updateRetriedRowsWithStatus(errorReason, rows.size());
           } catch (InterruptedException interrupted) {
             throw new IOException("Interrupted while waiting before retrying insertAll");
           }
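
Taken together, the call() changes follow one pattern: timestamp the RPC, report it as successful or failed with its latency and status, and count retried rows during backoff. A minimal sketch of that shape, where doRpc() and reasonOf() are hypothetical stand-ins and the metrics methods are the ones added above:

Instant start = Instant.now();
try {
  doRpc(); // stand-in for insert.execute()
  result.updateSuccessfulRpcMetrics(start, Instant.now());
} catch (IOException e) {
  // Falls back to BigQuerySinkMetrics.UNKNOWN when no error reason can be extracted.
  result.updateFailedRpcMetrics(start, Instant.now(), reasonOf(e));
  throw e;
}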
@@ -1067,7 +1077,8 @@ <T> long insertAll(
           "If insertIdList is not null it needs to have at least "
               + "as many elements as rowList");
     }
-
+    StreamingInsertsMetrics streamingInsertsResults =
+        BigQuerySinkMetrics.streamingInsertsMetrics();
     final Set<Integer> failedIndices = new HashSet<>();
     long retTotalDataSize = 0;
     List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
@@ -1124,6 +1135,7 @@ <T> long insertAll(
                         + " pipeline, and the row will be output as a failed insert.",
                     nextRowSize));
           } else {
+            streamingInsertsResults.incrementFailedRows();
             errorContainer.add(failedInserts, error, ref, rowsToPublish.get(rowIndex));
             failedIndices.add(rowIndex);
             rowIndex++;
@@ -1150,7 +1162,8 @@ <T> long insertAll(
                     rateLimitBackoffFactory,
                     rows,
                     maxThrottlingMsec,
-                    sleeper)));
+                    sleeper,
+                    streamingInsertsResults)));
         strideIndices.add(strideIndex);
         retTotalDataSize += dataSize;
         strideIndex = rowIndex;
Expand Down Expand Up @@ -1180,7 +1193,8 @@ <T> long insertAll(
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
sleeper)));
sleeper,
streamingInsertsResults)));
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
rows = new ArrayList<>();
@@ -1209,6 +1223,7 @@ <T> long insertAll(
               retryIds.add(idsToPublish.get(errorIndex));
             }
           } else {
+            streamingInsertsResults.incrementFailedRows();
             errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
           }
         }
@@ -1219,6 +1234,7 @@ <T> long insertAll(
         Thread.currentThread().interrupt();
         throw new IOException("Interrupted while inserting " + rowsToPublish);
       } catch (ExecutionException e) {
+        streamingInsertsResults.updateStreamingInsertsMetrics(ref);
         throw new RuntimeException(e.getCause());
       }
 
@@ -1237,6 +1253,8 @@ <T> long insertAll(
       }
       rowsToPublish = retryRows;
       idsToPublish = retryIds;
+      streamingInsertsResults.updateRetriedRowsWithStatus(
+          BigQuerySinkMetrics.INTERNAL, retryRows.size());
       // print first 5 failures
       int numErrorToLog = Math.min(allErrors.size(), 5);
       LOG.info(
@@ -1258,6 +1276,8 @@ <T> long insertAll(
         }
       }
     }
+    streamingInsertsResults.updateSuccessfulAndFailedRows(rowList.size(), allErrors.size());
+    streamingInsertsResults.updateStreamingInsertsMetrics(ref);
     if (!allErrors.isEmpty()) {
       throw new IOException("Insert failed: " + allErrors);
     } else {
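In sum, each insertAll() call now owns one StreamingInsertsMetrics container that accumulates per-RPC and per-row results and is flushed once per call. A simplified sketch of that lifecycle, using only method names visible in this diff (locals simplified):

StreamingInsertsMetrics results = BigQuerySinkMetrics.streamingInsertsMetrics();
// ... batch callables record RPC latency and status via updateSuccessfulRpcMetrics /
// updateFailedRpcMetrics, and retried rows via updateRetriedRowsWithStatus ...
results.updateSuccessfulAndFailedRows(rowList.size(), allErrors.size());
// One flush per call turns the accumulated values into worker metrics labeled with
// the destination table; the ExecutionException path above also flushes, so partial
// results are not dropped.
results.updateStreamingInsertsMetrics(ref);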
BigQuerySinkMetrics.java
@@ -33,6 +33,7 @@
 import org.apache.beam.sdk.metrics.Histogram;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -45,12 +46,14 @@
  */
 public class BigQuerySinkMetrics {
   private static boolean supportMetricsDeletion = false;
+  private static boolean supportStreamingInsertsMetrics = false;
 
   public static final String METRICS_NAMESPACE = "BigQuerySink";
 
   // Status codes
-  private static final String UNKNOWN = Status.Code.UNKNOWN.toString();
+  public static final String UNKNOWN = Status.Code.UNKNOWN.toString();
   public static final String OK = Status.Code.OK.toString();
+  static final String INTERNAL = "INTERNAL";
   public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
 
   // Base Metric names
@@ -59,8 +62,9 @@ public class BigQuerySinkMetrics {
   private static final String APPEND_ROWS_ROW_STATUS = "RowsAppendedCount";
   public static final String THROTTLED_TIME = "ThrottledTime";
 
-  // StorageWriteAPI Method names
+  // BigQuery Write Method names
   public enum RpcMethod {
+    STREAMING_INSERTS,
     APPEND_ROWS,
     FLUSH_ROWS,
     FINALIZE_STREAM
@@ -167,8 +171,8 @@ public static Optional<ParsedMetricName> parseMetricName(String metricName) {
    * 'RpcRequests-Method:{method}RpcStatus:{status};TableId:{tableId}' TableId label is dropped
    * if 'supportsMetricsDeletion' is not enabled.
    */
-  private static Counter createRPCRequestCounter(
-      RpcMethod method, String rpcStatus, String tableId) {
+  @VisibleForTesting
+  static Counter createRPCRequestCounter(RpcMethod method, String rpcStatus, String tableId) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
     metricLabels.put(RPC_METHOD, method.toString());
@@ -189,7 +193,7 @@ private static Counter createRPCRequestCounter(
    * @param method StorageWriteAPI method associated with this metric.
    * @return Histogram with exponential buckets with a sqrt(2) growth factor.
    */
-  private static Histogram createRPCLatencyHistogram(RpcMethod method) {
+  static Histogram createRPCLatencyHistogram(RpcMethod method) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_METHOD, method.toString());
     String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels);
@@ -326,6 +330,22 @@ public static void reportFailedRPCMetrics(
     updateRpcLatencyMetric(c, method);
   }
 
+  /**
+   * Returns a container to store metrics for BigQuery's {@code Streaming Inserts} RPC. If these
+   * metrics are disabled, then we return a no-op container.
+   */
+  static StreamingInsertsMetrics streamingInsertsMetrics() {
+    if (supportStreamingInsertsMetrics) {
+      return StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+    } else {
+      return StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
+    }
+  }
+
+  public static void setSupportStreamingInsertsMetrics(boolean supportStreamingInsertsMetrics) {
+    BigQuerySinkMetrics.supportStreamingInsertsMetrics = supportStreamingInsertsMetrics;
+  }
+
   public static void setSupportMetricsDeletion(boolean supportMetricsDeletion) {
     BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion;
   }
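The factory above is a null-object pattern: call sites never branch on the flag, because a disabled configuration hands back a no-op container. A simplified sketch of the call-site effect, using method names from this diff:

// With the default setSupportStreamingInsertsMetrics(false), the factory returns
// NoOpStreamingInsertsMetrics, so instrumentation calls are cheap no-ops:
StreamingInsertsMetrics m = BigQuerySinkMetrics.streamingInsertsMetrics();
m.incrementFailedRows();              // no-op while the feature is disabled
m.updateStreamingInsertsMetrics(ref); // likewise; nothing is emitted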
[Diffs for the remaining 4 changed files are not shown.]
