Skip to content
This repository was archived by the owner on Jun 1, 2024. It is now read-only.

Commit

Permalink
Added support for controlling the op_type when indexing (#356)
Browse files Browse the repository at this point in the history
* Added support for controlling the op_type when indexing

In order to write to data streams we'll need to set the
op_type to create the default is index

In order to don't serialize null values for the batch action and to not depend
on custom serialization setting we're using LowLevelRequestResponseSerializer.Instance
for serialization of the action.

We won't throw exception when setting the TypeName to null since:
- This is deprecated in v7
- Won't work in v8

* Consider op_type when parsing response from bulk action

* Don't overwrite TypeName if it has been set to null

In order to be able to use templates and at the same time remove
the doc type we shouldn't override it if it's set to null.

* Support for setting the op_type for durable sink

Use the same logic for creating bulk action within
the durable sink as in the standard sink.

* Added change log
  • Loading branch information
orjan authored Sep 18, 2020
1 parent bf50349 commit 144369b
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 54 deletions.
17 changes: 17 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
## Changelog

* Disable dot-escaping for field names, because ELK already supports dots in field names.
* Support for explicitly setting `Options.TypeName` to `null` this will remove the
deprecated `_type` from the bulk payload being sent to Elastic. Earlier an exception was
thrown if the `Options.TypeName` was `null`. _If you're using `AutoRegisterTemplateVersion.ESv7`
we'll not force the type to `_doc` if it's set to `null`. This is a small step towards support
for writing logs to Elastic v8. #345
* Support for setting the Elastic `op_type` e.g. `index` or `create` for bulk actions.
This is a requirement for writing to [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/data-streams.html)
that's only supporting `create`. Data streams is a more slipped stream way to handle rolling
indices, that previous required an ILM, template and a magic write alias. Now it's more integrated
in Elasticsearch and Kibana. If you're running Elastic `7.9` you'll get rolling indices out of the box
with this configuration:
```
TypeName = null,
IndexFormat = "logs-my-stream",
BatchAction = ElasticOpType.Create,
```
_Note: that current templates doesn't support data streams._ #355

8.2
* Allow the use of templateCustomSettings when reading from settings json (#315)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ This example shows the options that are currently available when using the appSe
<add key="serilog:write-to:Elasticsearch.typeName" value="myCustomLogEventType"/>
<add key="serilog:write-to:Elasticsearch.pipelineName" value="myCustomPipelineName"/>
<add key="serilog:write-to:Elasticsearch.batchPostingLimit" value="50"/>
<add key="serilog:write-to:Elasticsearch.batchAction" value="create"/>
<add key="serilog:write-to:Elasticsearch.period" value="2"/>
<add key="serilog:write-to:Elasticsearch.inlineFields" value="true"/>
<add key="serilog:write-to:Elasticsearch.restrictedToMinimumLevel" value="Warning"/>
Expand Down Expand Up @@ -179,6 +180,7 @@ In your `appsettings.json` file, under the `Serilog` node, :
"typeName": "myCustomLogEventType",
"pipelineName": "myCustomPipelineName",
"batchPostingLimit": 50,
"batchAction": "create",
"period": 2,
"inlineFields": true,
"restrictedToMinimumLevel": "Warning",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static LoggerConfiguration Elasticsearch(
/// <param name="failureSink">Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting.</param>
/// <param name="singleEventSizePostingLimit"><see cref="ElasticsearchSinkOptions.SingleEventSizePostingLimit"/>The maximum length of an event allowed to be posted to Elasticsearch.default null</param>
/// <param name="templateCustomSettings">Add custom elasticsearch settings to the template</param>
/// <param name="batchAction">Configures the OpType being used when inserting document in batch. Must be set to create for data streams.</param>
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="nodeUris"/> is <see langword="null" />.</exception>
public static LoggerConfiguration Elasticsearch(
Expand Down Expand Up @@ -180,7 +181,8 @@ public static LoggerConfiguration Elasticsearch(
ILogEventSink failureSink = null,
long? singleEventSizePostingLimit = null,
int? bufferFileCountLimit = null,
Dictionary<string,string> templateCustomSettings = null)
Dictionary<string,string> templateCustomSettings = null,
ElasticOpType batchAction = ElasticOpType.Index)
{
if (string.IsNullOrEmpty(nodeUris))
throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified.");
Expand Down Expand Up @@ -208,6 +210,7 @@ public static LoggerConfiguration Elasticsearch(
}

options.BatchPostingLimit = batchPostingLimit;
options.BatchAction = batchAction;
options.SingleEventSizePostingLimit = singleEventSizePostingLimit;
options.Period = TimeSpan.FromSeconds(period);
options.InlineFields = inlineFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using Elasticsearch.Net;
using Serilog.Core;
using Serilog.Events;

Expand Down Expand Up @@ -55,15 +54,16 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options)


var elasticSearchLogClient = new ElasticsearchLogClient(
elasticLowLevelClient: _state.Client,
cleanPayload: _state.Options.BufferCleanPayload);
elasticLowLevelClient: _state.Client,
cleanPayload: _state.Options.BufferCleanPayload,
elasticOpType: _state.Options.BatchAction);

var payloadReader = new ElasticsearchPayloadReader(
pipelineName: _state.Options.PipelineName,
typeName:_state.Options.TypeName,
serialize:_state.Serialize,
getIndexForEvent: _state.GetBufferedIndexForEvent
);
getIndexForEvent: _state.GetBufferedIndexForEvent,
elasticOpType: _state.Options.BatchAction);

_shipper = new ElasticsearchLogShipper(
bufferBaseFilename: _state.Options.BufferBaseFilename,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Serilog.Debugging;
Expand All @@ -17,17 +14,21 @@ public class ElasticsearchLogClient : ILogClient<List<string>>
{
private readonly IElasticLowLevelClient _elasticLowLevelClient;
private readonly Func<string, long?, string, string> _cleanPayload;
private readonly ElasticOpType _elasticOpType;

/// <summary>
///
/// </summary>
/// <param name="elasticLowLevelClient"></param>
/// <param name="cleanPayload"></param>
/// <param name="elasticOpType"></param>
public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient,
Func<string, long?, string, string> cleanPayload)
Func<string, long?, string, string> cleanPayload,
ElasticOpType elasticOpType)
{
_elasticLowLevelClient = elasticLowLevelClient;
_cleanPayload = cleanPayload;
_elasticOpType = elasticOpType;
}

public async Task<SentPayloadResult> SendPayloadAsync(List<string> payload)
Expand Down Expand Up @@ -85,7 +86,7 @@ private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List<st
bool hasErrors = false;
foreach (dynamic item in items)
{
var itemIndex = item?["index"];
var itemIndex = item?[ElasticsearchSink.BulkAction(_elasticOpType)];
long? status = itemIndex?["status"];
i++;
if (!status.HasValue || status < 300)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Elasticsearch.Net;

namespace Serilog.Sinks.Elasticsearch.Durable
{
Expand All @@ -17,6 +16,7 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
private readonly string _typeName;
private readonly Func<object, string> _serialize;
private readonly Func<string, DateTime,string> _getIndexForEvent;
private readonly ElasticOpType _elasticOpType;
private List<string> _payload;
private int _count;
private DateTime _date;
Expand All @@ -28,12 +28,15 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
/// <param name="typeName"></param>
/// <param name="serialize"></param>
/// <param name="getIndexForEvent"></param>
public ElasticsearchPayloadReader(string pipelineName,string typeName, Func<object,string> serialize,Func<string,DateTime,string> getIndexForEvent)
/// <param name="elasticOpType"></param>
public ElasticsearchPayloadReader(string pipelineName, string typeName, Func<object, string> serialize,
Func<string, DateTime, string> getIndexForEvent, ElasticOpType elasticOpType)
{
_pipelineName = pipelineName;
_typeName = typeName;
_serialize = serialize;
_getIndexForEvent = getIndexForEvent;
_elasticOpType = elasticOpType;
}

/// <summary>
Expand Down Expand Up @@ -80,18 +83,13 @@ protected override List<string> FinishPayLoad()
protected override void AddToPayLoad(string nextLine)
{
var indexName = _getIndexForEvent(nextLine, _date);
var action = default(object);
var action = ElasticsearchSink.CreateElasticAction(
opType: _elasticOpType,
indexName: indexName, pipelineName: _pipelineName,
id: _count + "_" + Guid.NewGuid(),
mappingType: _typeName);
var actionJson = LowLevelRequestResponseSerializer.Instance.SerializeToString(action);

if (string.IsNullOrWhiteSpace(_pipelineName))
{
action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid() } };
}
else
{
action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid(), pipeline = _pipelineName } };
}

var actionJson = _serialize(action);
_payload.Add(actionJson);
_payload.Add(nextLine);
_count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Specification.SecurityApi;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Sinks.PeriodicBatching;
Expand Down Expand Up @@ -82,7 +82,7 @@ protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
if (events == null || !events.Any())
return Task.FromResult<T>(default(T));

var payload = CreatePlayLoad<T>(events);
var payload = CreatePlayLoad(events);
return _state.Client.BulkAsync<T>(PostData.MultiJson(payload));
}

Expand All @@ -97,7 +97,7 @@ protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
if (events == null || !events.Any())
return null;

var payload = CreatePlayLoad<T>(events);
var payload = CreatePlayLoad(events);
return _state.Client.Bulk<T>(PostData.MultiJson(payload));
}

Expand Down Expand Up @@ -165,8 +165,7 @@ private static bool HasProperty(dynamic settings, string name)
return settings.GetType().GetProperty(name) != null;
}

private IEnumerable<string> CreatePlayLoad<T>(IEnumerable<LogEvent> events)
where T : class, IElasticsearchResponse, new()
private IEnumerable<string> CreatePlayLoad(IEnumerable<LogEvent> events)
{
if (!_state.TemplateRegistrationSuccess && _state.Options.RegisterTemplateFailure == RegisterTemplateRecovery.FailSink)
{
Expand All @@ -177,19 +176,15 @@ private IEnumerable<string> CreatePlayLoad<T>(IEnumerable<LogEvent> events)
foreach (var e in events)
{
var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime());
var action = default(object);

var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName;
if (string.IsNullOrWhiteSpace(pipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, pipeline = pipelineName } };
}
var actionJson = _state.Serialize(action);
payload.Add(actionJson);

var action = CreateElasticAction(
opType: _state.Options.BatchAction,
indexName: indexName,
pipelineName: pipelineName,
mappingType: _state.Options.TypeName);
payload.Add(LowLevelRequestResponseSerializer.Instance.SerializeToString(action));

var sw = new StringWriter();
_state.Formatter.Format(e, sw);
payload.Add(sw.ToString());
Expand All @@ -204,10 +199,15 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
if (result.Success && result.Body?["errors"] == true)
{
var indexer = 0;
var opType = BulkAction(_state.Options.BatchAction);
var items = result.Body["items"];
foreach (var item in items)
{
if (item["index"] != null && HasProperty(item["index"], "error") && item["index"]["error"] != null)
var action = item.ContainsKey(opType)
? item[opType]
: null;

if (action != null && action.ContainsKey("error"))
{
var e = events.ElementAt(indexer);
if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToSelfLog))
Expand All @@ -216,8 +216,8 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
SelfLog.WriteLine(
"Failed to store event with template '{0}' into Elasticsearch. Elasticsearch reports for index {1} the following: {2}",
e.MessageTemplate,
item["index"]["_index"],
_state.Serialize(item["index"]["error"]));
action["_index"],
_state.Serialize(action["error"]));
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToFailureSink) &&
Expand Down Expand Up @@ -251,7 +251,6 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
_state.Options.FailureCallback);
}
}

}
indexer++;
}
Expand All @@ -261,5 +260,69 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
HandleException(result.OriginalException, events);
}
}

internal static string BulkAction(ElasticOpType elasticOpType) =>
elasticOpType == ElasticOpType.Create
? "create"
: "index";

internal static object CreateElasticAction(ElasticOpType opType, string indexName, string pipelineName = null, string id = null, string mappingType = null)
{
var actionPayload = new ElasticActionPayload(
indexName: indexName,
pipeline: string.IsNullOrWhiteSpace(pipelineName) ? null : pipelineName,
id: id,
mappingType: mappingType
);

var action = opType == ElasticOpType.Create
? (object) new ElasticCreateAction(actionPayload)
: new ElasticIndexAction(actionPayload);
return action;
}

sealed class ElasticCreateAction
{
public ElasticCreateAction(ElasticActionPayload payload)
{
Payload = payload;
}

[DataMember(Name = "create")]
public ElasticActionPayload Payload { get; }
}

sealed class ElasticIndexAction
{
public ElasticIndexAction(ElasticActionPayload payload)
{
Payload = payload;
}

[DataMember(Name = "index")]
public ElasticActionPayload Payload { get; }
}

sealed class ElasticActionPayload {
public ElasticActionPayload(string indexName, string pipeline = null, string id = null, string mappingType = null)
{
IndexName = indexName;
Pipeline = pipeline;
Id = id;
MappingType = mappingType;
}

[DataMember(Name = "_type")]
public string MappingType { get; }

[DataMember(Name = "_index")]
public string IndexName { get; }

[DataMember(Name = "pipeline")]
public string Pipeline { get; }

[DataMember(Name = "_id")]
public string Id { get; }
}
}
}
Loading

0 comments on commit 144369b

Please sign in to comment.