Skip to content
This repository has been archived by the owner on Feb 12, 2025. It is now read-only.

Commit

Permalink
Make UDP client resilient to socket connection issues (#115 closes #114)
Browse files Browse the repository at this point in the history
  • Loading branch information
luigiberrettini authored Jan 21, 2018
1 parent 8bc28f4 commit c74c74a
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 79 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ The maximum length of a message is detailed in many RFCs that can be summarized
* `udp` - settings related to UDP:
* `server` - IP or hostname of the Syslog server (default: `127.0.0.1`)
* `port` - port the Syslog server is listening on (default: `514`)
* `reconnectInterval` - the time interval, in milliseconds, after which a connection is retried (default: `500`)
* `connectionCheckTimeout` - the time, in microseconds, to wait for a response when checking the UDP socket connection status (default: `100`; `0` means the only check performed is `UdpClient and inner socket != null`)
* `tcp` - settings related to TCP:
* `server` - IP or hostname of the Syslog server (default: `127.0.0.1`)
* `port` - port the Syslog server is listening on (default: `514`)
Expand Down
19 changes: 19 additions & 0 deletions src/NLog.Targets.Syslog/Extensions/SocketExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed under the BSD license
// See the LICENSE file in the project root for more information

using System.Net.Sockets;

namespace NLog.Targets.Syslog.Extensions
{
internal static class SocketExtensions
{
public static bool IsConnected(this Socket socket, int timeout)
{
if (timeout <= 0)
return true;

var isDisconnected = socket?.Poll(timeout, SelectMode.SelectRead) == true && socket?.Available == 0;
return !isDisconnected;
}
}
}
43 changes: 40 additions & 3 deletions src/NLog.Targets.Syslog/MessageSend/MessageTransmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using NLog.Targets.Syslog.Extensions;
using NLog.Targets.Syslog.Settings;

namespace NLog.Targets.Syslog.MessageSend
{
internal abstract class MessageTransmitter
{
private static readonly Dictionary<ProtocolType, Func<MessageTransmitterConfig, MessageTransmitter>> TransmitterFactory;
protected static readonly TimeSpan ZeroSecondsTimeSpan = TimeSpan.FromSeconds(0);

private volatile bool neverConnected;
private readonly TimeSpan recoveryTime;
protected volatile bool disposed;

protected string Server { get; }

protected string IpAddress { get; }

protected int Port { get; }

protected abstract bool Ready { get; }

static MessageTransmitter()
{
TransmitterFactory = new Dictionary<ProtocolType, Func<MessageTransmitterConfig, MessageTransmitter>>
Expand All @@ -35,15 +43,44 @@ public static MessageTransmitter FromConfig(MessageTransmitterConfig messageTran
return TransmitterFactory[messageTransmitterConfig.Protocol](messageTransmitterConfig);
}

protected MessageTransmitter(string server, int port)
protected MessageTransmitter(string server, int port, int reconnectInterval)
{
neverConnected = true;
recoveryTime = TimeSpan.FromMilliseconds(reconnectInterval);
Server = server;
IpAddress = Dns.GetHostAddresses(server).FirstOrDefault()?.ToString();
Port = port;
}

public abstract Task SendMessageAsync(ByteArray message, CancellationToken token);
public Task SendMessageAsync(ByteArray message, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromResult<object>(null);

if (Ready)
return SendAsync(message, token);

var delay = neverConnected ? ZeroSecondsTimeSpan : recoveryTime;
neverConnected = false;
return Task.Delay(delay, token)
.Then(_ => Setup(), token)
.Unwrap()
.Then(_ => SendAsync(message, token), token)
.Unwrap();
}

public void Dispose()
{
if (disposed)
return;
disposed = true;
TearDown();
}

protected abstract Task Setup();

protected abstract Task SendAsync(ByteArray message, CancellationToken token);

public abstract void Dispose();
protected abstract void TearDown();
}
}
89 changes: 26 additions & 63 deletions src/NLog.Targets.Syslog/MessageSend/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,35 @@ namespace NLog.Targets.Syslog.MessageSend
{
internal class Tcp : MessageTransmitter
{
private static readonly TimeSpan ZeroSecondsTimeSpan = TimeSpan.FromSeconds(0);
private static readonly byte[] LineFeedBytes = { 0x0A };

private volatile bool neverConnected;
private readonly TimeSpan recoveryTime;
private readonly KeepAlive keepAlive;
private readonly int connectionCheckTimeout;
private readonly KeepAlive keepAlive;
private readonly bool useTls;
private readonly Func<X509Certificate2Collection> retrieveClientCertificates;
private readonly int dataChunkSize;
private readonly FramingMethod framing;
private TcpClient tcp;
private Stream stream;
private volatile bool disposed;

public Tcp(TcpConfig tcpConfig) : base(tcpConfig.Server, tcpConfig.Port)
protected override bool Ready
{
get { return tcp?.Connected == true && tcp.Client.IsConnected(connectionCheckTimeout) == true; }
}

public Tcp(TcpConfig tcpConfig) : base(tcpConfig.Server, tcpConfig.Port, tcpConfig.ReconnectInterval)
{
neverConnected = true;
recoveryTime = TimeSpan.FromMilliseconds(tcpConfig.ReconnectInterval);
keepAlive = new KeepAlive(tcpConfig.KeepAlive);
connectionCheckTimeout = tcpConfig.ConnectionCheckTimeout;
keepAlive = new KeepAlive(tcpConfig.KeepAlive);
useTls = tcpConfig.Tls.Enabled;
retrieveClientCertificates = tcpConfig.Tls.RetrieveClientCertificates;
framing = tcpConfig.Framing;
dataChunkSize = tcpConfig.DataChunkSize;
}

public override Task SendMessageAsync(ByteArray message, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromResult<object>(null);

if (tcp?.Connected == true && IsSocketConnected())
return WriteAsync(message, token);

var delay = neverConnected ? ZeroSecondsTimeSpan : recoveryTime;
neverConnected = false;

return Task.Delay(delay, token)
.Then(_ => InitTcpClient(), token)
.Unwrap()
.Then(_ => ConnectAsync(), token)
.Unwrap()
.Then(_ => WriteAsync(message, token), token)
.Unwrap();
}

private bool IsSocketConnected()
protected override Task Setup()
{
if (connectionCheckTimeout <= 0)
return true;

var isDisconnected = tcp.Client.Poll(connectionCheckTimeout, SelectMode.SelectRead) && tcp.Client.Available == 0;
return !isDisconnected;
}

private Task InitTcpClient()
{
DisposeSslStreamNotTcpClientInnerStream();
DisposeTcpClientAndItsInnerStream();
TearDown();

tcp = new TcpClient();
tcp.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, true);
Expand All @@ -85,16 +54,27 @@ private Task InitTcpClient()
// Call WSAIoctl via IOControl
tcp.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive.ToByteArray(), null);

return Task.FromResult<object>(null);
}

private Task ConnectAsync()
{
return tcp
.ConnectAsync(IpAddress, Port)
.Then(_ => stream = SslDecorate(tcp), CancellationToken.None);
}

protected override Task SendAsync(ByteArray message, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromResult<object>(null);

return FramingTask(message)
.Then(_ => WriteAsync(0, message, token), token)
.Unwrap();
}

protected override void TearDown()
{
DisposeSslStreamNotTcpClientInnerStream();
DisposeTcpClientAndItsInnerStream();
}

private Stream SslDecorate(TcpClient tcpClient)
{
var tcpStream = tcpClient.GetStream();
Expand All @@ -109,13 +89,6 @@ private Stream SslDecorate(TcpClient tcpClient)
return sslStream;
}

private Task WriteAsync(ByteArray message, CancellationToken token)
{
return FramingTask(message)
.Then(_ => WriteAsync(0, message, token), token)
.Unwrap();
}

private Task FramingTask(ByteArray message)
{
if (framing == FramingMethod.NonTransparent)
Expand Down Expand Up @@ -144,16 +117,6 @@ private Task WriteAsync(int offset, ByteArray data, CancellationToken token)
.Unwrap();
}

public override void Dispose()
{
if (disposed)
return;
disposed = true;

DisposeSslStreamNotTcpClientInnerStream();
DisposeTcpClientAndItsInnerStream();
}

private void DisposeSslStreamNotTcpClientInnerStream()
{
if (useTls)
Expand Down
30 changes: 20 additions & 10 deletions src/NLog.Targets.Syslog/MessageSend/Udp.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
// Licensed under the BSD license
// See the LICENSE file in the project root for more information

using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using NLog.Targets.Syslog.Extensions;
using NLog.Targets.Syslog.Settings;

namespace NLog.Targets.Syslog.MessageSend
{
internal class Udp : MessageTransmitter
{
private readonly UdpClient udp;
private volatile bool disposed;
private readonly int connectionCheckTimeout;
private UdpClient udp;

public Udp(UdpConfig udpConfig) : base(udpConfig.Server, udpConfig.Port)
protected override bool Ready
{
get { return udp?.Client?.IsConnected(connectionCheckTimeout) == true; }
}

public Udp(UdpConfig udpConfig) : base(udpConfig.Server, udpConfig.Port, udpConfig.ReconnectInterval)
{
connectionCheckTimeout = udpConfig.ConnectionCheckTimeout;
}

protected override Task Setup()
{
TearDown();
udp = new UdpClient(IpAddress, Port);
return Task.FromResult<object>(null);
}

public override Task SendMessageAsync(ByteArray message, CancellationToken token)
protected override Task SendAsync(ByteArray message, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromResult<object>(null);

return udp.SendAsync(message, message.Length);
}

public override void Dispose()
protected override void TearDown()
{
if (disposed)
return;
disposed = true;
udp.Close();
udp?.Close();
}
}
}
1 change: 1 addition & 0 deletions src/NLog.Targets.Syslog/NLog.Targets.Syslog.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<Compile Include="AsyncLogger.cs" />
<Compile Include="ByteArray.cs" />
<Compile Include="Extensions\NumberExtensions.cs" />
<Compile Include="Extensions\SocketExtensions.cs" />
<Compile Include="Extensions\StackTraceExtensions.cs" />
<Compile Include="Settings\MessageTransmitterConfig.cs" />
<Compile Include="Settings\TcpConfig.cs" />
Expand Down
7 changes: 6 additions & 1 deletion src/NLog.Targets.Syslog/NLog.Targets.Syslog.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
xmlns:nlog="http://www.nlog-project.org/schemas/NLog.xsd">
<xs:import namespace="http://www.nlog-project.org/schemas/NLog.xsd" />
<xs:annotation>
<xs:documentation>NLog Syslog target schema for Intellisense™ 4.0.1</xs:documentation>
<xs:documentation>NLog Syslog target schema for Intellisense™ 4.1.0</xs:documentation>
</xs:annotation>
<xs:complexType name="Syslog">
<xs:complexContent>
Expand Down Expand Up @@ -113,9 +113,13 @@
<xs:choice minOccurs="0" maxOccurs="unbounded">
<xs:element name="server" type="xs:string" minOccurs="1" maxOccurs="1" />
<xs:element name="port" type="xs:integer" minOccurs="0" maxOccurs="1" />
<xs:element name="reconnectInterval" type="xs:integer" minOccurs="0" maxOccurs="1" />
<xs:element name="connectionCheckTimeout" type="xs:integer" minOccurs="0" maxOccurs="1" />
</xs:choice>
<xs:attribute name="server" type="xs:string" />
<xs:attribute name="port" type="xs:integer" />
<xs:attribute name="reconnectInterval" type="xs:integer" />
<xs:attribute name="connectionCheckTimeout" type="xs:integer" />
</xs:complexType>
</xs:element>
<xs:element name="tcp" minOccurs="0" maxOccurs="1">
Expand Down Expand Up @@ -161,6 +165,7 @@
<xs:attribute name="server" type="xs:string" />
<xs:attribute name="port" type="xs:integer" />
<xs:attribute name="reconnectInterval" type="xs:integer" />
<xs:attribute name="connectionCheckTimeout" type="xs:integer" />
<xs:attribute name="framing" type="xs:string" />
<xs:attribute name="dataChunkSize" type="xs:integer" />
</xs:complexType>
Expand Down
4 changes: 2 additions & 2 deletions src/NLog.Targets.Syslog/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
[assembly: AssemblyCopyright("Copyright © 2013 - present by Jesper Hess Nielsen, Luigi Berrettini and others: https://github.com/graffen/NLog.Targets.Syslog/graphs/contributors")]
[assembly: ComVisible(false)]
[assembly: AssemblyVersion("4.0.0.0")]
[assembly: AssemblyFileVersion("4.0.1.0")]
[assembly: AssemblyInformationalVersion("4.0.1")]
[assembly: AssemblyFileVersion("4.1.0.0")]
[assembly: AssemblyInformationalVersion("4.1.0")]
Loading

0 comments on commit c74c74a

Please sign in to comment.