Skip to content
This repository was archived by the owner on Jun 1, 2024. It is now read-only.

Commit

Permalink
Make sure the sink (and app) do not crash when the ES is unreachable …
Browse files Browse the repository at this point in the history
…during the discovery of the version. (#359)
  • Loading branch information
mivano authored Sep 19, 2020
1 parent 144369b commit c26047a
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 12 deletions.
Binary file added .ionide/symbolCache.db
Binary file not shown.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## Changelog

8.3
* Do not crash when ES is unreachable and the option `DetectElasticsearchVersion` is set to true.

* Disable dot-escaping for field names, because ELK already supports dots in field names.
* Support for explicitly setting `Options.TypeName` to `null` this will remove the
deprecated `_type` from the bulk payload being sent to Elastic. Earlier an exception was
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"sdk": {
"version": "2.1.500"
"version": "2.1.807"
}
}
2 changes: 1 addition & 1 deletion sample/Serilog.Sinks.Elasticsearch.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void Main(string[] args)
NumberOfReplicas = 1,
NumberOfShards = 2,
//BufferBaseFilename = "./buffer",
RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
// RegisterTemplateFailure = RegisterTemplateRecovery.FailSink,
FailureCallback = e => Console.WriteLine("Unable to submit event " + e.MessageTemplate),
EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog |
EmitEventFailureHandling.WriteToFailureSink |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,25 @@ public void DiscoverClusterVersion()
{
if (!_options.DetectElasticsearchVersion) return;

var response = _client.Cat.Nodes<StringResponse>(new CatNodesRequestParameters()
try
{
Headers = new[] { "v" }
});
if (!response.Success) return;

_discoveredVersion = response.Body.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
.FirstOrDefault();
var response = _client.Cat.Nodes<StringResponse>(new CatNodesRequestParameters()
{
Headers = new[] { "v" }
});
if (!response.Success) return;

_discoveredVersion = response.Body.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)
.FirstOrDefault();

if (_discoveredVersion?.StartsWith("7.") ?? false)
_options.TypeName = "_doc";
if (_discoveredVersion?.StartsWith("7.") ?? false)
_options.TypeName = "_doc";
}
catch (Exception ex)
{
SelfLog.WriteLine("Failed to discover the cluster version. {0}", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public abstract class ElasticsearchSinkTestsBase
protected readonly ElasticsearchSinkOptions _options;
protected List<string> _seenHttpPosts = new List<string>();
protected List<int> _seenHttpHeads = new List<int>();
protected List<Tuple<Uri, int>> _seenHttpGets = new List<Tuple<Uri, int>>();
protected List<Tuple<Uri, string>> _seenHttpPuts = new List<Tuple<Uri, string>>();
private IElasticsearchSerializer _serializer;

Expand All @@ -32,10 +33,11 @@ protected ElasticsearchSinkTestsBase()
{
_seenHttpPosts = new List<string>();
_seenHttpHeads = new List<int>();
_seenHttpGets = new List<Tuple<Uri,int>>();
_seenHttpPuts = new List<Tuple<Uri, string>>();

var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
_connection = new ConnectionStub(_seenHttpPosts, _seenHttpHeads, _seenHttpPuts, () => _templateExistsReturnCode);
_connection = new ConnectionStub(_seenHttpPosts, _seenHttpHeads, _seenHttpPuts, _seenHttpGets, () => _templateExistsReturnCode);
_serializer = JsonNetSerializer.Default(LowLevelRequestResponseSerializer.Instance, new ConnectionSettings(connectionPool, _connection));

_options = new ElasticsearchSinkOptions(connectionPool)
Expand Down Expand Up @@ -119,19 +121,22 @@ public class ConnectionStub : InMemoryConnection
{
private Func<int> _templateExistReturnCode;
private List<int> _seenHttpHeads;
private List<Tuple<Uri, int>> _seenHttpGets;
private List<string> _seenHttpPosts;
private List<Tuple<Uri, string>> _seenHttpPuts;

public ConnectionStub(
List<string> _seenHttpPosts,
List<int> _seenHttpHeads,
List<Tuple<Uri, string>> _seenHttpPuts,
List<Tuple<Uri, int>> _seenHttpGets,
Func<int> templateExistReturnCode
)
{
this._seenHttpPosts = _seenHttpPosts;
this._seenHttpHeads = _seenHttpHeads;
this._seenHttpPuts = _seenHttpPuts;
this._seenHttpGets = _seenHttpGets;
this._templateExistReturnCode = templateExistReturnCode;
}

Expand All @@ -149,6 +154,9 @@ public override TReturn Request<TReturn>(RequestData requestData)
case HttpMethod.POST:
_seenHttpPosts.Add(Encoding.UTF8.GetString(ms.ToArray()));
break;
case HttpMethod.GET:
_seenHttpGets.Add(Tuple.Create(requestData.Uri, this._templateExistReturnCode()));
break;
case HttpMethod.HEAD:
_seenHttpHeads.Add(this._templateExistReturnCode());
break;
Expand All @@ -172,6 +180,9 @@ public override async Task<TResponse> RequestAsync<TResponse>(RequestData reques
case HttpMethod.POST:
_seenHttpPosts.Add(Encoding.UTF8.GetString(ms.ToArray()));
break;
case HttpMethod.GET:
_seenHttpGets.Add(Tuple.Create(requestData.Uri, this._templateExistReturnCode()));
break;
case HttpMethod.HEAD:
_seenHttpHeads.Add(this._templateExistReturnCode());
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.IO;
using System.Text;
using FluentAssertions;
using Xunit;
using Serilog.Debugging;

namespace Serilog.Sinks.Elasticsearch.Tests.Templating
{
[Collection("isolation")]
public class DiscoverVersionHandlesUnavailableServerTests : ElasticsearchSinkTestsBase
{
[Fact]
public void Should_not_crash_when_server_is_unavaiable()
{
// If this crashes, the test will fail
CreateLoggerThatCrashes();
}

[Fact]
public void Should_write_error_to_self_log()
{
var selfLogMessages = new StringBuilder();
SelfLog.Enable(new StringWriter(selfLogMessages));

// Exception occurs on creation - should be logged
CreateLoggerThatCrashes();

var selfLogContents = selfLogMessages.ToString();
selfLogContents.Should().Contain("Failed to discover the cluster version");

}

private static ILogger CreateLoggerThatCrashes()
{
var loggerConfig = new LoggerConfiguration()
.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://localhost:9199"))
{
DetectElasticsearchVersion = true
});

return loggerConfig.CreateLogger();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using FluentAssertions;
using Xunit;

namespace Serilog.Sinks.Elasticsearch.Tests.Templating
{
public class DiscoverVersionTests : ElasticsearchSinkTestsBase
{
private readonly Tuple<Uri,int> _templateGet;

public DiscoverVersionTests()
{
_options.DetectElasticsearchVersion = true;

var loggerConfig = new LoggerConfiguration()
.MinimumLevel.Debug()
.Enrich.WithMachineName()
.WriteTo.ColoredConsole()
.WriteTo.Elasticsearch(_options);

var logger = loggerConfig.CreateLogger();
using ((IDisposable) logger)
{
logger.Error("Test exception. Should not contain an embedded exception object.");
}

this._seenHttpGets.Should().NotBeNullOrEmpty().And.HaveCount(1);
_templateGet = this._seenHttpGets[0];
}


[Fact]
public void TemplatePutToCorrectUrl()
{
var uri = _templateGet.Item1;
uri.AbsolutePath.Should().Be("/_cat/nodes");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Serilog.Sinks.Elasticsearch.Tests.Templating
public class SendsTemplateHandlesUnavailableServerTests : ElasticsearchSinkTestsBase
{
[Fact]
public void Should_not_crash_when_server_is_unavaiable()
public void Should_not_crash_when_server_is_unavailable()
{
// If this crashes, the test will fail
CreateLoggerThatCrashes();
Expand Down

0 comments on commit c26047a

Please sign in to comment.