Support for logging MDC in the Kafka buffer #4131

Merged: 2 commits, Feb 15, 2024
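
This change adds an SLF4J MDC entry, kafkaPluginType=buffer, to work done by the Kafka buffer plugin: a new KafkaMdc class holds the shared MDC key, a new KafkaPluginThreadFactory installs the entry on consumer threads, and KafkaBuffer wraps its write, read, checkpoint, and shutdown paths in setMdc()/resetMdc() helpers. Unit tests verify that each operation sets and then clears the entry.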
KafkaBuffer.java
@@ -24,15 +24,18 @@
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.Collection;
@@ -53,6 +56,7 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
public static final int INNER_BUFFER_BATCH_SIZE = 250000;
static final String WRITE = "Write";
static final String READ = "Read";
static final String MDC_KAFKA_PLUGIN_VALUE = "buffer";
private final KafkaCustomProducer producer;
private final KafkaAdminAccessor kafkaAdminAccessor;
private final AbstractBuffer<Record<Event>> innerBuffer;
@@ -80,7 +84,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
consumers.forEach(this.executorService::submit);

this.drainTimeout = kafkaBufferConfig.getDrainTimeout();
@@ -89,6 +93,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
@Override
public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
try {
setMdc();
producer.produceRawData(bytes, key);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
@@ -102,15 +107,21 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis
throw new RuntimeException(e);
}
}
finally {
resetMdc();
}
}

@Override
public void doWrite(Record<Event> record, int timeoutInMillis) throws TimeoutException {
try {
setMdc();
producer.produceRecords(record);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
resetMdc();
}
}

@@ -121,29 +132,50 @@ public boolean isByteBuffer() {

@Override
public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
for (Record<Event> record : records) {
doWrite(record, timeoutInMillis);
}
}

@Override
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
try {
setMdc();
return innerBuffer.read(timeoutInMillis);
} finally {
resetMdc();
}
}

@Override
public void postProcess(final Long recordsInBuffer) {
try {
setMdc();

innerBuffer.postProcess(recordsInBuffer);
} finally {
resetMdc();
}
}

@Override
public void doCheckpoint(CheckpointState checkpointState) {
try {
setMdc();
innerBuffer.doCheckpoint(checkpointState);
} finally {
resetMdc();
}
}

@Override
public boolean isEmpty() {
try {
setMdc();
return kafkaAdminAccessor.areTopicsEmpty() && innerBuffer.isEmpty();
} finally {
resetMdc();
}
}

@Override
@@ -158,21 +190,35 @@ public boolean isWrittenOffHeapOnly() {

@Override
public void shutdown() {
setMdc();

shutdownInProgress.set(true);
executorService.shutdown();

try {
if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
LOG.info("Successfully waited for consumer task to terminate");
} else {
LOG.warn("Consumer task did not terminate in time, forcing termination");
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for consumer task to terminate", e);
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for consumer task to terminate", e);
executorService.shutdownNow();

innerBuffer.shutdown();
} finally {
resetMdc();
}
}

private static void setMdc() {
MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE);
}

private static void resetMdc() {
MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY);
}
}
KafkaMdc.java (new file)
@@ -0,0 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.common;

public class KafkaMdc {
public static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType";
}
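
For context, a minimal sketch of how this key could surface in log output (not part of the PR; it assumes SLF4J with a Logback- or Log4j2-style backend, and the class name and message are illustrative):

import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// Illustrative sketch: the put/log/remove lifecycle around the shared MDC key.
// An appender pattern token such as %X{kafkaPluginType} would render "buffer"
// for any log event emitted while the entry is set.
class KafkaMdcUsageSketch {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMdcUsageSketch.class);

    public static void main(final String[] args) {
        MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer");
        try {
            LOG.info("produced raw data"); // carries kafkaPluginType=buffer
        } finally {
            MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY); // mirrors KafkaBuffer's resetMdc()
        }
    }
}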
KafkaPluginThreadFactory.java (new file)
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.common.thread;

import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.slf4j.MDC;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* An implementation of {@link ThreadFactory} for Kafka plugin threads.
*/
public class KafkaPluginThreadFactory implements ThreadFactory {
private final ThreadFactory delegateThreadFactory;
private final String threadPrefix;
private final String kafkaPluginType;
private final AtomicInteger threadNumber = new AtomicInteger(1);

KafkaPluginThreadFactory(
final ThreadFactory delegateThreadFactory,
final String kafkaPluginType) {
this.delegateThreadFactory = delegateThreadFactory;
this.threadPrefix = "kafka-" + kafkaPluginType + "-";
this.kafkaPluginType = kafkaPluginType;
}

/**
* Creates an instance specifically for use with {@link Executors}.
*
* @param kafkaPluginType The name of the plugin type. e.g. sink, source, buffer
* @return An instance of the {@link KafkaPluginThreadFactory}.
*/
public static KafkaPluginThreadFactory defaultExecutorThreadFactory(final String kafkaPluginType) {
return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType);
}

@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = delegateThreadFactory.newThread(() -> {
MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, kafkaPluginType);
try {
runnable.run();
} finally {
MDC.clear();
}
});

Review thread on the MDC.clear() call above:

Collaborator: Shall we invoke MDC.clear() after the run?

Member Author: I don't think it really matters, because the thread will end and the thread-local context is lost with it. But I could do it.

thread.setName(threadPrefix + threadNumber.getAndIncrement());

return thread;
}
}
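
A usage note on why the factory wraps the Runnable: MDC is backed by a per-thread map, so an entry put on the submitting thread is not visible inside tasks running on pool threads. A minimal sketch of the difference, assuming the classes from this PR are on the classpath (the demo class name is hypothetical):

import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.slf4j.MDC;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo: MDC entries do not follow tasks onto pool threads.
class MdcPropagationSketch {
    public static void main(final String[] args) throws Exception {
        final ThreadPoolExecutor plainPool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        plainPool.prestartAllCoreThreads(); // create the worker before any MDC is set

        MDC.put("kafkaPluginType", "buffer"); // set on the main thread only

        // Prints "null": the worker thread has its own, empty MDC map.
        plainPool.submit(() -> System.out.println(MDC.get("kafkaPluginType"))).get();
        plainPool.shutdown();

        // With KafkaPluginThreadFactory, each new thread installs the entry itself
        // before running, so consumer logs are tagged without caller involvement.
        final ExecutorService kafkaPool = Executors.newFixedThreadPool(
                1, KafkaPluginThreadFactory.defaultExecutorThreadFactory("buffer"));

        // Prints "buffer": put by the wrapped Runnable in newThread().
        kafkaPool.submit(() -> System.out.println(MDC.get("kafkaPluginType"))).get();
        kafkaPool.shutdown();
    }
}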
Unit test for KafkaBuffer
@@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -25,6 +27,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
@@ -35,6 +39,7 @@
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.Arrays;
@@ -151,7 +156,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
blockingBuffer = mock;
})) {

executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt(), any(KafkaPluginThreadFactory.class))).thenReturn(executorService);
return new KafkaBuffer(pluginSetting, bufferConfig, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker);
}
}
@@ -353,4 +358,84 @@ public void testShutdown_InterruptedException() throws InterruptedException {
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(executorService).shutdownNow();
}

@Nested
class MdcTests {
private MockedStatic<MDC> mdcMockedStatic;

@BeforeEach
void setUp() {
mdcMockedStatic = mockStatic(MDC.class);
}

@AfterEach
void tearDown() {
mdcMockedStatic.close();
}

@Test
void writeBytes_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().writeBytes(new byte[] {}, UUID.randomUUID().toString(), 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doWrite_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doWrite(mock(Record.class), 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doWriteAll_sets_and_clears_MDC() throws Exception {
final List<Record<Event>> records = Collections.singletonList(mock(Record.class));
createObjectUnderTest().doWriteAll(records, 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doRead_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doRead(100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doCheckpoint_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doCheckpoint(mock(CheckpointState.class));

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void postProcess_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().postProcess(100L);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void isEmpty_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().isEmpty();

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void shutdown_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().shutdown();

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}
}
}