Skip to content

Commit

Permalink
ENH: add buffer records overflow metrics (opensearch-project#2170)
Browse files Browse the repository at this point in the history
* ENH: add buffer records overflow metrics

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Jan 24, 2023
1 parent 07acd72 commit 07742c4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ private MetricNames() {}
*/
public static final String RECORDS_PROCESSED = "recordsProcessed";

/**
* Metric representing the number of records failed to be written into the buffer.
*/
public static final String RECORDS_WRITE_FAILED = "recordsWriteFailed";

/**
* Metric representing the time elapsed while writing to a Buffer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> {
private final AtomicLong recordsInBuffer;
private final Counter recordsProcessedCounter;
private final Counter writeTimeoutCounter;
private final Counter recordsWriteFailed;
private final Timer writeTimer;
private final Timer readTimer;
private final Timer checkpointTimer;
Expand All @@ -49,6 +50,7 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN
this.recordsInFlight = pluginMetrics.gauge(MetricNames.RECORDS_INFLIGHT, new AtomicLong());
this.recordsInBuffer = pluginMetrics.gauge(MetricNames.RECORDS_IN_BUFFER, new AtomicLong());
this.recordsProcessedCounter = pluginMetrics.counter(MetricNames.RECORDS_PROCESSED, pipelineName);
this.recordsWriteFailed = pluginMetrics.counter(MetricNames.RECORDS_WRITE_FAILED);
this.writeTimeoutCounter = pluginMetrics.counter(MetricNames.WRITE_TIMEOUTS);
this.writeTimer = pluginMetrics.timer(MetricNames.WRITE_TIME_ELAPSED);
this.readTimer = pluginMetrics.timer(MetricNames.READ_TIME_ELAPSED);
Expand All @@ -73,6 +75,7 @@ public void write(T record, int timeoutInMillis) throws TimeoutException {
recordsInBuffer.incrementAndGet();
postProcess(recordsInBuffer.get());
} catch (TimeoutException e) {
recordsWriteFailed.increment();
writeTimeoutCounter.increment();
throw e;
} finally {
Expand All @@ -92,13 +95,14 @@ public void write(T record, int timeoutInMillis) throws TimeoutException {
public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
long startTime = System.nanoTime();

final int size = records.size();
try {
final int size = records.size();
doWriteAll(records, timeoutInMillis);
recordsWrittenCounter.increment(size);
recordsInBuffer.addAndGet(size);
postProcess(recordsInBuffer.get());
} catch (Exception e) {
recordsWriteFailed.increment(size);
if (e instanceof TimeoutException) {
writeTimeoutCounter.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ public void testWriteTimeoutMetric() throws TimeoutException {
Assert.assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0);
}

@Test
public void testWriteRecordsWriteFailedMetric() throws TimeoutException {
// Given
final AbstractBuffer<Record<String>> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting);

// When/Then
Assert.assertThrows(TimeoutException.class, () -> abstractBuffer.write(new Record<>(UUID.randomUUID().toString()), 1000));

final List<Measurement> recordsWriteFailedMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(BUFFER_NAME).add(MetricNames.RECORDS_WRITE_FAILED).toString());
Assert.assertEquals(1, recordsWriteFailedMeasurements.size());
Assert.assertEquals(1.0, recordsWriteFailedMeasurements.get(0).getValue(), 0);
}

@Test
public void testWriteAllTimeoutMetric() throws TimeoutException {
// Given
Expand All @@ -198,6 +212,22 @@ public void testWriteAllTimeoutMetric() throws TimeoutException {
Assert.assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0);
}

@Test
public void testWriteAllRecordsWriteFailedMetric() {
// Given
final AbstractBuffer<Record<String>> abstractBuffer = new AbstractBufferRuntimeExceptionImpl(testPluginSetting);
final Collection<Record<String>> testRecords = Arrays.asList(
new Record<>(UUID.randomUUID().toString()), new Record<>(UUID.randomUUID().toString()));

// When/Then
Assert.assertThrows(RuntimeException.class, () -> abstractBuffer.writeAll(testRecords, 1000));

final List<Measurement> recordsWriteFailedMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(BUFFER_NAME).add(MetricNames.RECORDS_WRITE_FAILED).toString());
Assert.assertEquals(1, recordsWriteFailedMeasurements.size());
Assert.assertEquals(2.0, recordsWriteFailedMeasurements.get(0).getValue(), 0);
}

@Test
public void testWriteAllSizeOverflowException() {
// Given
Expand Down Expand Up @@ -305,6 +335,22 @@ public void doWriteAll(Collection<Record<String>> records, int timeoutInMillis)
}
}

public static class AbstractBufferRuntimeExceptionImpl extends AbstractBufferImpl {
public AbstractBufferRuntimeExceptionImpl(PluginSetting pluginSetting) {
super(pluginSetting);
}

@Override
public void doWrite(Record<String> record, int timeoutInMillis) {
throw new RuntimeException();
}

@Override
public void doWriteAll(Collection<Record<String>> records, int timeoutInMillis) {
throw new RuntimeException();
}
}

public static class AbstractBufferSizeOverflowImpl extends AbstractBufferImpl {
public AbstractBufferSizeOverflowImpl(final PluginSetting pluginSetting) {
super(pluginSetting);
Expand Down
1 change: 1 addition & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ plugin types.
- `recordsWritten`: number of records written into a buffer.
- `recordsRead`: number of records read from a buffer.
- `recordsProcessed`: number of records read from a buffer and marked as processed.
- `recordsWriteFailed`: number of records failed to be written into a buffer.
- `writeTimeouts`: count of write timeouts in a buffer.
- Gauge
- `recordsInBuffer`: number of records in the buffer.
Expand Down

0 comments on commit 07742c4

Please sign in to comment.