Skip to content

Separate disruptors for encoder and network communications #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,13 @@ You can use a `Logstash*Encoder`, `*EventCompositeJsonEncoder`, or any other log
All of the output formatting options are configured at the encoder level.

Internally, the TCP appenders are asynchronous (using the [LMAX Disruptor RingBuffer](https://lmax-exchange.github.io/disruptor/)).
All the encoding and TCP communication is delegated to a single writer thread.
All the encoding and TCP communication are delegated to two threads accordingly.
There is no need to wrap the TCP appenders with another asynchronous appender
(such as `AsyncAppender` or `LoggingEventAsyncDisruptorAppender`).

All the configuration parameters (except for sub-appender) of the [async appenders](#async)
are valid for TCP appenders. For example, `waitStrategyType` and `ringBufferSize`.
are valid for TCP appenders. For example, `waitStrategyType` and `ringBufferSize`.
They are used for customizing the encoding thread. Communication thread has its own properties with a `network` prefix such as `networkRingBufferSize`, `networkWaitStrategy`, `networkThreadNameFormat` - see [AbstractLogstashTcpSocketAppender](https://github.com/bedrin/logstash-logback-encoder/blob/master/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java) for details.

The TCP appenders will never block the logging thread.
If the RingBuffer is full (e.g. due to slow network, etc), then events will be dropped.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
*/
package net.logstash.logback.appender;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand All @@ -27,18 +24,16 @@
import java.util.Collections;
import java.util.Formatter;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import net.logstash.logback.encoder.SeparatorParser;

import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement;
Expand All @@ -53,10 +48,6 @@
import ch.qos.logback.core.util.CloseUtil;
import ch.qos.logback.core.util.Duration;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;

/**
* An {@link AsyncDisruptorAppender} appender that writes
* events to a TCP {@link Socket} outputStream.
Expand All @@ -82,7 +73,8 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
protected static final String HOST_NAME_FORMAT = "%3$s";
protected static final String PORT_FORMAT = "%4$d";
public static final String DEFAULT_THREAD_NAME_FORMAT = "logback-appender-" + APPENDER_NAME_FORMAT + "-" + HOST_NAME_FORMAT + ":" + PORT_FORMAT + "-" + THREAD_INDEX_FORMAT;

public static final String DEFAULT_NETWORK_THREAD_NAME_FORMAT = "logback-appender-network-" + APPENDER_NAME_FORMAT + "-" + HOST_NAME_FORMAT + ":" + PORT_FORMAT + "-" + THREAD_INDEX_FORMAT;

/**
* The default port number of remote logging server (4560).
*/
Expand Down Expand Up @@ -252,11 +244,191 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
* @see #isGetHostStringPossible()
*/
private final boolean getHostStringPossible = isGetHostStringPossible();


/**
* The {@link Disruptor} containing the {@link RingBuffer} onto
* which to publish encoded events for transferring.
*/
private Disruptor<LogEvent<byte[]>> networkDisruptor;

/**
* The {@link EventFactory} used to create encoded {@link LogEvent}s for the RingBuffer.
*/
private LogEventFactory<byte[]> networkEventFactory = new LogEventFactory<byte[]>();

/**
* The size of the {@link RingBuffer} with encoded messages.
* Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}.
* If the handler thread is not as fast as the producing threads,
* then the {@link RingBuffer} will eventually fill up,
* at which point producer will block.
* <p>
* Must be a positive power of 2.
*/
private int networkRingBufferSize = DEFAULT_RING_BUFFER_SIZE;

/**
* The {@link ScheduledExecutorService} used to execute the handler task.
*/
private ScheduledThreadPoolExecutor networkExecutorService;

/**
* Defines what happens when there is an exception during
* {@link RingBuffer} processing of encoded events.
*/
private ExceptionHandler<LogEvent> networkExceptionHandler = new LogEventExceptionHandler();

/**
* The {@link ThreadFactory} used to create the handler thread for encoded events.
*/
private ThreadFactory networkThreadFactory = new NetworkWorkerThreadFactory();

/**
* Pattern used by the {@link WorkerThreadFactory} to set the
* handler thread name.
* Defaults to {@value #DEFAULT_NETWORK_THREAD_NAME_FORMAT}.
* <p>
*
* If you change the {@link #networkThreadFactory}, then this
* value may not be honored.
* <p>
*
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
* {@link Formatter#format(String, Object...)} is used to
* construct the actual thread name prefix.
* The first argument (%1$s) is the string appender name.
* The second argument (%2$d) is the numerical thread index.
* Other arguments can be made available by subclasses.
*/
private String networkThreadNameFormat = DEFAULT_NETWORK_THREAD_NAME_FORMAT;

/**
* The {@link WaitStrategy} to used by the RingBuffer
* when pulling encoded events to be processed by {@link #eventHandler}.
* <p>
* By default, a {@link BlockingWaitStrategy} is used, which is the most
* CPU conservative, but results in a higher latency.
* If you need lower latency (at the cost of higher CPU usage),
* consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}.
*/
private WaitStrategy networkWaitStrategy = DEFAULT_WAIT_STRATEGY;

/**
* Temporary byte array used for encoding events
*/
private ByteArrayOutputStream baos = new ByteArrayOutputStream();

private final static LogEventTranslator<byte[]> LOG_EVENT_ENCODER_TRANSLATOR = new LogEventTranslator<byte[]>();

/**
* The default {@link ThreadFactory} used to create the handler thread.
*/
private class NetworkWorkerThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(calculateThreadName());
t.setDaemon(useDaemonThread);
return t;
}

}

/**
* Event handler responsible for encoding events
*/
private class EncodingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware {

private final AbstractLogstashTcpSocketAppender<Event>.TcpSendingEventHandler tcpSendingEventHandler;

private EncodingEventHandler(AbstractLogstashTcpSocketAppender<Event>.TcpSendingEventHandler tcpSendingEventHandler) {
this.tcpSendingEventHandler = tcpSendingEventHandler;
}

@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {

if (logEvent.event != null) {

encoder.doEncode(logEvent.event);
byte[] encodedEvent = baos.toByteArray();
baos.reset();

networkDisruptor.getRingBuffer().publishEvent(LOG_EVENT_ENCODER_TRANSLATOR, encodedEvent);

}

}

@SuppressWarnings("unchecked")
@Override
public void onStart() {

networkExecutorService = new ScheduledThreadPoolExecutor(
getThreadPoolCoreSize(),
networkThreadFactory);

networkDisruptor = new Disruptor<LogEvent<byte[]>>(
networkEventFactory,
networkRingBufferSize,
networkExecutorService,
ProducerType.SINGLE,
networkWaitStrategy);

networkDisruptor.handleExceptionsWith(networkExceptionHandler);

networkDisruptor.handleEventsWith(new EventClearingEventHandler<byte[]>(tcpSendingEventHandler));

try {
encoder.init(baos);
} catch (IOException ioe) {
addStatus(new ErrorStatus(
"Failed to init encoder", this, ioe));
}

networkDisruptor.start();
}

@Override
public void onShutdown() {
closeEncoder();

try {
networkDisruptor.shutdown(1, TimeUnit.MINUTES);
} catch (com.lmax.disruptor.TimeoutException e) {
addWarn("Some queued events have not been logged due to requested shutdown");
}

networkExecutorService.shutdown();

try {
if (!networkExecutorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
}
}

private void closeEncoder() {
try {
encoder.close();
} catch (IOException ioe) {
addStatus(new ErrorStatus(
"Failed to close encoder", this, ioe));
}

encoder.stop();
}

}


/**
* Event handler responsible for performing the TCP transmission.
*/
private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, LifecycleAware {
private class TcpSendingEventHandler implements EventHandler<LogEvent<byte[]>>, LifecycleAware {

/**
* Max number of consecutive failed connection attempts for which
Expand Down Expand Up @@ -345,7 +517,7 @@ public void run() {
*
* A null event indicates that this is a keep alive message.
*/
getDisruptor().getRingBuffer().publishEvent(getEventTranslator(), null);
networkDisruptor.getRingBuffer().publishEvent(LOG_EVENT_ENCODER_TRANSLATOR, null);
scheduleKeepAlive(currentTime);
} else {
scheduleKeepAlive(lastSent);
Expand Down Expand Up @@ -404,7 +576,7 @@ public void run() {
}

@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
public void onEvent(LogEvent<byte[]> encodedLogEvent, long sequence, boolean endOfBatch) throws Exception {

for (int i = 0; i < MAX_REPEAT_WRITE_ATTEMPTS; i++) {
if (this.socket == null) {
Expand Down Expand Up @@ -433,12 +605,12 @@ public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch)
/*
* A null event indicates that this is a keep alive message.
*/
if (logEvent.event != null) {
if (encodedLogEvent.event != null) {
/*
* This is a standard (non-keepAlive) event.
* Therefore, we need to send the event.
*/
encoder.doEncode(logEvent.event);
outputStream.write(encodedLogEvent.event);
} else if (hasKeepAliveDurationElapsed(lastSentTimestamp, currentTime)){
/*
* This is a keep alive event, and the keepAliveDuration has passed,
Expand Down Expand Up @@ -492,7 +664,6 @@ public void onStart() {
@Override
public void onShutdown() {
unscheduleKeepAlive();
closeEncoder();
closeSocket();
}

Expand Down Expand Up @@ -536,8 +707,6 @@ private synchronized void openSocket() {
tempSocket.connect(new InetSocketAddress(getHostString(currentDestination), currentDestination.getPort()), acceptConnectionTimeout);
tempOutputStream = new BufferedOutputStream(tempSocket.getOutputStream(), writeBufferSize);

encoder.init(tempOutputStream);

addInfo(peerId + "connection established.");

this.socket = tempSocket;
Expand Down Expand Up @@ -633,25 +802,14 @@ private synchronized void closeSocket() {
}
}

private void closeEncoder() {
try {
encoder.close();
} catch (IOException ioe) {
addStatus(new ErrorStatus(
"Failed to close encoder", this, ioe));
}

encoder.stop();
}

private synchronized void scheduleKeepAlive(long basedOnTime) {
if (isKeepAliveEnabled() && !Thread.currentThread().isInterrupted()) {
if (keepAliveRunnable == null) {
keepAliveRunnable = new KeepAliveRunnable();
}
long delay = keepAliveDuration.getMilliseconds() - (System.currentTimeMillis() - basedOnTime);
try {
keepAliveFuture = getExecutorService().schedule(
keepAliveFuture = networkExecutorService.schedule(
keepAliveRunnable,
delay,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -706,7 +864,7 @@ public Socket createSocket() throws IOException {

public AbstractLogstashTcpSocketAppender() {
super();
setEventHandler(new TcpSendingEventHandler());
setEventHandler(new EncodingEventHandler(new TcpSendingEventHandler()));
setThreadNameFormat(DEFAULT_THREAD_NAME_FORMAT);
}

Expand Down Expand Up @@ -1132,4 +1290,33 @@ public void setThreadNameFormat(String threadNameFormat) {
super.setThreadNameFormat(threadNameFormat);
}

protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
return String.format(networkThreadNameFormat, threadNameFormatParams.toArray(new Object[threadNameFormatParams.size()]));
}

public int getNetworkRingBufferSize() {
return networkRingBufferSize;
}

public void setNetworkRingBufferSize(int networkRingBufferSize) {
this.networkRingBufferSize = networkRingBufferSize;
}

public WaitStrategy getNetworkWaitStrategy() {
return networkWaitStrategy;
}

public void setNetworkWaitStrategy(WaitStrategy networkWaitStrategy) {
this.networkWaitStrategy = networkWaitStrategy;
}

public String getNetworkThreadNameFormat() {
return networkThreadNameFormat;
}

public void setNetworkThreadNameFormat(String networkThreadNameFormat) {
this.networkThreadNameFormat = networkThreadNameFormat;
}

}
Loading