From 784a473b3d3891e41afca3588dd3eda7a7bd3def Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:00:27 +0530 Subject: [PATCH] [Remote Store] Add cumulative bytes lag to NodesStats (#9393) --------- Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- CHANGELOG.md | 2 +- .../indices/stats/IndexStatsIT.java | 1 + .../RemoteSegmentStatsFromNodesStatsIT.java | 12 ++- .../index/remote/RemoteSegmentStats.java | 87 +++++++++++++++---- .../cluster/node/stats/NodeStatsTests.java | 4 +- 5 files changed, 86 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efc19b5b77af9..e859874d1cbf1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,7 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718)) -- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168)) +- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393)) - Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index ef3c2c1235a3c..af5191d7d2039 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1454,6 +1454,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java index 71a174e300fe8..19ad43b503ab7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java @@ -67,7 +67,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { indexSingleDoc(secondIndex, true); long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; - long max_bytes_lag = 0, max_time_lag = 0; + long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin() .cluster() @@ -77,6 +77,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; + total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); @@ -85,9 +86,11 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { .prepareRemoteStoreStats(secondIndex, "0") .setLocal(true) .get(); + cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; + total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); @@ -101,6 +104,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); + assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); } @@ -173,6 +177,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); } @@ -181,7 +186,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s for (String dataNode : internalCluster().getDataNodeNames()) { long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0; - long max_bytes_lag = 0, max_time_lag = 0; + long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin() .cluster() @@ -197,6 +202,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; + total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); @@ -214,6 +220,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; + total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); @@ -230,6 +237,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded()); assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted()); assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed()); + assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index f834f4ad9583d..0ff61d49c00f8 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; @@ -61,6 +62,11 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment { * Used to check for data freshness in the remote store */ private long maxRefreshBytesLag; + /** + * Total refresh lag (in bytes) between local and the remote store + * Used to check for data freshness in the remote store + */ + private long totalRefreshBytesLag; public RemoteSegmentStats() {} @@ -73,6 +79,19 @@ public RemoteSegmentStats(StreamInput in) throws IOException { downloadBytesSucceeded = in.readLong(); maxRefreshTimeLag = in.readLong(); maxRefreshBytesLag = in.readLong(); + /* TODO: + Adding version checks here since the base PR of adding remote store stats + in SegmentStats has already been merged and backported to 2.x branch. + + Since this is a new field that is being added, we need to have this check in place + to ensure BWCs don't break. + + This would have to be removed after the new field addition PRs are also backported to 2.x. + If possible we would need to ensure that all field addition PRs are backported at once + */ + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + totalRefreshBytesLag = in.readLong(); + } } /** @@ -91,7 +110,11 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) { this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted; this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed; this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs; + // Initializing both total and max bytes lag to the same `bytesLag` + // value from the tracker object + // Aggregations would be performed on the add method this.maxRefreshBytesLag = trackerStats.bytesLag; + this.totalRefreshBytesLag = trackerStats.bytesLag; } // Getter and setters. All are visible for testing @@ -155,8 +178,16 @@ public long getMaxRefreshBytesLag() { return maxRefreshBytesLag; } - public void setMaxRefreshBytesLag(long maxRefreshBytesLag) { - this.maxRefreshBytesLag = maxRefreshBytesLag; + public void addMaxRefreshBytesLag(long maxRefreshBytesLag) { + this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, maxRefreshBytesLag); + } + + public long getTotalRefreshBytesLag() { + return totalRefreshBytesLag; + } + + public void addTotalRefreshBytesLag(long totalRefreshBytesLag) { + this.totalRefreshBytesLag += totalRefreshBytesLag; } /** @@ -174,6 +205,7 @@ public void add(RemoteSegmentStats existingStats) { this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded(); this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag()); this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag()); + this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag(); } } @@ -187,33 +219,53 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(downloadBytesSucceeded); out.writeLong(maxRefreshTimeLag); out.writeLong(maxRefreshBytesLag); + /* TODO: + Adding version checks here since the base PR of adding remote store stats + in SegmentStats has already been merged and backported to 2.x branch. + + Since this is a new field that is being added, we need to have this check in place + to ensure BWCs don't break. + + This would have to be removed after the new field addition PRs are also backported to 2.x. + If possible we would need to ensure that all field addition PRs are backported at once + */ + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeLong(totalRefreshBytesLag); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.REMOTE_STORE); builder.startObject(Fields.UPLOAD); + buildUploadStats(builder); + builder.endObject(); + builder.startObject(Fields.DOWNLOAD); + buildDownloadStats(builder); + builder.endObject(); + builder.endObject(); + return builder; + } + + private void buildUploadStats(XContentBuilder builder) throws IOException { builder.startObject(Fields.TOTAL_UPLOADS); builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted)); builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded)); builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed)); builder.endObject(); - builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); - builder.humanReadableField( - Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS, - Fields.MAX_REFRESH_SIZE_LAG, - new ByteSizeValue(maxRefreshBytesLag) - ); + builder.startObject(Fields.REFRESH_SIZE_LAG); + builder.humanReadableField(Fields.TOTAL_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag)); + builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag)); builder.endObject(); - builder.startObject(Fields.DOWNLOAD); + builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); + } + + private void buildDownloadStats(XContentBuilder builder) throws IOException { builder.startObject(Fields.TOTAL_DOWNLOADS); builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted)); builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded)); builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed)); builder.endObject(); - builder.endObject(); - builder.endObject(); - return builder; } static final class Fields { @@ -222,15 +274,18 @@ static final class Fields { static final String DOWNLOAD = "download"; static final String TOTAL_UPLOADS = "total_uploads"; static final String TOTAL_DOWNLOADS = "total_downloads"; + static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag"; + static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis"; + static final String REFRESH_SIZE_LAG = "refresh_size_lag"; static final String STARTED = "started"; static final String STARTED_BYTES = "started_bytes"; static final String FAILED = "failed"; static final String FAILED_BYTES = "failed_bytes"; static final String SUCCEEDED = "succeeded"; static final String SUCCEEDED_BYTES = "succeeded_bytes"; - static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag"; - static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis"; - static final String MAX_REFRESH_SIZE_LAG = "max_refresh_size_lag"; - static final String MAX_REFRESH_SIZE_LAG_IN_MILLIS = "max_refresh_size_lag_in_bytes"; + static final String TOTAL = "total"; + static final String TOTAL_BYTES = "total_bytes"; + static final String MAX = "max"; + static final String MAX_BYTES = "max_bytes"; } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index e6460e429bd42..fbe70748adf2d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -459,6 +459,7 @@ public void testSerialization() throws IOException { assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed()); assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag()); assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag()); } } } @@ -789,7 +790,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { remoteSegmentStats.addDownloadBytesStarted(10L); remoteSegmentStats.addDownloadBytesSucceeded(10L); remoteSegmentStats.addDownloadBytesFailed(1L); - remoteSegmentStats.setMaxRefreshBytesLag(5L); + remoteSegmentStats.addTotalRefreshBytesLag(5L); + remoteSegmentStats.addMaxRefreshBytesLag(2L); remoteSegmentStats.setMaxRefreshTimeLag(2L); } return indicesStats;