From 07742c4234732042336768e9432651af47c2c19c Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Tue, 24 Jan 2023 09:51:27 -0600 Subject: [PATCH] ENH: add buffer records overflow metrics (#2170) * ENH: add buffer records overflow metrics Signed-off-by: George Chen --- .../dataprepper/metrics/MetricNames.java | 5 ++ .../model/buffer/AbstractBuffer.java | 6 ++- .../model/buffer/AbstractBufferTest.java | 46 +++++++++++++++++++ docs/monitoring.md | 1 + 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/MetricNames.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/MetricNames.java index 3f54ba2cc7..75b6e65f39 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/MetricNames.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/MetricNames.java @@ -57,6 +57,11 @@ private MetricNames() {} */ public static final String RECORDS_PROCESSED = "recordsProcessed"; + /** + * Metric representing the number of records that failed to be written into the buffer.
+ */ + public static final String RECORDS_WRITE_FAILED = "recordsWriteFailed"; + /** * Metric representing the time elapsed while writing to a Buffer */ diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index fffbb41b49..9bc028ec1f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -30,6 +30,7 @@ public abstract class AbstractBuffer> implements Buffer { private final AtomicLong recordsInBuffer; private final Counter recordsProcessedCounter; private final Counter writeTimeoutCounter; + private final Counter recordsWriteFailed; private final Timer writeTimer; private final Timer readTimer; private final Timer checkpointTimer; @@ -49,6 +50,7 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN this.recordsInFlight = pluginMetrics.gauge(MetricNames.RECORDS_INFLIGHT, new AtomicLong()); this.recordsInBuffer = pluginMetrics.gauge(MetricNames.RECORDS_IN_BUFFER, new AtomicLong()); this.recordsProcessedCounter = pluginMetrics.counter(MetricNames.RECORDS_PROCESSED, pipelineName); + this.recordsWriteFailed = pluginMetrics.counter(MetricNames.RECORDS_WRITE_FAILED); this.writeTimeoutCounter = pluginMetrics.counter(MetricNames.WRITE_TIMEOUTS); this.writeTimer = pluginMetrics.timer(MetricNames.WRITE_TIME_ELAPSED); this.readTimer = pluginMetrics.timer(MetricNames.READ_TIME_ELAPSED); @@ -73,6 +75,7 @@ public void write(T record, int timeoutInMillis) throws TimeoutException { recordsInBuffer.incrementAndGet(); postProcess(recordsInBuffer.get()); } catch (TimeoutException e) { + recordsWriteFailed.increment(); writeTimeoutCounter.increment(); throw e; } finally { @@ -92,13 +95,14 @@ public void write(T record, int timeoutInMillis) throws TimeoutException { 
public void writeAll(Collection records, int timeoutInMillis) throws Exception { long startTime = System.nanoTime(); + final int size = records.size(); try { - final int size = records.size(); doWriteAll(records, timeoutInMillis); recordsWrittenCounter.increment(size); recordsInBuffer.addAndGet(size); postProcess(recordsInBuffer.get()); } catch (Exception e) { + recordsWriteFailed.increment(size); if (e instanceof TimeoutException) { writeTimeoutCounter.increment(); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java index 0cd76ea083..17f13e4ae8 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/AbstractBufferTest.java @@ -182,6 +182,20 @@ public void testWriteTimeoutMetric() throws TimeoutException { Assert.assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0); } + @Test + public void testWriteRecordsWriteFailedMetric() throws TimeoutException { + // Given + final AbstractBuffer> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting); + + // When/Then + Assert.assertThrows(TimeoutException.class, () -> abstractBuffer.write(new Record<>(UUID.randomUUID().toString()), 1000)); + + final List recordsWriteFailedMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(BUFFER_NAME).add(MetricNames.RECORDS_WRITE_FAILED).toString()); + Assert.assertEquals(1, recordsWriteFailedMeasurements.size()); + Assert.assertEquals(1.0, recordsWriteFailedMeasurements.get(0).getValue(), 0); + } + @Test public void testWriteAllTimeoutMetric() throws TimeoutException { // Given @@ -198,6 +212,22 @@ public void testWriteAllTimeoutMetric() throws TimeoutException { Assert.assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0); 
} + @Test + public void testWriteAllRecordsWriteFailedMetric() { + // Given + final AbstractBuffer> abstractBuffer = new AbstractBufferRuntimeExceptionImpl(testPluginSetting); + final Collection> testRecords = Arrays.asList( + new Record<>(UUID.randomUUID().toString()), new Record<>(UUID.randomUUID().toString())); + + // When/Then + Assert.assertThrows(RuntimeException.class, () -> abstractBuffer.writeAll(testRecords, 1000)); + + final List recordsWriteFailedMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(BUFFER_NAME).add(MetricNames.RECORDS_WRITE_FAILED).toString()); + Assert.assertEquals(1, recordsWriteFailedMeasurements.size()); + Assert.assertEquals(2.0, recordsWriteFailedMeasurements.get(0).getValue(), 0); + } + @Test public void testWriteAllSizeOverflowException() { // Given @@ -305,6 +335,22 @@ public void doWriteAll(Collection> records, int timeoutInMillis) } } + public static class AbstractBufferRuntimeExceptionImpl extends AbstractBufferImpl { + public AbstractBufferRuntimeExceptionImpl(PluginSetting pluginSetting) { + super(pluginSetting); + } + + @Override + public void doWrite(Record record, int timeoutInMillis) { + throw new RuntimeException(); + } + + @Override + public void doWriteAll(Collection> records, int timeoutInMillis) { + throw new RuntimeException(); + } + } + public static class AbstractBufferSizeOverflowImpl extends AbstractBufferImpl { public AbstractBufferSizeOverflowImpl(final PluginSetting pluginSetting) { super(pluginSetting); diff --git a/docs/monitoring.md b/docs/monitoring.md index 7195a15c70..777c581c96 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -29,6 +29,7 @@ plugin types. - `recordsWritten`: number of records written into a buffer. - `recordsRead`: number of records read from a buffer. - `recordsProcessed`: number of records read from a buffer and marked as processed. 
+ - `recordsWriteFailed`: number of records that failed to be written into a buffer. - `writeTimeouts`: count of write timeouts in a buffer. - Gauge - `recordsInBuffer`: number of records in the buffer.