diff --git a/README.md b/README.md index 6b0765f7..89219f2a 100644 --- a/README.md +++ b/README.md @@ -205,12 +205,13 @@ You can use a `Logstash*Encoder`, `*EventCompositeJsonEncoder`, or any other log All of the output formatting options are configured at the encoder level. Internally, the TCP appenders are asynchronous (using the [LMAX Disruptor RingBuffer](https://lmax-exchange.github.io/disruptor/)). -All the encoding and TCP communication is delegated to a single writer thread. +Encoding and TCP communication are delegated to two dedicated threads: one for encoding events and one for network communication. There is no need to wrap the TCP appenders with another asynchronous appender (such as `AsyncAppender` or `LoggingEventAsyncDisruptorAppender`). All the configuration parameters (except for sub-appender) of the [async appenders](#async) -are valid for TCP appenders. For example, `waitStrategyType` and `ringBufferSize`. +are valid for TCP appenders. For example, `waitStrategyType` and `ringBufferSize`. +These parameters customize the encoding thread. The communication thread has its own properties with a `network` prefix, such as `networkRingBufferSize`, `networkWaitStrategy`, and `networkThreadNameFormat` - see [AbstractLogstashTcpSocketAppender](https://github.com/bedrin/logstash-logback-encoder/blob/master/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java) for details. The TCP appenders will never block the logging thread. If the RingBuffer is full (e.g. due to slow network, etc), then events will be dropped. diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 10d3f294..3029b6bb 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -13,10 +13,7 @@ */ package net.logstash.logback.appender; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -27,18 +24,16 @@ import java.util.Collections; import java.util.Formatter; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; import net.logstash.logback.encoder.SeparatorParser; import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement; @@ -53,10 +48,6 @@ import ch.qos.logback.core.util.CloseUtil; import ch.qos.logback.core.util.Duration; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.RingBuffer; - /** * An {@link AsyncDisruptorAppender} appender that writes * events to a TCP {@link Socket} outputStream. @@ -82,7 +73,8 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredProcessingAware> + private Disruptor<LogEvent<byte[]>> networkDisruptor; + + /** + * The {@link EventFactory} used to create encoded {@link LogEvent}s for the RingBuffer.
+ */ + private LogEventFactory networkEventFactory = new LogEventFactory(); + + /** + * The size of the {@link RingBuffer} holding encoded messages. + * Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}. + * If the handler thread is not as fast as the producing threads, + * then the {@link RingBuffer} will eventually fill up, + * at which point the producer will block. + *
+ * Must be a positive power of 2. + */ + private int networkRingBufferSize = DEFAULT_RING_BUFFER_SIZE; + + /** + * The {@link ScheduledExecutorService} used to execute the handler task. + */ + private ScheduledThreadPoolExecutor networkExecutorService; + + /** + * Defines what happens when there is an exception during + * {@link RingBuffer} processing of encoded events. + */ + private ExceptionHandler networkExceptionHandler = new LogEventExceptionHandler(); + + /** + * The {@link ThreadFactory} used to create the handler thread for encoded events. + */ + private ThreadFactory networkThreadFactory = new NetworkWorkerThreadFactory(); + + /** + * Pattern used by the {@link NetworkWorkerThreadFactory} to set the + * handler thread name. + * Defaults to {@value #DEFAULT_NETWORK_THREAD_NAME_FORMAT}. + *
+ * + * If you change the {@link #networkThreadFactory}, then this + * value may not be honored. + *
+ * + * The string is a format pattern understood by {@link Formatter#format(String, Object...)}. + * {@link Formatter#format(String, Object...)} is used to + * construct the actual thread name prefix. + * The first argument (%1$s) is the appender name. + * The second argument (%2$d) is the numerical thread index. + * Other arguments can be made available by subclasses. + */ + private String networkThreadNameFormat = DEFAULT_NETWORK_THREAD_NAME_FORMAT; + + /** + * The {@link WaitStrategy} to be used by the RingBuffer + * when pulling encoded events to be processed by {@link #eventHandler}. + *
+ * By default, a {@link BlockingWaitStrategy} is used, which is the most + * CPU conservative, but results in a higher latency. + * If you need lower latency (at the cost of higher CPU usage), + * consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}. + */ + private WaitStrategy networkWaitStrategy = DEFAULT_WAIT_STRATEGY; + + /** + * Temporary byte array used for encoding events + */ + private ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + private final static LogEventTranslator LOG_EVENT_ENCODER_TRANSLATOR = new LogEventTranslator(); + + /** + * The default {@link ThreadFactory} used to create the handler thread. + */ + private class NetworkWorkerThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(calculateThreadName()); + t.setDaemon(useDaemonThread); + return t; + } + + } + + /** + * Event handler responsible for encoding events + */ + private class EncodingEventHandler implements EventHandler>, LifecycleAware { + + private final AbstractLogstashTcpSocketAppender.TcpSendingEventHandler tcpSendingEventHandler; + + private EncodingEventHandler(AbstractLogstashTcpSocketAppender.TcpSendingEventHandler tcpSendingEventHandler) { + this.tcpSendingEventHandler = tcpSendingEventHandler; + } + + @Override + public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception { + + if (logEvent.event != null) { + + encoder.doEncode(logEvent.event); + byte[] encodedEvent = baos.toByteArray(); + baos.reset(); + + networkDisruptor.getRingBuffer().publishEvent(LOG_EVENT_ENCODER_TRANSLATOR, encodedEvent); + + } + + } + + @SuppressWarnings("unchecked") + @Override + public void onStart() { + + networkExecutorService = new ScheduledThreadPoolExecutor( + getThreadPoolCoreSize(), + networkThreadFactory); + + networkDisruptor = new Disruptor>( + networkEventFactory, + networkRingBufferSize, + networkExecutorService, + ProducerType.SINGLE, + networkWaitStrategy); + + networkDisruptor.handleExceptionsWith(networkExceptionHandler); + + networkDisruptor.handleEventsWith(new EventClearingEventHandler(tcpSendingEventHandler)); + + try { + encoder.init(baos); + } catch (IOException ioe) { + addStatus(new ErrorStatus( + "Failed to init encoder", this, ioe)); + } + + networkDisruptor.start(); + } + + @Override + public void onShutdown() { + closeEncoder(); + + try { + networkDisruptor.shutdown(1, TimeUnit.MINUTES); + } catch (com.lmax.disruptor.TimeoutException e) { + addWarn("Some queued events have not been logged due to requested shutdown"); + } + + networkExecutorService.shutdown(); + + try { + if (!networkExecutorService.awaitTermination(1, TimeUnit.MINUTES)) { + addWarn("Some queued events have not been logged due to requested shutdown"); + } + } catch (InterruptedException e) { + addWarn("Some queued events have not been logged due to requested shutdown", e); + } + } + + private void closeEncoder() { + try { + encoder.close(); + } catch (IOException ioe) { + addStatus(new ErrorStatus( + "Failed to close encoder", this, ioe)); + } + + encoder.stop(); + } + + } + + /** * Event handler responsible for performing the TCP transmission. 
*/ - private class TcpSendingEventHandler implements EventHandler>, LifecycleAware { + private class TcpSendingEventHandler implements EventHandler>, LifecycleAware { /** * Max number of consecutive failed connection attempts for which @@ -345,7 +517,7 @@ public void run() { * * A null event indicates that this is a keep alive message. */ - getDisruptor().getRingBuffer().publishEvent(getEventTranslator(), null); + networkDisruptor.getRingBuffer().publishEvent(LOG_EVENT_ENCODER_TRANSLATOR, null); scheduleKeepAlive(currentTime); } else { scheduleKeepAlive(lastSent); @@ -404,7 +576,7 @@ public void run() { } @Override - public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(LogEvent encodedLogEvent, long sequence, boolean endOfBatch) throws Exception { for (int i = 0; i < MAX_REPEAT_WRITE_ATTEMPTS; i++) { if (this.socket == null) { @@ -433,12 +605,12 @@ public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) /* * A null event indicates that this is a keep alive message. */ - if (logEvent.event != null) { + if (encodedLogEvent.event != null) { /* * This is a standard (non-keepAlive) event. * Therefore, we need to send the event. */ - encoder.doEncode(logEvent.event); + outputStream.write(encodedLogEvent.event); } else if (hasKeepAliveDurationElapsed(lastSentTimestamp, currentTime)){ /* * This is a keep alive event, and the keepAliveDuration has passed, @@ -492,7 +664,6 @@ public void onStart() { @Override public void onShutdown() { unscheduleKeepAlive(); - closeEncoder(); closeSocket(); } @@ -536,8 +707,6 @@ private synchronized void openSocket() { tempSocket.connect(new InetSocketAddress(getHostString(currentDestination), currentDestination.getPort()), acceptConnectionTimeout); tempOutputStream = new BufferedOutputStream(tempSocket.getOutputStream(), writeBufferSize); - encoder.init(tempOutputStream); - addInfo(peerId + "connection established."); this.socket = tempSocket; @@ -633,17 +802,6 @@ private synchronized void closeSocket() { } } - private void closeEncoder() { - try { - encoder.close(); - } catch (IOException ioe) { - addStatus(new ErrorStatus( - "Failed to close encoder", this, ioe)); - } - - encoder.stop(); - } - private synchronized void scheduleKeepAlive(long basedOnTime) { if (isKeepAliveEnabled() && !Thread.currentThread().isInterrupted()) { if (keepAliveRunnable == null) { @@ -651,7 +809,7 @@ private synchronized void scheduleKeepAlive(long basedOnTime) { } long delay = keepAliveDuration.getMilliseconds() - (System.currentTimeMillis() - basedOnTime); try { - keepAliveFuture = getExecutorService().schedule( + keepAliveFuture = networkExecutorService.schedule( keepAliveRunnable, delay, TimeUnit.MILLISECONDS); @@ -706,7 +864,7 @@ public Socket createSocket() throws IOException { public AbstractLogstashTcpSocketAppender() { super(); - setEventHandler(new TcpSendingEventHandler()); + setEventHandler(new EncodingEventHandler(new TcpSendingEventHandler())); setThreadNameFormat(DEFAULT_THREAD_NAME_FORMAT); } @@ -1132,4 +1290,33 @@ public void setThreadNameFormat(String threadNameFormat) { super.setThreadNameFormat(threadNameFormat); } + protected String calculateThreadName() { + List threadNameFormatParams = getThreadNameFormatParams(); + return String.format(networkThreadNameFormat, threadNameFormatParams.toArray(new Object[threadNameFormatParams.size()])); + } + + public int getNetworkRingBufferSize() { + return networkRingBufferSize; + } + + public void setNetworkRingBufferSize(int networkRingBufferSize) 
{ + this.networkRingBufferSize = networkRingBufferSize; + } + + public WaitStrategy getNetworkWaitStrategy() { + return networkWaitStrategy; + } + + public void setNetworkWaitStrategy(WaitStrategy networkWaitStrategy) { + this.networkWaitStrategy = networkWaitStrategy; + } + + public String getNetworkThreadNameFormat() { + return networkThreadNameFormat; + } + + public void setNetworkThreadNameFormat(String networkThreadNameFormat) { + this.networkThreadNameFormat = networkThreadNameFormat; + } + } diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index d4749988..e4f824dd 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -168,7 +168,7 @@ public abstract class AsyncDisruptorAppender> exceptionHandler = new LogEventExceptionHandler(); + private ExceptionHandler exceptionHandler = new LogEventExceptionHandler(); /** * Consecutive number of dropped events. @@ -243,7 +243,7 @@ protected static class LogEvent { * Factory for creating the initial {@link LogEvent}s to populate * the {@link RingBuffer}. */ - private static class LogEventFactory implements EventFactory> { + protected static class LogEventFactory implements EventFactory> { @Override public LogEvent newInstance() { @@ -255,7 +255,7 @@ public LogEvent newInstance() { * The default {@link ThreadFactory} used to create the handler thread. */ private class WorkerThreadFactory implements ThreadFactory { - + @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); @@ -264,10 +264,10 @@ public Thread newThread(Runnable r) { return t; } } - + /** * Sets the {@link LogEvent#event} to the logback Event. - * Used when publishing events to the {@link RingBuffer}. + * Used when publishing events to the {@link RingBuffer}. */ protected static class LogEventTranslator implements EventTranslatorOneArg, Event> { @@ -283,10 +283,10 @@ public void translateTo(LogEvent logEvent, long sequence, Event event) { * * Currently, just logs to the logback context. */ - private class LogEventExceptionHandler implements ExceptionHandler> { + protected class LogEventExceptionHandler implements ExceptionHandler { @Override - public void handleEventException(Throwable ex, long sequence, LogEvent event) { + public void handleEventException(Throwable ex, long sequence, LogEvent event) { addError("Unable to process event: " + ex.getMessage(), ex); } @@ -305,7 +305,7 @@ public void handleOnShutdownException(Throwable ex) { * Clears the event after a delegate event handler has processed the event, * so that the event can be garbage collected. 
*/ - private static class EventClearingEventHandler implements EventHandler>, LifecycleAware { + protected static class EventClearingEventHandler implements EventHandler>, LifecycleAware { private final EventHandler> delegate; diff --git a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java index 062a5b5d..d45076e8 100644 --- a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java @@ -13,32 +13,12 @@ */ package net.logstash.logback.appender; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.concurrent.Future; - -import javax.net.SocketFactory; - +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.status.StatusManager; +import ch.qos.logback.core.util.Duration; import net.logstash.logback.encoder.SeparatorParser; - import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,11 +30,18 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Context; -import ch.qos.logback.core.encoder.Encoder; -import ch.qos.logback.core.status.StatusManager; -import ch.qos.logback.core.util.Duration; +import javax.net.SocketFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.*; +import java.nio.charset.Charset; +import java.util.concurrent.Future; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) public class LogstashTcpSocketAppenderTest { @@ -74,7 +61,7 @@ public class LogstashTcpSocketAppenderTest { private ILoggingEvent event1; @Mock - private ILoggingEvent event2; + private ByteArrayOutputStream byteArrayOutputStream; @Mock private SocketFactory socketFactory; @@ -132,39 +119,42 @@ public void testEncoderCalled() throws Exception { public void testReconnectOnOpen() throws Exception { appender.addDestination("localhost:10000"); appender.setReconnectionDelay(new Duration(100)); - - reset(socketFactory); - when(socketFactory.createSocket()) - .thenThrow(new SocketTimeoutException()) - .thenReturn(socket); - + + doThrow(SocketTimeoutException.class) + .doNothing() + .when(socket).connect(host("localhost", 10000), anyInt()); + + doReturn(new byte[]{0}).when(byteArrayOutputStream).toByteArray(); + appender.start(); verify(encoder).start(); appender.append(event1); - - verify(encoder, timeout(VERIFICATION_TIMEOUT)).init(any(OutputStream.class)); - - 
verify(encoder, timeout(VERIFICATION_TIMEOUT)).doEncode(event1); + + verify(socket, timeout(VERIFICATION_TIMEOUT)).connect(any(SocketAddress.class), anyInt()); + + verify(outputStream, timeout(VERIFICATION_TIMEOUT)).write(any(byte[].class), anyInt(), anyInt()); + } @Test public void testReconnectOnWrite() throws Exception { + + doThrow(new SocketException()).doNothing().when(socket).connect(any(SocketAddress.class), anyInt()); + appender.addDestination("localhost:10000"); appender.setReconnectionDelay(new Duration(100)); appender.start(); verify(encoder).start(); - - doThrow(new SocketException()).doNothing().when(encoder).doEncode(event1); - + appender.append(event1); + + verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); - verify(encoder, timeout(VERIFICATION_TIMEOUT).times(2)).init(any(OutputStream.class)); - - verify(encoder, timeout(VERIFICATION_TIMEOUT).times(2)).doEncode(event1); + verify(encoder, timeout(VERIFICATION_TIMEOUT).times(1)).doEncode(event1); } @Test @@ -188,8 +178,8 @@ public void testReconnectOnReadFailure() throws Exception { verify(encoder).start(); appender.append(event1); - - verify(encoder, timeout(VERIFICATION_TIMEOUT).times(2)).init(any(OutputStream.class)); + + verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); verify(encoder, timeout(VERIFICATION_TIMEOUT)).doEncode(event1); } @@ -274,9 +264,11 @@ public void testReconnectToSecondaryOnWrite() throws Exception { // First attempt of sending the event throws an exception while subsequent // attempts will succeed. This should force the appender to close the connection // and attempt to reconnect starting from the first host of the list. - doThrow(new SocketException()) + doReturn(new byte[]{0}).when(byteArrayOutputStream).toByteArray(); + + doThrow(new IOException()) .doNothing() - .when(encoder).doEncode(event1); + .when(outputStream).write(any(byte[].class), anyInt(), anyInt()); // Start the appender and verify it is actually started @@ -286,7 +278,6 @@ public void testReconnectToSecondaryOnWrite() throws Exception { appender.append(event1); - // THREE connection attempts must have been made in total verify(socket, timeout(VERIFICATION_TIMEOUT).times(3)).connect(any(SocketAddress.class), anyInt()); InOrder inOrder = inOrder(socket); @@ -372,10 +363,9 @@ public void testReconnectWaitWhenExhausted() throws Exception { // At this point, it should be connected to primary. appender.start(); verify(encoder).start(); - - + // THREE connection attempts must have been made in total - verify(socket, timeout(appender.getReconnectionDelay().getMilliseconds()+50).times(3)).connect(any(SocketAddress.class), anyInt()); + verify(socket, timeout(VERIFICATION_TIMEOUT).times(3)).connect(any(SocketAddress.class), anyInt()); InOrder inOrder = inOrder(socket, encoder); // 1) fail to connect on primary at startup
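Usage note: the snippet below is a minimal, hypothetical sketch (not part of the change set above) showing how the new network* properties introduced by this PR could be configured programmatically on a LogstashTcpSocketAppender. The destination, buffer size, wait strategy, and thread-name pattern are illustrative values only, and the usual logback context/encoder setup is omitted.

import com.lmax.disruptor.SleepingWaitStrategy;
import net.logstash.logback.appender.LogstashTcpSocketAppender;

public class NetworkTuningSketch {
    public static void main(String[] args) {
        // Assumes a logback context and a JSON encoder are configured elsewhere before start().
        LogstashTcpSocketAppender appender = new LogstashTcpSocketAppender();
        appender.addDestination("localhost:4560"); // illustrative destination

        // The existing ringBufferSize/waitStrategyType parameters tune the encoding thread;
        // the network (writer) thread is tuned through the new network* properties.
        appender.setNetworkRingBufferSize(16384);                    // must be a positive power of 2
        appender.setNetworkWaitStrategy(new SleepingWaitStrategy()); // lower latency, higher CPU than blocking
        appender.setNetworkThreadNameFormat("logstash-network-%1$s-%2$d"); // %1$s = appender name, %2$d = thread index

        appender.start();
    }
}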