MAINT: add bytes metrics into opensearch source #3646

Merged
@@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

+ import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -32,6 +33,7 @@ public class OpenSearchService {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class);

+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30);

@@ -83,13 +85,13 @@ private OpenSearchService(final SearchAccessor searchAccessor,
public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
- searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
+ searchWorker = new PitWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case SCROLL:
- searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
+ searchWorker = new ScrollWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case NONE:
- searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
+ searchWorker = new NoSearchContextWorker(OBJECT_MAPPER, searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
default:
throw new IllegalArgumentException(
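Note: a single static ObjectMapper is now shared by all three workers. Jackson's ObjectMapper is thread-safe once configured, so one instance is sufficient. Below is a minimal standalone sketch of the byte-counting call the workers make; the class name and sample document are illustrative, not part of this PR.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SharedMapperSketch {
    // Safe to share across worker threads once configured.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        final JsonNode document = OBJECT_MAPPER.readTree("{\"field\":\"value\"}");
        // Serialize the node back to bytes and measure, as the workers do.
        final long documentBytes = OBJECT_MAPPER.writeValueAsBytes(document).length;
        System.out.println(documentBytes); // prints 17
    }
}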
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.opensearch.metrics;

import io.micrometer.core.instrument.Counter;
+ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;

@@ -15,12 +16,17 @@ public class OpenSearchSourcePluginMetrics {
static final String INDICES_PROCESSED = "indicesProcessed";
static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime";
static final String PROCESSING_ERRORS = "processingErrors";
+ static final String BYTES_RECEIVED = "bytesReceived";
+ static final String BYTES_PROCESSED = "bytesProcessed";

private final Counter documentsProcessedCounter;
private final Counter indicesProcessedCounter;
private final Counter processingErrorsCounter;
private final Timer indexProcessingTimeTimer;

+ private final DistributionSummary bytesReceivedSummary;
+ private final DistributionSummary bytesProcessedSummary;

public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
return new OpenSearchSourcePluginMetrics(pluginMetrics);
}
@@ -30,6 +36,8 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED);
processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS);
indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED);
+ bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
+ bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
}

public Counter getDocumentsProcessedCounter() {
@@ -47,4 +55,12 @@ public Counter getProcessingErrorsCounter() {
public Timer getIndexProcessingTimeTimer() {
return indexProcessingTimeTimer;
}

+ public DistributionSummary getBytesReceivedSummary() {
+ return bytesReceivedSummary;
+ }

+ public DistributionSummary getBytesProcessedSummary() {
+ return bytesProcessedSummary;
+ }
}
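The two new meters are standard Micrometer DistributionSummary instances: each record() call contributes to a count, a running total, and a max. Here is a standalone sketch of what they report; the SimpleMeterRegistry setup is illustrative, since Data Prepper normally wires the registry through PluginMetrics.

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class BytesSummarySketch {
    public static void main(String[] args) {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        final DistributionSummary bytesReceived = registry.summary("bytesReceived");

        // One record() per document, with the document's serialized size.
        bytesReceived.record(512);
        bytesReceived.record(2048);

        System.out.println(bytesReceived.count());       // 2
        System.out.println(bytesReceived.totalAmount()); // 2560.0
        System.out.println(bytesReceived.max());         // 2048.0
    }
}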
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

+ import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -42,6 +43,7 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(NoSearchContextWorker.class);

+ private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
@@ -52,13 +54,15 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {

private int noAvailableIndicesCount = 0;

- public NoSearchContextWorker(final SearchAccessor searchAccessor,
- final OpenSearchSourceConfiguration openSearchSourceConfiguration,
- final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
- final BufferAccumulator<Record<Event>> bufferAccumulator,
- final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
- final AcknowledgementSetManager acknowledgementSetManager,
- final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
+ public NoSearchContextWorker(final ObjectMapper objectMapper,
+ final SearchAccessor searchAccessor,
+ final OpenSearchSourceConfiguration openSearchSourceConfiguration,
+ final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
+ final BufferAccumulator<Record<Event>> bufferAccumulator,
+ final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
+ final AcknowledgementSetManager acknowledgementSetManager,
+ final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
+ this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
@@ -154,11 +158,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
+ final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
+ openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
+ openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
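The ordering in processIndex matters: bytesReceived is recorded as soon as a document arrives, while bytesProcessed is recorded only after bufferAccumulator.add succeeds, so any gap between the two summaries corresponds to documents that failed to reach the buffer. A hedged sketch of that flow follows; the failure is simulated here, whereas in the worker it would be an exception from the buffer or acknowledgement path.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.List;

public class ReceivedVsProcessedSketch {
    public static void main(String[] args) throws Exception {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        final DistributionSummary received = registry.summary("bytesReceived");
        final DistributionSummary processed = registry.summary("bytesProcessed");
        final ObjectMapper mapper = new ObjectMapper();

        for (final String source : List.of("{\"id\":1}", "{\"id\":2}")) {
            final long documentBytes = mapper.writeValueAsBytes(mapper.readTree(source)).length;
            received.record(documentBytes); // recorded on arrival, before buffering
            try {
                if (source.contains("2")) {
                    throw new RuntimeException("simulated buffer failure");
                }
                processed.record(documentBytes); // recorded only after a successful add
            } catch (final Exception e) {
                // the worker increments processingErrors here
            }
        }
        // The difference is the byte count of documents that failed to buffer.
        System.out.println(received.totalAmount() - processed.totalAmount()); // 8.0
    }
}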
@@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

+ import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -55,6 +56,7 @@ public class PitWorker implements SearchWorker, Runnable {
static final String EXTEND_KEEP_ALIVE_TIME = "1m";
private static final Duration EXTEND_KEEP_ALIVE_DURATION = Duration.ofMinutes(1);

+ private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
@@ -66,13 +68,15 @@

private int noAvailableIndicesCount = 0;

- public PitWorker(final SearchAccessor searchAccessor,
+ public PitWorker(final ObjectMapper objectMapper,
+ final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
+ this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
@@ -191,11 +195,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
+ final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
[Inline review thread on the documentBytes line]

Member: Writing this to bytes is going to add a performance hit. Also, there is a possibility that this will be somewhat different from the input, since we are looking at the Event here rather than the actual JSON document.

chenqi0805 (Collaborator, Author), Nov 13, 2023: The JsonNode in the event is essentially the SearchResults/hit/source JsonNode. That document unit is uniform across bytesReceived and bytesProcessed, so that the user can make a comparison. I am open to other alternative units.

Member: I see that we set it to the document _source here:
That may or may not be the best metric, but I think it can work.

chenqi0805 (Collaborator, Author): I do not have a better solution for the performance hit yet, since the SDK client returns ObjectNode...

[End of review thread]
+ openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
+ openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
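As the review thread above notes, the extra serialization pass exists only to measure size, and there is no cheaper option while the SDK client hands back an ObjectNode. If the raw hit JSON were available as a string, its UTF-8 length could be taken without re-serializing. A purely hypothetical sketch; rawSource is an assumed input that the current client does not expose.

import java.nio.charset.StandardCharsets;

public class RawSourceLengthSketch {
    public static void main(String[] args) {
        // Hypothetical: the _source JSON exactly as it came off the wire.
        // The SDK client parses it into an ObjectNode first, which is why
        // this PR serializes the node back to bytes instead.
        final String rawSource = "{\"field\":\"value\"}";

        // Measures the wire form directly, with no Jackson serialization pass.
        final long documentBytes = rawSource.getBytes(StandardCharsets.UTF_8).length;
        System.out.println(documentBytes); // prints 17
    }
}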
@@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

+ import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -50,6 +51,7 @@ public class ScrollWorker implements SearchWorker {
private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120);
static final String SCROLL_TIME_PER_BATCH = "1m";

+ private final ObjectMapper objectMapper;
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
@@ -60,13 +62,15 @@ public class ScrollWorker implements SearchWorker {

private int noAvailableIndicesCount = 0;

- public ScrollWorker(final SearchAccessor searchAccessor,
+ public ScrollWorker(final ObjectMapper objectMapper,
+ final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
+ this.objectMapper = objectMapper;
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
@@ -198,11 +202,14 @@ private void writeDocumentsToBuffer(final List<Event> documents,
final AcknowledgementSet acknowledgementSet) {
documents.stream().map(Record::new).forEach(record -> {
try {
+ final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
+ openSearchSourcePluginMetrics.getBytesReceivedSummary().record(documentBytes);
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
+ openSearchSourcePluginMetrics.getBytesProcessedSummary().record(documentBytes);
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",