diff --git a/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java b/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java new file mode 100644 index 0000000000000..5ce42bfbb1dc5 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/DoubleMovingAverage.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +/** + * MovingAverage is used to calculate the moving average of last 'n' observations of double type. + * + * @opensearch.internal + */ +public class DoubleMovingAverage { + private final int windowSize; + private final double[] observations; + + private volatile long count = 0; + private volatile long sum = 0; + private volatile double average = 0; + + public DoubleMovingAverage(int windowSize) { + checkWindowSize(windowSize); + this.windowSize = windowSize; + this.observations = new double[windowSize]; + } + + /** + * Used for changing the window size of {@code MovingAverage}. + * + * @param newWindowSize new window size. + * @return copy of original object with updated size. + */ + public DoubleMovingAverage copyWithSize(int newWindowSize) { + DoubleMovingAverage copy = new DoubleMovingAverage(newWindowSize); + // Start is inclusive, but end is exclusive + long start, end = count; + if (isReady() == false) { + start = 0; + } else { + start = end - windowSize; + } + // If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value + if (end - start > newWindowSize) { + start = end - newWindowSize; + } + for (int i = (int) start; i < end; i++) { + copy.record(observations[i % observations.length]); + } + return copy; + } + + private void checkWindowSize(int size) { + if (size <= 0) { + throw new IllegalArgumentException("window size must be greater than zero"); + } + } + + /** + * Records a new observation and evicts the n-th last observation. + */ + public synchronized double record(double value) { + double delta = value - observations[(int) (count % observations.length)]; + observations[(int) (count % observations.length)] = value; + + count++; + sum += delta; + average = sum / Math.min(count, observations.length); + return average; + } + + public double getAverage() { + return average; + } + + public long getCount() { + return count; + } + + public boolean isReady() { + return count >= windowSize; + } +} diff --git a/server/src/main/java/org/opensearch/common/util/MovingAverage.java b/server/src/main/java/org/opensearch/common/util/MovingAverage.java index 50d863709d489..11afb33b1a347 100644 --- a/server/src/main/java/org/opensearch/common/util/MovingAverage.java +++ b/server/src/main/java/org/opensearch/common/util/MovingAverage.java @@ -18,7 +18,7 @@ public class MovingAverage { private final long[] observations; private volatile long count = 0; - private volatile long sum = 0; + private volatile double sum = 0; private volatile double average = 0; public MovingAverage(int windowSize) { @@ -67,7 +67,7 @@ public synchronized double record(long value) { count++; sum += delta; - average = (double) sum / Math.min(count, observations.length); + average = sum / Math.min(count, observations.length); return average; } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index a4b6843d177f5..2859ada3b22a8 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -32,6 +32,8 @@ package org.opensearch.monitor.fs; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; @@ -41,6 +43,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.throttling.tracker.AverageCpuUsageTracker; import java.io.IOException; import java.util.Arrays; @@ -223,6 +226,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @opensearch.internal] */ public static class DeviceStats implements Writeable, ToXContentFragment { + private static final Logger logger = LogManager.getLogger(DeviceStats.class); final int majorDeviceNumber; final int minorDeviceNumber; @@ -389,11 +393,14 @@ public long operations() { public long readOperations() { if (previousReadsCompleted == -1) return -1; + //logger.info("Current reads : {} , Previous reads : {}", currentReadsCompleted, previousReadsCompleted); + return (currentReadsCompleted - previousReadsCompleted); } public long writeOperations() { if (previousWritesCompleted == -1) return -1; + //logger.info("Current writes : {} , Previous writes : {}", currentWritesCompleted, previousWritesCompleted); return (currentWritesCompleted - previousWritesCompleted); } diff --git a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java b/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java index 45a2ca242de68..2119b032bc38b 100644 --- a/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java +++ b/server/src/main/java/org/opensearch/node/NodePerformanceStatistics.java @@ -11,6 +11,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.throttling.tracker.AverageDiskStats; import java.io.IOException; import java.util.Locale; @@ -25,10 +26,14 @@ public class NodePerformanceStatistics implements Writeable { double cpuUtilizationPercent; double memoryUtilizationPercent; - public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + AverageDiskStats averageDiskStats; + + public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, + AverageDiskStats averageDiskStats, long timestamp) { this.nodeId = nodeId; this.cpuUtilizationPercent = cpuUtilizationPercent; this.memoryUtilizationPercent = memoryUtilizationPercent; + this.averageDiskStats = averageDiskStats; this.timestamp = timestamp; } @@ -36,6 +41,7 @@ public NodePerformanceStatistics(StreamInput in) throws IOException { this.nodeId = in.readString(); this.cpuUtilizationPercent = in.readDouble(); this.memoryUtilizationPercent = in.readDouble(); + this.averageDiskStats = new AverageDiskStats(in); this.timestamp = in.readLong(); } @@ -44,6 +50,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(this.nodeId); out.writeDouble(this.cpuUtilizationPercent); out.writeDouble(this.memoryUtilizationPercent); + this.averageDiskStats.writeTo(out); out.writeLong(this.timestamp); } @@ -63,6 +70,7 @@ public String toString() { nodePerformanceStatistics.nodeId, nodePerformanceStatistics.cpuUtilizationPercent, nodePerformanceStatistics.memoryUtilizationPercent, + nodePerformanceStatistics.averageDiskStats, nodePerformanceStatistics.timestamp ); } diff --git a/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java index 7b1f45514f919..84fdc1acbbcb0 100644 --- a/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java +++ b/server/src/main/java/org/opensearch/node/NodesPerformanceStats.java @@ -60,6 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws "elapsed_time", new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString() ); + perfStats.averageDiskStats.toXContent(builder, params); } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java index ebd448a2ad1a5..bf8fd3fd974d0 100644 --- a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java +++ b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.throttling.tracker.AverageDiskStats; import java.util.HashMap; import java.util.Map; @@ -43,13 +44,17 @@ void removeNode(String nodeId) { nodeIdToPerfStats.remove(nodeId); } - public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) { + public void addNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, + AverageDiskStats averageDiskStats, + long timestamp) { nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> { if (nodePerfStats == null) { - return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp); + return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, + averageDiskStats, timestamp); } else { nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent; nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent; + nodePerfStats.averageDiskStats = averageDiskStats; nodePerfStats.timestamp = timestamp; return nodePerfStats; } diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java new file mode 100644 index 0000000000000..544dd862a859c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageDiskStats.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +public class AverageDiskStats implements Writeable { + private final double readIopsAverage; + private final double writeIopsAverage; + private final double readKbAverage; + private final double writeKbAverage; + private final double readLatencyAverage; + private final double writeLatencyAverage; + private final double ioUtilizationPercent; + + public AverageDiskStats(double readIopsAverage, double writeIopsAverage, double readKbAverage, double writeKbAverage, + double readLatencyAverage, double writeLatencyAverage, double ioUtilizationPercent) { + this.readIopsAverage = readIopsAverage; + this.writeIopsAverage = writeIopsAverage; + this.readKbAverage = readKbAverage; + this.writeKbAverage = writeKbAverage; + this.readLatencyAverage = readLatencyAverage; + this.writeLatencyAverage = writeLatencyAverage; + this.ioUtilizationPercent = ioUtilizationPercent; + } + + public AverageDiskStats(StreamInput in) throws IOException { + this.readIopsAverage = in.readDouble(); + this.readKbAverage = in.readDouble(); + this.readLatencyAverage = in.readDouble(); + this.writeIopsAverage = in.readDouble(); + this.writeKbAverage = in.readDouble(); + this.writeLatencyAverage = in.readDouble(); + this.ioUtilizationPercent = in.readDouble(); + } + + public double getIoUtilizationPercent() { + return ioUtilizationPercent; + } + + public double getReadIopsAverage() { + return readIopsAverage; + } + + public double getReadKbAverage() { + return readKbAverage; + } + + public double getReadLatencyAverage() { + return readLatencyAverage; + } + + public double getWriteIopsAverage() { + return writeIopsAverage; + } + + public double getWriteKbAverage() { + return writeKbAverage; + } + + public double getWriteLatencyAverage() { + return writeLatencyAverage; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("io_stats"); + builder.field("read_iops_average", String.format(Locale.ROOT, "%.1f", readIopsAverage )); + builder.field("write_iops_average", String.format(Locale.ROOT, "%.1f", writeIopsAverage)); + builder.field("read_throughput_average", String.format(Locale.ROOT, "%.1f", readKbAverage)); + builder.field("write_throughput_average", String.format(Locale.ROOT, "%.1f", writeKbAverage)); + builder.field("read_latency_average", String.format(Locale.ROOT, "%.8f", readLatencyAverage)); + builder.field("write_latency_average", String.format(Locale.ROOT, "%.8f", writeLatencyAverage)); + builder.field("io_utilization_percent", String.format(Locale.ROOT, "%.3f", ioUtilizationPercent)); + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java index aa68b865c7964..01b9961d8ea77 100644 --- a/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageIOUsageTracker.java @@ -10,26 +10,40 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.common.util.DoubleMovingAverage; +import org.opensearch.common.util.MovingAverage; import org.opensearch.monitor.fs.FsService; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; -public class AverageIOUsageTracker extends AbstractAverageUsageTracker { +public class AverageIOUsageTracker extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(AverageCpuUsageTracker.class); - private Map previousIOTimeMap; - private FsService fsService; + private final ThreadPool threadPool; + + private IoUsageFetcher ioUsageFetcher; + private final TimeValue windowDuration; + private final TimeValue pollingInterval; + private volatile Scheduler.Cancellable scheduledFuture; + private final AtomicReference ioTimeObservations = new AtomicReference<>(); + private final AtomicReference readIopsObservations = new AtomicReference<>(); + private final AtomicReference writeIopsObservations = new AtomicReference<>(); + private final AtomicReference readKbObservations = new AtomicReference<>(); + private final AtomicReference writeKbObservations = new AtomicReference<>(); + private final AtomicReference readLatencyObservations = new AtomicReference<>(); + private final AtomicReference writeLatencyObservations = new AtomicReference<>(); + + private long runs = 1; + public AverageIOUsageTracker( ThreadPool threadPool, TimeValue pollingInterval, @@ -37,14 +51,21 @@ public AverageIOUsageTracker( ClusterSettings clusterSettings, FsService fsService ) { - super(threadPool, pollingInterval, windowDuration); + //super(threadPool, pollingInterval, windowDuration); setFsService(fsService); + this.threadPool = threadPool; + this.pollingInterval = pollingInterval; + this.windowDuration = windowDuration; + this.setWindowDuration(windowDuration); + this.ioUsageFetcher = new IoUsageFetcher(fsService); clusterSettings.addSettingsUpdateConsumer( - PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, + PerformanceTrackerSettings.GLOBAL_IO_WINDOW_DURATION_SETTING, this::setWindowDuration ); } + + public FsService getFsService() { return fsService; } @@ -53,66 +74,92 @@ public void setFsService(FsService fsService) { this.fsService = fsService; } - class DevicePreviousStats { - public long ioTime; - public double readTime; - public double writeTime; - public double readOps; - public double writeOps; - public DevicePreviousStats(long ioTime, double readTime, double writeTime, double readOps, double writeOps) { - this.ioTime = ioTime; - this.readTime = readTime; - this.writeTime = writeTime; - this.readOps = readOps; - this.writeOps = writeOps; - } + public double getIoPercentAverage() { + return ioTimeObservations.get().getAverage(); } - private long monitorIOUtilisation() { - logger.info("IO stats is triggered"); - Map currentIOTimeMap = new HashMap<>(); - for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { - logger.info("Device Id: {} , IO time : {}", devicesStat.getDeviceName(), devicesStat.getCurrentIOTime()); - logger.info("Read Latency : {} , Write latency : {} ", devicesStat.getCurrentReadLatency(), devicesStat.getCurrentWriteLatency()); - logger.info("Write time : {} , Read time : {}", devicesStat.getCurrentWriteTime(), devicesStat.getCurrentReadTime()); - logger.info("Read latency diff : {} , Write latency diff : {}", devicesStat.getReadLatency(), devicesStat.getWriteLatency()); - logger.info("Read time diff : {}, Write time diff : {}", devicesStat.getReadTime(), devicesStat.getWriteTime()); + public double getReadIopsAverage() { + return readIopsObservations.get().getAverage(); + } - logger.info("Read latency : " + devicesStat.getNewReadLatency() + " Write latency : " + devicesStat.getNewWriteLatency()); + public double getWriteIopsAverage() { + return writeIopsObservations.get().getAverage(); + } + public double getReadKbAverage() { + return readKbObservations.get().getAverage(); + } - if (previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ - long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).ioTime; - double ioUsePercent = (double) (ioSpentTime * 100) / (10 * 1000); - //ioExecutionEWMA.addValue(ioUsePercent / 100.0); + public double getWriteKbAverage() { + return writeKbObservations.get().getAverage(); + } - double readOps = devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).readOps; - double writeOps = devicesStat.currentWriteOpetations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; + public double getReadLatencyAverage() { + return readLatencyObservations.get().getAverage(); + } - double readTime = devicesStat.getCurrentReadTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).readTime; - double writeTime = devicesStat.getWriteTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeTime; + public double getWriteLatencyAverage() { + return writeLatencyObservations.get().getAverage(); + } - double readLatency = readOps / readTime; - double wrieLatency = writeOps / writeTime; + public AverageDiskStats getAverageDiskStats() { + return new AverageDiskStats(getReadIopsAverage(), getWriteIopsAverage(), getReadKbAverage(), getWriteKbAverage(), + getReadLatencyAverage(), getWriteLatencyAverage(), getIoPercentAverage()); + } - logger.info("read ops : {} , writeops : {} , readtime: {} , writetime: {}", readOps, writeOps, readTime, writeTime); - logger.info("Read latency final : " + readLatency + "write latency final : " + wrieLatency); + public void setWindowDuration(TimeValue windowDuration) { + int windowSize = (int) (windowDuration.nanos() / pollingInterval.nanos()); + logger.debug("updated window size: {}", windowSize); + ioTimeObservations.set(new MovingAverage(windowSize)); + readIopsObservations.set(new MovingAverage(windowSize)); + writeIopsObservations.set(new MovingAverage(windowSize)); + readKbObservations.set(new MovingAverage(windowSize)); + writeKbObservations.set(new MovingAverage(windowSize)); + readLatencyObservations.set(new DoubleMovingAverage(windowSize)); + writeLatencyObservations.set(new DoubleMovingAverage(windowSize)); + } - } + private void recordUsage(IoUsageFetcher.DiskStats usage) { + if(usage.getIoTime() == 0.0) { + runs++; + return; + } else { + runs = 1; + } + ioTimeObservations.get().record(usage.getIoTime()); + readIopsObservations.get().record((long)usage.getReadOps()); + readKbObservations.get().record(usage.getReadkb()); + double readOps = usage.getReadOps() < 1 ? 1.0 : usage.getReadOps() * 1.0; + double writeOps = usage.getWriteOps() < 1 ? 1.0 : usage.getWriteOps() * 1.0; + double readTime = usage.getReadTime() < 1 ? 0.0 : usage.getReadTime(); + double writeTime = usage.getWriteTime() < 1 ? 0.0 : usage.getWriteTime(); + double readLatency = (readTime / readOps) * runs; + double writeLatency = (writeTime/ writeOps) * runs; + writeLatencyObservations.get().record(writeLatency); + readLatencyObservations.get().record(readLatency); + writeKbObservations.get().record(usage.getWritekb()); + writeIopsObservations.get().record((long) usage.getWriteOps()); + } - DevicePreviousStats ps = new DevicePreviousStats(devicesStat.getCurrentIOTime(), devicesStat.getCurrentReadTime(), - devicesStat.getCurrentWriteTime(), devicesStat.currentReadOperations(), devicesStat.currentWriteOpetations()); + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + IoUsageFetcher.DiskStats usage = getUsage(); + recordUsage(usage); + }, pollingInterval, ThreadPool.Names.GENERIC); + } - currentIOTimeMap.put(devicesStat.getDeviceName(), ps); - return devicesStat.readOperations() + devicesStat.writeOperations(); + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); } - previousIOTimeMap = currentIOTimeMap; - return 0; - } @Override - public long getUsage() { - return monitorIOUtilisation(); + protected void doClose() throws IOException {} + + public IoUsageFetcher.DiskStats getUsage() { + return ioUsageFetcher.getDiskUtilizationStats(); } } diff --git a/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java b/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java new file mode 100644 index 0000000000000..7414ca12892cd --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/IoUsageFetcher.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsService; + +import java.util.HashMap; +import java.util.Map; + +public class IoUsageFetcher { + private static final Logger logger = LogManager.getLogger(AverageCpuUsageTracker.class); + private Map previousIOTimeMap; + private FsService fsService; + public IoUsageFetcher(FsService fsService){ + this.fsService = fsService; + } + + class DiskStats { + public long ioTime; + public double readTime; + public double writeTime; + public double readOps; + public double writeOps; + public long readkb; + public long writekb; + public DiskStats(long ioTime, double readTime, double writeTime, double readOps, double writeOps, long readkb, long writekb) { + this.ioTime = ioTime; + this.readTime = readTime; + this.writeTime = writeTime; + this.readOps = readOps; + this.writeOps = writeOps; + this.readkb = readkb; + this.writekb = writekb; + } + + public long getIoTime() { + return ioTime; + } + + public double getReadOps() { + return readOps; + } + + public double getReadTime() { + return readTime; + } + + public long getReadkb() { + return readkb; + } + + public double getWriteOps() { + return writeOps; + } + + public double getWriteTime() { + return writeTime; + } + + public long getWritekb() { + return writekb; + } + } + public DiskStats getDiskUtilizationStats() { + Map currentIOTimeMap = new HashMap<>(); + long ioUsePercent = 0; + long readkb = 0; + long writekb = 0; + double readTime = 0; + double writeTime = 0; + double readLatency = 0.0; + double writeLatency = 0.0; + double readOps = 0.0; + double writeOps = 0.0; + for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { + if (previousIOTimeMap != null && previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ + logger.info(this.fsService.stats().getTimestamp()); + long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).ioTime; + ioUsePercent = (ioSpentTime * 100) / (1000); + readOps += devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).readOps; + writeOps += devicesStat.currentWriteOpetations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; + readkb += devicesStat.readKilobytes() - previousIOTimeMap.get(devicesStat.getDeviceName()).readkb; + writekb += devicesStat.writeKilobytes() - previousIOTimeMap.get(devicesStat.getDeviceName()).writekb; + readTime += devicesStat.getCurrentReadTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).readTime; + writeTime += devicesStat.getCurrentWriteTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeTime; + if(readTime < 1) readTime = 1; + if(readOps < 1) readOps = 1; + if(writeOps < 1) writeOps = 1; + if(writeTime < 1) writeTime = 1; + readLatency += (readTime / readOps); + writeLatency += (writeTime / writeOps); + } + DiskStats ps = new DiskStats(devicesStat.getCurrentIOTime(), devicesStat.getCurrentReadTime(), + devicesStat.getCurrentWriteTime(), devicesStat.currentReadOperations(), devicesStat.currentWriteOpetations(), + devicesStat.readKilobytes(), devicesStat.writeKilobytes()); + currentIOTimeMap.put(devicesStat.getDeviceName(), ps); + } + logger.info("Read in MB : {} , Write in MB : {}", readkb/1000, writekb/1000); + readLatency += (readOps / readTime) * 100; + writeLatency += (writeOps / writeTime) * 100; + logger.info("read ops : {} , writeops : {} , readtime: {} , writetime: {}", readOps, writeOps, readTime, writeTime); + logger.info("Read latency : {} write latency : {}" , readLatency, writeLatency); + logger.info("IO use percent : {}", ioUsePercent); + previousIOTimeMap = currentIOTimeMap; + + return new DiskStats(ioUsePercent, readTime, writeTime, readOps, writeOps, readkb, writekb); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java index 6fcf668818968..641f18ca6774a 100644 --- a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java +++ b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java @@ -27,6 +27,8 @@ public class NodePerformanceTracker extends AbstractLifecycleComponent { private double cpuUtilizationPercent; private double memoryUtilizationPercent; + + private AverageDiskStats averageDiskStats; private ThreadPool threadPool; private volatile Scheduler.Cancellable scheduledFuture; private final ClusterSettings clusterSettings; @@ -68,8 +70,16 @@ private double getAverageMemoryUsed() { return memoryUsageTracker.getAverage(); } - private double getAverageIOUsed() { - return ioUsageTracker.getAverage(); + private AverageDiskStats getAverageIOUsed() { + return ioUsageTracker.getAverageDiskStats(); + } + + private void setAverageDiskStats(AverageDiskStats averageDiskStats) { + this.averageDiskStats = averageDiskStats; + } + + private AverageDiskStats getAverageDiskStats() { + return averageDiskStats; } private void setCpuUtilizationPercent(double cpuUtilizationPercent) { @@ -90,11 +100,13 @@ public double getMemoryUtilizationPercent() { void doRun() { setCpuUtilizationPercent(getAverageCpuUsed()); - setMemoryUtilizationPercent(getAverageIOUsed()); + setMemoryUtilizationPercent(getAverageMemoryUsed()); + setAverageDiskStats(getAverageIOUsed()); performanceCollectorService.addNodePerfStatistics( LOCAL_NODE, getCpuUtilizationPercent(), getMemoryUtilizationPercent(), + getAverageDiskStats(), System.currentTimeMillis() ); } diff --git a/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java b/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java index cce796fd86b96..3c41a961ceaa5 100644 --- a/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java +++ b/server/src/main/java/org/opensearch/throttling/tracker/PerformanceTrackerSettings.java @@ -23,7 +23,7 @@ private static class Defaults { private static final long WINDOW_DURATION = 30; private static final long REFRESH_INTERVAL = 1000; - private static final long IO_POLLING_INTERVAL = 1000; + private static final long IO_POLLING_INTERVAL = 30000; private static final long IO_WINDOW_DURATION = 60; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 06a83e6ffd984..41fd46bfb5c8b 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.mockito.Mockito; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.cluster.coordination.PendingClusterStateStats; @@ -66,6 +67,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPoolStats; +import org.opensearch.throttling.tracker.AverageDiskStats; import org.opensearch.transport.TransportStats; import java.io.IOException; @@ -786,6 +788,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { nodeId, randomDoubleBetween(1.0, 100.0, true), randomDoubleBetween(1.0, 100.0, true), + Mockito.mock(AverageDiskStats.class), System.currentTimeMillis() ); nodePerfStats.put(nodeId, stats);