diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxDiskIOMetricsGenerator.java b/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxDiskIOMetricsGenerator.java index e509888..ac70fd6 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxDiskIOMetricsGenerator.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxDiskIOMetricsGenerator.java @@ -10,10 +10,11 @@ import java.util.Map; import org.opensearch.performanceanalyzer.commons.metrics_generator.DiskIOMetricsGenerator; import org.opensearch.performanceanalyzer.commons.os.ThreadDiskIO; +import org.opensearch.performanceanalyzer.commons.os.metrics.IOMetrics; public class LinuxDiskIOMetricsGenerator implements DiskIOMetricsGenerator { - private Map diskIOMetricsMap; + private Map diskIOMetricsMap; public LinuxDiskIOMetricsGenerator() { diskIOMetricsMap = new HashMap<>(); @@ -66,7 +67,7 @@ public void addSample() { ThreadDiskIO.addSample(); } - public void setDiskIOMetrics(final String threadId, final ThreadDiskIO.IOMetrics ioMetrics) { + public void setDiskIOMetrics(final String threadId, final IOMetrics ioMetrics) { diskIOMetricsMap.put(threadId, ioMetrics); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxSchedMetricsGenerator.java b/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxSchedMetricsGenerator.java index 2704274..0962b78 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxSchedMetricsGenerator.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/metrics_generator/linux/LinuxSchedMetricsGenerator.java @@ -10,10 +10,11 @@ import java.util.Map; import org.opensearch.performanceanalyzer.commons.metrics_generator.SchedMetricsGenerator; import org.opensearch.performanceanalyzer.commons.os.ThreadSched; +import org.opensearch.performanceanalyzer.commons.os.metrics.SchedMetrics; public class LinuxSchedMetricsGenerator implements SchedMetricsGenerator { - private final Map schedMetricsMap; + private final Map schedMetricsMap; public LinuxSchedMetricsGenerator() { schedMetricsMap = new HashMap<>(); @@ -50,7 +51,7 @@ public void addSample() { ThreadSched.INSTANCE.addSample(); } - public void setSchedMetric(final String threadId, final ThreadSched.SchedMetrics schedMetrics) { + public void setSchedMetric(final String threadId, final SchedMetrics schedMetrics) { schedMetricsMap.put(threadId, schedMetrics); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/observer/ResourceObserver.java b/src/main/java/org/opensearch/performanceanalyzer/commons/observer/ResourceObserver.java new file mode 100644 index 0000000..358a366 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/observer/ResourceObserver.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.observer; + + +import java.util.Map; + +/** + * Observers resources consumption + * + * @param Type of the metric + */ +public interface ResourceObserver { + + /** + * Retrieves the metrics for the given thread + * + * @param threadId + * @return + */ + T observe(String threadId); + + Map observe(); +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/OSGlobals.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/OSGlobals.java index e977306..07c2e4c 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/os/OSGlobals.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/OSGlobals.java @@ -83,7 +83,7 @@ private static void enumTids() { } } - static synchronized List getTids() { + public static synchronized List getTids() { long curtime = System.currentTimeMillis(); if (curtime - lastUpdated > REFRESH_INTERVAL_MS) { enumTids(); diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadCPU.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadCPU.java index d5ddefc..0e11677 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadCPU.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadCPU.java @@ -5,22 +5,21 @@ package org.opensearch.performanceanalyzer.commons.os; +import static org.opensearch.performanceanalyzer.commons.os.metrics.CPUMetricsCalculator.calculateCPUUtilization; +import static org.opensearch.performanceanalyzer.commons.os.metrics.CPUMetricsCalculator.calculateMajorFault; +import static org.opensearch.performanceanalyzer.commons.os.metrics.CPUMetricsCalculator.calculateMinorFault; +import static org.opensearch.performanceanalyzer.commons.os.metrics.CPUMetricsCalculator.getResidentSetSize; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.opensearch.performanceanalyzer.commons.metrics_generator.linux.LinuxCPUPagingActivityGenerator; +import org.opensearch.performanceanalyzer.commons.observer.ResourceObserver; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.CPUObserver; public final class ThreadCPU { - private static final Logger LOGGER = LogManager.getLogger(ThreadCPU.class); + public static final ThreadCPU INSTANCE = new ThreadCPU(); - private long scClkTck = 0; - private String pid = null; - private List tids = null; + private static final ResourceObserver cpuObserver = new CPUObserver(); private Map> tidKVMap = new HashMap<>(); private Map> oldtidKVMap = new HashMap<>(); private long kvTimestamp = 0; @@ -28,137 +27,15 @@ public final class ThreadCPU { private LinuxCPUPagingActivityGenerator cpuPagingActivityMap = new LinuxCPUPagingActivityGenerator(); - // these two arrays map 1-1 - private static String[] statKeys = { - "pid", - "comm", - "state", - "ppid", - "pgrp", - "session", - "ttynr", - "tpgid", - "flags", - "minflt", - "cminflt", - "majflt", - "cmajflt", - "utime", - "stime", - "cutime", - "cstime", - "prio", - "nice", - "nthreads", - "itrealvalue", - "starttime", - "vsize", - "rss", - "rsslim", - "startcode", - "endcode", - "startstack", - "kstkesp", - "kstkeip", - "signal", - "blocked", - "sigignore", - "sigcatch", - "wchan", - "nswap", - "cnswap", - "exitsig", - "cpu", - "rtprio", - "schedpolicy", - "bio_ticks", - "vmtime", - "cvmtime" - // more that we ignore - }; - - private static SchemaFileParser.FieldTypes[] statTypes = { - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.STRING, - SchemaFileParser.FieldTypes.CHAR, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.ULONG, // 10 - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, // 20 - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, // 30 - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, // 40 - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT, - SchemaFileParser.FieldTypes.INT - }; - - private ThreadCPU() { - try { - pid = OSGlobals.getPid(); - scClkTck = OSGlobals.getScClkTck(); - tids = OSGlobals.getTids(); - } catch (Exception e) { - LOGGER.error( - (Supplier) - () -> - new ParameterizedMessage( - "Error In Initializing ThreadCPU: {}", e.toString()), - e); - } - } - public synchronized void addSample() { - tids = OSGlobals.getTids(); - oldtidKVMap.clear(); oldtidKVMap.putAll(tidKVMap); tidKVMap.clear(); oldkvTimestamp = kvTimestamp; kvTimestamp = System.currentTimeMillis(); - for (String tid : tids) { - Map sample = - // (new SchemaFileParser("/proc/"+tid+"/stat", - (new SchemaFileParser( - "/proc/" + pid + "/task/" + tid + "/stat", - statKeys, - statTypes, - true)) - .parse(); - tidKVMap.put(tid, sample); - } + // Retrieve the cpu metrics for all threads + tidKVMap.putAll(cpuObserver.observe()); calculateCPUDetails(); calculatePagingActivity(); @@ -172,18 +49,8 @@ private void calculateCPUDetails() { for (Map.Entry> entry : tidKVMap.entrySet()) { Map v = entry.getValue(); Map oldv = oldtidKVMap.get(entry.getKey()); - if (v != null && oldv != null) { - if (!v.containsKey("utime") || !oldv.containsKey("utime")) { - continue; - } - long diff = - ((long) (v.getOrDefault("utime", 0L)) - - (long) (oldv.getOrDefault("utime", 0L))) - + ((long) (v.getOrDefault("stime", 0L)) - - (long) (oldv.getOrDefault("stime", 0L))); - double util = (1.0e3 * diff / scClkTck) / (kvTimestamp - oldkvTimestamp); - cpuPagingActivityMap.setCPUUtilization(entry.getKey(), util); - } + double util = calculateCPUUtilization(kvTimestamp, oldkvTimestamp, v, oldv); + cpuPagingActivityMap.setCPUUtilization(entry.getKey(), util); } } @@ -196,22 +63,12 @@ private void calculatePagingActivity() { for (Map.Entry> entry : tidKVMap.entrySet()) { Map v = entry.getValue(); Map oldv = oldtidKVMap.get(entry.getKey()); - if (v != null && oldv != null) { - if (!v.containsKey("majflt") || !oldv.containsKey("majflt")) { - continue; - } - double majdiff = - ((long) (v.getOrDefault("majflt", 0L)) - - (long) (oldv.getOrDefault("majflt", 0L))); - majdiff /= 1.0e-3 * (kvTimestamp - oldkvTimestamp); - double mindiff = - ((long) (v.getOrDefault("minflt", 0L)) - - (long) (oldv.getOrDefault("minflt", 0L))); - mindiff /= 1.0e-3 * (kvTimestamp - oldkvTimestamp); - Double[] fltarr = {majdiff, mindiff, (double) ((long) v.getOrDefault("rss", 0L))}; - cpuPagingActivityMap.setPagingActivities(entry.getKey(), fltarr); - } + double majdiff = calculateMajorFault(kvTimestamp, oldkvTimestamp, v, oldv); + double mindiff = calculateMinorFault(kvTimestamp, oldkvTimestamp, v, oldv); + double rss = getResidentSetSize(v); + Double[] fltarr = {majdiff, mindiff, rss}; + cpuPagingActivityMap.setPagingActivities(entry.getKey(), fltarr); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadDiskIO.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadDiskIO.java index 7cd23eb..a042a6e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadDiskIO.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadDiskIO.java @@ -6,124 +6,31 @@ package org.opensearch.performanceanalyzer.commons.os; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics_generator.linux.LinuxDiskIOMetricsGenerator; -import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.observer.ResourceObserver; +import org.opensearch.performanceanalyzer.commons.os.metrics.DiskIOMetricsCalculator; +import org.opensearch.performanceanalyzer.commons.os.metrics.IOMetrics; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.IOObserver; public class ThreadDiskIO { - private static String pid = OSGlobals.getPid(); - private static List tids = null; - private static final Logger LOGGER = LogManager.getLogger(ThreadDiskIO.class); + private static final ResourceObserver ioObserver = new IOObserver(); private static Map> tidKVMap = new HashMap<>(); private static Map> oldtidKVMap = new HashMap<>(); private static long kvTimestamp = 0; private static long oldkvTimestamp = 0; - public static class IOMetrics { - public double avgReadThroughputBps; - public double avgWriteThroughputBps; - public double avgTotalThroughputBps; - - public double avgReadSyscallRate; - public double avgWriteSyscallRate; - public double avgTotalSyscallRate; - - public double avgPageCacheReadThroughputBps; - public double avgPageCacheWriteThroughputBps; - public double avgPageCacheTotalThroughputBps; - - @SuppressWarnings("checkstyle:parameternumber") - IOMetrics( - double avgReadThroughputBps, - double avgReadSyscallRate, - double avgWriteThroughputBps, - double avgWriteSyscallRate, - double avgTotalThroughputBps, - double avgTotalSyscallRate, - double avgPageCacheReadThroughputBps, - double avgPageCacheWriteThroughputBps, - double avgPageCacheTotalThroughputBps) { - this.avgReadThroughputBps = avgReadThroughputBps; - this.avgWriteThroughputBps = avgWriteThroughputBps; - this.avgTotalThroughputBps = avgTotalThroughputBps; - this.avgReadSyscallRate = avgReadSyscallRate; - this.avgWriteSyscallRate = avgWriteSyscallRate; - this.avgTotalSyscallRate = avgTotalSyscallRate; - this.avgPageCacheReadThroughputBps = avgPageCacheReadThroughputBps; - this.avgPageCacheWriteThroughputBps = avgPageCacheWriteThroughputBps; - this.avgPageCacheTotalThroughputBps = avgPageCacheTotalThroughputBps; - } - - public String toString() { - return new StringBuilder() - .append("rBps:") - .append(avgReadThroughputBps) - .append(" wBps:") - .append(avgWriteThroughputBps) - .append(" totBps:") - .append(avgTotalThroughputBps) - .append(" rSysc:") - .append(avgReadSyscallRate) - .append(" wSysc:") - .append(avgWriteSyscallRate) - .append(" totSysc:") - .append(avgTotalSyscallRate) - .append(" rPcBps:") - .append(avgPageCacheReadThroughputBps) - .append(" wPcBps:") - .append(avgPageCacheWriteThroughputBps) - .append(" totPcBps:") - .append(avgPageCacheTotalThroughputBps) - .toString(); - } - } - - private static void addSampleTid(String tid) { - try (FileReader fileReader = - new FileReader(new File("/proc/" + pid + "/task/" + tid + "/io")); - BufferedReader bufferedReader = new BufferedReader(fileReader); ) { - String line = null; - Map kvmap = new HashMap<>(); - while ((line = bufferedReader.readLine()) != null) { - String[] toks = line.split("[: ]+"); - String key = toks[0]; - long val = Long.parseLong(toks[1]); - kvmap.put(key, val); - } - tidKVMap.put(tid, kvmap); - } catch (FileNotFoundException e) { - LOGGER.debug("FileNotFound in parse with exception: {}", () -> e.toString()); - } catch (Exception e) { - LOGGER.debug( - "Error In addSample Tid for: {} with error: {} with ExceptionCode: {}", - () -> tid, - () -> e.toString(), - () -> StatExceptionCode.THREAD_IO_ERROR.toString()); - StatsCollector.instance().logException(StatExceptionCode.THREAD_IO_ERROR); - } - } - public static synchronized void addSample() { - tids = OSGlobals.getTids(); oldtidKVMap.clear(); oldtidKVMap.putAll(tidKVMap); tidKVMap.clear(); oldkvTimestamp = kvTimestamp; kvTimestamp = System.currentTimeMillis(); - for (String tid : tids) { - addSampleTid(tid); - } + // Retrieve the disk io metrics for all threads + tidKVMap.putAll(ioObserver.observe()); } public static synchronized LinuxDiskIOMetricsGenerator getIOUtilization() { @@ -136,33 +43,12 @@ public static synchronized LinuxDiskIOMetricsGenerator getIOUtilization() { for (Map.Entry> entry : tidKVMap.entrySet()) { Map v = entry.getValue(); Map oldv = oldtidKVMap.get(entry.getKey()); - if (v != null && oldv != null) { - double duration = 1.0e-3 * (kvTimestamp - oldkvTimestamp); - double readBytes = v.get("read_bytes") - oldv.get("read_bytes"); - double writeBytes = v.get("write_bytes") - oldv.get("write_bytes"); - double readSyscalls = v.get("syscr") - oldv.get("syscr"); - double writeSyscalls = v.get("syscw") - oldv.get("syscw"); - double readPcBytes = v.get("rchar") - oldv.get("rchar") - readBytes; - double writePcBytes = v.get("wchar") - oldv.get("wchar") - writeBytes; - readBytes /= duration; - readSyscalls /= duration; - writeBytes /= duration; - writeSyscalls /= duration; - readPcBytes /= duration; - writePcBytes /= duration; + IOMetrics ioMetrics = + DiskIOMetricsCalculator.calculateIOMetrics( + kvTimestamp, oldkvTimestamp, v, oldv); - linuxDiskIOMetricsHandler.setDiskIOMetrics( - entry.getKey(), - new IOMetrics( - readBytes, - readSyscalls, - writeBytes, - writeSyscalls, - readBytes + writeBytes, - readSyscalls + writeSyscalls, - readPcBytes, - writePcBytes, - readPcBytes + writePcBytes)); + if (ioMetrics != null) { + linuxDiskIOMetricsHandler.setDiskIOMetrics(entry.getKey(), ioMetrics); } } return linuxDiskIOMetricsHandler; diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadSched.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadSched.java index 55af97c..2b3707b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadSched.java +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/ThreadSched.java @@ -7,91 +7,34 @@ import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.opensearch.performanceanalyzer.commons.metrics_generator.SchedMetricsGenerator; import org.opensearch.performanceanalyzer.commons.metrics_generator.linux.LinuxSchedMetricsGenerator; +import org.opensearch.performanceanalyzer.commons.observer.ResourceObserver; +import org.opensearch.performanceanalyzer.commons.os.metrics.SchedMetrics; +import org.opensearch.performanceanalyzer.commons.os.metrics.SchedMetricsCalculator; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.SchedObserver; public final class ThreadSched { - private static final Logger LOGGER = LogManager.getLogger(ThreadSched.class); + public static final ThreadSched INSTANCE = new ThreadSched(); - private String pid = null; - private List tids = null; + private static final ResourceObserver schedObserver = new SchedObserver(); private Map> tidKVMap = new HashMap<>(); private Map> oldtidKVMap = new HashMap<>(); private long kvTimestamp = 0; private long oldkvTimestamp = 0; - public static class SchedMetrics { - public final double avgRuntime; - public final double avgWaittime; - public final double contextSwitchRate; // both voluntary and involuntary - - SchedMetrics(double avgRuntime, double avgWaittime, double contextSwitchRate) { - this.avgRuntime = avgRuntime; - this.avgWaittime = avgWaittime; - this.contextSwitchRate = contextSwitchRate; - } - - @Override - public String toString() { - return new StringBuilder() - .append("avgruntime: ") - .append(avgRuntime) - .append(" avgwaittime: ") - .append(avgWaittime) - .append(" ctxrate: ") - .append(contextSwitchRate) - .toString(); - } - } - private LinuxSchedMetricsGenerator schedLatencyMap = new LinuxSchedMetricsGenerator(); - private static String[] schedKeys = {"runticks", "waitticks", "totctxsws"}; - - private static SchemaFileParser.FieldTypes[] schedTypes = { - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG, - SchemaFileParser.FieldTypes.ULONG - }; - - private ThreadSched() { - try { - pid = OSGlobals.getPid(); - tids = OSGlobals.getTids(); - } catch (Exception e) { - LOGGER.error( - (Supplier) - () -> - new ParameterizedMessage( - "Error In Initializing ThreadCPU: {}", e.toString()), - e); - } - } - public synchronized void addSample() { - tids = OSGlobals.getTids(); - oldtidKVMap.clear(); oldtidKVMap.putAll(tidKVMap); tidKVMap.clear(); oldkvTimestamp = kvTimestamp; kvTimestamp = System.currentTimeMillis(); - for (String tid : tids) { - Map sample = - (new SchemaFileParser( - "/proc/" + pid + "/task/" + tid + "/schedstat", - schedKeys, - schedTypes)) - .parse(); - tidKVMap.put(tid, sample); - } + // Retrieve the sched metrics for all threads + tidKVMap.putAll(schedObserver.observe()); calculateSchedLatency(); } @@ -104,34 +47,11 @@ private void calculateSchedLatency() { for (Map.Entry> entry : tidKVMap.entrySet()) { Map v = entry.getValue(); Map oldv = oldtidKVMap.get(entry.getKey()); - if (v != null && oldv != null) { - if (!v.containsKey("totctxsws") || !oldv.containsKey("totctxsws")) { - continue; - } - long ctxdiff = - (long) v.getOrDefault("totctxsws", 0L) - - (long) oldv.getOrDefault("totctxsws", 0L); - double avgRuntime = - 1.0e-9 - * ((long) v.getOrDefault("runticks", 0L) - - (long) oldv.getOrDefault("runticks", 0L)); - double avgWaittime = - 1.0e-9 - * ((long) v.getOrDefault("waitticks", 0L) - - (long) oldv.getOrDefault("waitticks", 0L)); - if (ctxdiff == 0) { - avgRuntime = 0; - avgWaittime = 0; - } else { - avgRuntime /= 1.0 * ctxdiff; - avgWaittime /= 1.0 * ctxdiff; - } - double contextSwitchRate = ctxdiff; - contextSwitchRate /= 1.0e-3 * (kvTimestamp - oldkvTimestamp); - - schedLatencyMap.setSchedMetric( - entry.getKey(), - new SchedMetrics(avgRuntime, avgWaittime, contextSwitchRate)); + SchedMetrics schedMetrics = + SchedMetricsCalculator.calculateThreadSchedLatency( + kvTimestamp, oldkvTimestamp, v, oldv); + if (schedMetrics != null) { + schedLatencyMap.setSchedMetric(entry.getKey(), schedMetrics); } } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetrics.java new file mode 100644 index 0000000..9b099d4 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetrics.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + + +import org.opensearch.performanceanalyzer.commons.os.observer.impl.CPUObserver.StatKeys; + +public class CPUMetrics { + public double cpuUtilization; + public double majorFault; + public double minorFault; + public double residentSetSize; + + public CPUMetrics( + double cpuUtilization, double majorFault, double minorFault, double residentSetSize) { + this.cpuUtilization = cpuUtilization; + this.majorFault = majorFault; + this.minorFault = minorFault; + this.residentSetSize = residentSetSize; + } + + public String toString() { + return new StringBuilder() + .append(StatKeys.CPU.getLabel() + ":") + .append(cpuUtilization) + .append(" " + StatKeys.MAJFLT.getLabel() + ":") + .append(majorFault) + .append(" " + StatKeys.MINFLT.getLabel() + ":") + .append(minorFault) + .append(" " + StatKeys.RSS + ":") + .append(residentSetSize) + .toString(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetricsCalculator.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetricsCalculator.java new file mode 100644 index 0000000..3c35bb3 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/CPUMetricsCalculator.java @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + + +import java.util.Map; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.CPUObserver.StatKeys; + +/** + * Calculates the cpu and paging activity for the thread considering the beginning and end + * measurements + */ +public final class CPUMetricsCalculator { + /** + * Calculates CPU related metrics - cpu utilization + paging activity based on the provided + * metrics map + * + * @param endMeasurementTime + * @param startMeasurementTime + * @param endTimeResourceMetrics + * @param startTimeResourceMetrics + * @return + */ + public static CPUMetrics calculateThreadCpuPagingActivity( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + if (startMeasurementTime == endMeasurementTime) { + return null; + } + + if (endTimeResourceMetrics == null || startTimeResourceMetrics == null) { + return null; + } + + double majorFault = + calculateMajorFault( + endMeasurementTime, + startMeasurementTime, + endTimeResourceMetrics, + startTimeResourceMetrics); + double minorFault = + calculateMinorFault( + endMeasurementTime, + startMeasurementTime, + endTimeResourceMetrics, + startTimeResourceMetrics); + double rss = getResidentSetSize(endTimeResourceMetrics); + double cpuUtilization = + calculateCPUUtilization( + endMeasurementTime, + startMeasurementTime, + endTimeResourceMetrics, + startTimeResourceMetrics); + + return new CPUMetrics(cpuUtilization, majorFault, minorFault, rss); + } + + public static double calculateCPUUtilization( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + if (endMeasurementTime == startMeasurementTime) { + return 0D; + } + if (endTimeResourceMetrics == null || startTimeResourceMetrics == null) { + return 0D; + } + return calculateCPUUtilization( + endMeasurementTime, + startMeasurementTime, + (long) endTimeResourceMetrics.getOrDefault(StatKeys.UTIME.getLabel(), 0L), + (long) startTimeResourceMetrics.getOrDefault(StatKeys.UTIME.getLabel(), 0L), + (long) endTimeResourceMetrics.getOrDefault(StatKeys.STIME.getLabel(), 0L), + (long) startTimeResourceMetrics.getOrDefault(StatKeys.STIME.getLabel(), 0L)); + } + + public static double calculateMajorFault( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + if (endTimeResourceMetrics == null || startTimeResourceMetrics == null) { + return 0d; + } + + return calculateFault( + endMeasurementTime, + startMeasurementTime, + (long) (endTimeResourceMetrics.getOrDefault(StatKeys.MAJFLT.getLabel(), 0L)), + (long) (startTimeResourceMetrics.getOrDefault(StatKeys.MAJFLT.getLabel(), 0L))); + } + + public static double calculateMinorFault( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + if (endTimeResourceMetrics == null || startTimeResourceMetrics == null) { + return 0d; + } + + return calculateFault( + endMeasurementTime, + startMeasurementTime, + (long) (endTimeResourceMetrics.getOrDefault(StatKeys.MINFLT.getLabel(), 0L)), + (long) (startTimeResourceMetrics.getOrDefault(StatKeys.MINFLT.getLabel(), 0L))); + } + + public static double getResidentSetSize(Map v) { + return (double) ((long) v.getOrDefault(StatKeys.RSS.getLabel(), 0L)); + } + + /** + * Calculates the CPU utilization based on the given parameters + * + * @param endMeasurementTime End time of the measurement + * @param startMeasurementTime Start time of the measurement + * @param endUTime utime metric value at the end of the measurement + * @param startUtime utime metric value at the beginning of the measurement + * @param endSTime stime metric value at the end of the measurement + * @param startSTime stime metric value at the beginning the measurement + * @return cpu utilization + */ + public static double calculateCPUUtilization( + long endMeasurementTime, + long startMeasurementTime, + long endUTime, + long startUtime, + long endSTime, + long startSTime) { + long scClckTck = OSGlobals.getScClkTck(); + long diff = endUTime - startUtime + endSTime - startSTime; + return (1.0e3 * diff / scClckTck) / (endMeasurementTime - startMeasurementTime); + } + + private static double calculateFault( + long endMeasurementTime, + long startMeasurementTime, + long endMajorFault, + long startMajorFault) { + double majdiff = endMajorFault - startMajorFault; + majdiff /= 1.0e-3 * (endMeasurementTime - startMeasurementTime); + return majdiff; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/DiskIOMetricsCalculator.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/DiskIOMetricsCalculator.java new file mode 100644 index 0000000..580c085 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/DiskIOMetricsCalculator.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + + +import java.util.Map; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.IOObserver.StatKeys; + +/** Calculates the disk io metrics for the threads considering the beginning and end measurements */ +public final class DiskIOMetricsCalculator { + public static IOMetrics calculateIOMetrics( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + if (startMeasurementTime == endMeasurementTime) { + return null; + } + + if (endTimeResourceMetrics != null && startTimeResourceMetrics != null) { + double duration = 1.0e-3 * (endMeasurementTime - startMeasurementTime); + double readBytes = + endTimeResourceMetrics.get(StatKeys.READ_BYTES.getLabel()) + - startTimeResourceMetrics.get(StatKeys.READ_BYTES.getLabel()); + double writeBytes = + endTimeResourceMetrics.get(StatKeys.WRITE_BYTES.getLabel()) + - startTimeResourceMetrics.get(StatKeys.WRITE_BYTES.getLabel()); + double readSyscalls = + endTimeResourceMetrics.get(StatKeys.SYSCR.getLabel()) + - startTimeResourceMetrics.get(StatKeys.SYSCR.getLabel()); + double writeSyscalls = + endTimeResourceMetrics.get(StatKeys.SYSCW.getLabel()) + - startTimeResourceMetrics.get(StatKeys.SYSCW.getLabel()); + double readPcBytes = + endTimeResourceMetrics.get(StatKeys.RCHAR.getLabel()) + - startTimeResourceMetrics.get(StatKeys.RCHAR.getLabel()) + - readBytes; + double writePcBytes = + endTimeResourceMetrics.get(StatKeys.WCHAR.getLabel()) + - startTimeResourceMetrics.get(StatKeys.WCHAR.getLabel()) + - writeBytes; + readBytes /= duration; + readSyscalls /= duration; + writeBytes /= duration; + writeSyscalls /= duration; + readPcBytes /= duration; + writePcBytes /= duration; + + return new IOMetrics( + readBytes, + readSyscalls, + writeBytes, + writeSyscalls, + readBytes + writeBytes, + readSyscalls + writeSyscalls, + readPcBytes, + writePcBytes, + readPcBytes + writePcBytes); + } + return null; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/IOMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/IOMetrics.java new file mode 100644 index 0000000..1b957aa --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/IOMetrics.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + +public class IOMetrics { + public double avgReadThroughputBps; + public double avgWriteThroughputBps; + public double avgTotalThroughputBps; + + public double avgReadSyscallRate; + public double avgWriteSyscallRate; + public double avgTotalSyscallRate; + + public double avgPageCacheReadThroughputBps; + public double avgPageCacheWriteThroughputBps; + public double avgPageCacheTotalThroughputBps; + + @SuppressWarnings("checkstyle:parameternumber") + public IOMetrics( + double avgReadThroughputBps, + double avgReadSyscallRate, + double avgWriteThroughputBps, + double avgWriteSyscallRate, + double avgTotalThroughputBps, + double avgTotalSyscallRate, + double avgPageCacheReadThroughputBps, + double avgPageCacheWriteThroughputBps, + double avgPageCacheTotalThroughputBps) { + this.avgReadThroughputBps = avgReadThroughputBps; + this.avgWriteThroughputBps = avgWriteThroughputBps; + this.avgTotalThroughputBps = avgTotalThroughputBps; + this.avgReadSyscallRate = avgReadSyscallRate; + this.avgWriteSyscallRate = avgWriteSyscallRate; + this.avgTotalSyscallRate = avgTotalSyscallRate; + this.avgPageCacheReadThroughputBps = avgPageCacheReadThroughputBps; + this.avgPageCacheWriteThroughputBps = avgPageCacheWriteThroughputBps; + this.avgPageCacheTotalThroughputBps = avgPageCacheTotalThroughputBps; + } + + public String toString() { + return new StringBuilder() + .append("rBps:") + .append(avgReadThroughputBps) + .append(" wBps:") + .append(avgWriteThroughputBps) + .append(" totBps:") + .append(avgTotalThroughputBps) + .append(" rSysc:") + .append(avgReadSyscallRate) + .append(" wSysc:") + .append(avgWriteSyscallRate) + .append(" totSysc:") + .append(avgTotalSyscallRate) + .append(" rPcBps:") + .append(avgPageCacheReadThroughputBps) + .append(" wPcBps:") + .append(avgPageCacheWriteThroughputBps) + .append(" totPcBps:") + .append(avgPageCacheTotalThroughputBps) + .toString(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetrics.java new file mode 100644 index 0000000..313984e --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetrics.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + +public class SchedMetrics { + public final double avgRuntime; + public final double avgWaittime; + public final double contextSwitchRate; // both voluntary and involuntary + + public SchedMetrics(double avgRuntime, double avgWaittime, double contextSwitchRate) { + this.avgRuntime = avgRuntime; + this.avgWaittime = avgWaittime; + this.contextSwitchRate = contextSwitchRate; + } + + @Override + public String toString() { + return new StringBuilder() + .append("avgruntime: ") + .append(avgRuntime) + .append(" avgwaittime: ") + .append(avgWaittime) + .append(" ctxrate: ") + .append(contextSwitchRate) + .toString(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetricsCalculator.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetricsCalculator.java new file mode 100644 index 0000000..5613501 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/metrics/SchedMetricsCalculator.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.metrics; + + +import java.util.Map; +import org.opensearch.performanceanalyzer.commons.os.observer.impl.SchedObserver.SchedKeys; + +/** Calculates sched metric resources consumption by threads */ +public final class SchedMetricsCalculator { + /** + * Calculates sched metrics based on the values from beginning and end of measurement + * + * @param endTimeResourceMetrics + * @param startTimeResourceMetrics + * @param endMeasurementTime + * @param startMeasurementTime + * @return + */ + public static SchedMetrics calculateThreadSchedLatency( + long endMeasurementTime, + long startMeasurementTime, + Map endTimeResourceMetrics, + Map startTimeResourceMetrics) { + + if (startMeasurementTime == endMeasurementTime) { + return null; + } + + if (endTimeResourceMetrics == null || startTimeResourceMetrics == null) { + return null; + } + + if (!endTimeResourceMetrics.containsKey(SchedKeys.TOTCTXSWS.getLabel()) + || !startTimeResourceMetrics.containsKey(SchedKeys.TOTCTXSWS.getLabel())) { + return null; + } + + long ctxdiff = + (long) endTimeResourceMetrics.getOrDefault(SchedKeys.TOTCTXSWS.getLabel(), 0L) + - (long) + startTimeResourceMetrics.getOrDefault( + SchedKeys.TOTCTXSWS.getLabel(), 0L); + double avgRuntime = + 1.0e-9 + * ((long) + endTimeResourceMetrics.getOrDefault( + SchedKeys.RUNTICKS.getLabel(), 0L) + - (long) + startTimeResourceMetrics.getOrDefault( + SchedKeys.RUNTICKS.getLabel(), 0L)); + double avgWaittime = + 1.0e-9 + * ((long) + endTimeResourceMetrics.getOrDefault( + SchedKeys.WAITTICKS.getLabel(), 0L) + - (long) + startTimeResourceMetrics.getOrDefault( + SchedKeys.WAITTICKS.getLabel(), 0L)); + if (ctxdiff == 0) { + avgRuntime = 0; + avgWaittime = 0; + } else { + avgRuntime /= 1.0 * ctxdiff; + avgWaittime /= 1.0 * ctxdiff; + } + double contextSwitchRate = ctxdiff; + contextSwitchRate /= 1.0e-3 * (endMeasurementTime - startMeasurementTime); + + return new SchedMetrics(avgRuntime, avgWaittime, contextSwitchRate); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/OsObserver.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/OsObserver.java new file mode 100644 index 0000000..2337540 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/OsObserver.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.observer; + + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.opensearch.performanceanalyzer.commons.observer.ResourceObserver; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; + +public class OsObserver implements ResourceObserver { + + @Override + public Map observe(String threadId) { + throw new UnsupportedOperationException( + "Observer abstraction can't be used to observer the thread"); + } + + /** + * Retrieves the metrics for all available threads + * + * @return map of threads and associated metrics + */ + @Override + public Map> observe() { + List threadIds = OSGlobals.getTids(); + return threadIds.stream() + .collect(Collectors.toMap(threadId -> threadId, threadId -> observe(threadId))); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/CPUObserver.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/CPUObserver.java new file mode 100644 index 0000000..f8b493a --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/CPUObserver.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.observer.impl; + + +import java.util.Map; +import java.util.stream.Stream; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; +import org.opensearch.performanceanalyzer.commons.os.SchemaFileParser; +import org.opensearch.performanceanalyzer.commons.os.SchemaFileParser.FieldTypes; +import org.opensearch.performanceanalyzer.commons.os.observer.OsObserver; + +public class CPUObserver extends OsObserver { + public enum StatKeys { + PID("pid"), + COMM("comm"), + STATE("state"), + PPID("ppid"), + PGRP("pgrp"), + SESSION("session"), + TTYNR("ttynr"), + TPGID("tpgid"), + FLAGS("flags"), + MINFLT("minflt"), + CMNFLT("cminflt"), + MAJFLT("majflt"), + CMAJFLT("cmajflt"), + UTIME("utime"), + STIME("stime"), + CUTIME("cutime"), + CSTIME("cstime"), + PRIO("prio"), + NICE("nice"), + NTHREADS("nthreads"), + ITERALVALUE("itrealvalue"), + STARTTIME("starttime"), + VSIZE("vsize"), + RSS("rss"), + RSSLIM("rsslim"), + STARTCODE("startcode"), + ENDCODE("endcode"), + STARTSTACK("startstack"), + KSTKESP("kstkesp"), + KSTKEIP("kstkeip"), + SIGNAL("signal"), + BLOCKED("blocked"), + SIGIGNORE("sigignore"), + SIGCATCH("sigcatch"), + WCHAN("wchan"), + NSWAP("nswap"), + CNSWAP("cnswap"), + EXISTSIG("exitsig"), + CPU("cpu"), + RTPRIO("rtprio"), + SCHEDPOLICY("schedpolicy"), + BIO_TICKS("bio_ticks"), + VMTIME("vmtime"), + CVMTIME("cvmtime"); + private final String label; + + public String getLabel() { + return label; + } + + StatKeys(String label) { + this.label = label; + } + + public static String[] getStatKeys() { + return Stream.of(StatKeys.values()).map(StatKeys::getLabel).toArray(String[]::new); + } + } + + private static FieldTypes[] statTypes = { + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.STRING, + SchemaFileParser.FieldTypes.CHAR, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.ULONG, // 10 + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, // 20 + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, // 30 + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, // 40 + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT, + SchemaFileParser.FieldTypes.INT + }; + + @Override + public Map observe(String threadId) { + return (new SchemaFileParser( + "/proc/" + OSGlobals.getPid() + "/task/" + threadId + "/stat", + StatKeys.getStatKeys(), + statTypes, + true)) + .parse(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/IOObserver.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/IOObserver.java new file mode 100644 index 0000000..692e568 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/IOObserver.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.observer.impl; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; +import org.opensearch.performanceanalyzer.commons.os.observer.OsObserver; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; + +public class IOObserver extends OsObserver { + private static final Logger LOGGER = LogManager.getLogger(IOObserver.class); + + public enum StatKeys { + READ_BYTES("read_bytes"), + WRITE_BYTES("write_bytes"), + SYSCR("syscr"), + SYSCW("syscw"), + RCHAR("rchar"), + WCHAR("wchar"); + + public final String label; + + public String getLabel() { + return label; + } + + StatKeys(String label) { + this.label = label; + } + + public static String[] getStatKeys() { + return Stream.of(StatKeys.values()).map(StatKeys::getLabel).toArray(String[]::new); + } + } + + @Override + public Map observe(String threadId) { + try (FileReader fileReader = + new FileReader( + new File( + "/proc/" + + OSGlobals.getPid() + + "/task/" + + threadId + + "/io")); + BufferedReader bufferedReader = new BufferedReader(fileReader); ) { + String line; + Map kvmap = new HashMap<>(); + while ((line = bufferedReader.readLine()) != null) { + String[] toks = line.split("[: ]+"); + String key = toks[0]; + long val = Long.parseLong(toks[1]); + kvmap.put(key, val); + } + return kvmap; + } catch (FileNotFoundException e) { + LOGGER.debug("FileNotFound in parse with exception: {}", () -> e.toString()); + } catch (Exception e) { + LOGGER.debug( + "Error In addSample Tid for: {} with error: {} with ExceptionCode: {}", + () -> threadId, + () -> e.toString(), + () -> StatExceptionCode.THREAD_IO_ERROR.toString()); + } + return Collections.emptyMap(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/SchedObserver.java b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/SchedObserver.java new file mode 100644 index 0000000..2957429 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/commons/os/observer/impl/SchedObserver.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.commons.os.observer.impl; + + +import java.util.Map; +import java.util.stream.Stream; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; +import org.opensearch.performanceanalyzer.commons.os.SchemaFileParser; +import org.opensearch.performanceanalyzer.commons.os.SchemaFileParser.FieldTypes; +import org.opensearch.performanceanalyzer.commons.os.observer.OsObserver; + +public class SchedObserver extends OsObserver { + + public enum SchedKeys { + RUNTICKS("runticks"), + WAITTICKS("waitticks"), + TOTCTXSWS("totctxsws"); + private final String label; + + public String getLabel() { + return label; + } + + SchedKeys(String label) { + this.label = label; + } + + public static String[] getStatKeys() { + return Stream.of(SchedKeys.values()).map(SchedKeys::getLabel).toArray(String[]::new); + } + } + + private static FieldTypes[] schedTypes = { + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG, + SchemaFileParser.FieldTypes.ULONG + }; + + @Override + public Map observe(String threadId) { + return (new SchemaFileParser( + "/proc/" + OSGlobals.getPid() + "/task/" + threadId + "/schedstat", + SchedKeys.getStatKeys(), + schedTypes)) + .parse(); + } +} diff --git a/src/main/resources/security.policy b/src/main/resources/security.policy new file mode 100644 index 0000000..b8c6c99 --- /dev/null +++ b/src/main/resources/security.policy @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +grant { + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + //- Java 8 jdk.attach fucntionality needs write permissions for /proc/pid/cwd, which varies, + permission java.io.FilePermission "/-","read,readlink,write,delete,execute"; + //permission java.io.FilePermission "/dev/shm/-","read,readlink,write,delete,execute"; + //permission java.io.FilePermission "/proc/-","read,readlink,execute,write,delete"; + //permission java.io.FilePermission "/sys/block/-","read,readlink,execute,write,delete"; + permission java.io.FilePermission "build/tmp/junit_metrics", "read"; + permission com.sun.tools.attach.AttachPermission "attachVirtualMachine"; + permission com.sun.tools.attach.AttachPermission "createAttachProvider"; + permission java.lang.RuntimePermission "manageProcess"; + permission java.lang.RuntimePermission "loadLibrary.attach"; + permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.misc"; + permission java.lang.RuntimePermission "accessClassInPackage.sun.tools.attach"; + permission java.lang.RuntimePermission "createClassLoader"; + permission java.lang.RuntimePermission "defineClass"; + permission java.lang.management.ManagementPermission "control"; +}; + +