Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IEnumerable<int> partitions);

/// <summary>
/// Explicitly defines the topic, partitions and offsets that will be used to read the messages
/// </summary>
/// <param name="topicName">Topic name</param>
/// <param name="partitionOffsets">The partition offset dictionary [Partition ID, Offset]</param>
/// <returns></returns>
IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary<int, long> partitionOffsets);

/// <summary>
/// Sets the topics that will be used to read the messages, the partitions will be automatically assigned
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/TopicPartitionsOffset.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;

namespace KafkaFlow.Configuration;

/// <summary>
/// Represents a topic and the explicit partition/offset pairs that will be manually assigned to a consumer
/// </summary>
// NOTE(review): the file is named TopicPartitionsOffset.cs but the type is TopicPartitionOffsets — consider renaming the file to match the type.
public class TopicPartitionOffsets
{
    /// <summary>
    /// Initializes a new instance of the <see cref="TopicPartitionOffsets"/> class.
    /// </summary>
    /// <param name="name">Topic name</param>
    /// <param name="partitionOffsets">The partition offset dictionary [Partition ID, Offset]</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="name"/> or <paramref name="partitionOffsets"/> is null</exception>
    public TopicPartitionOffsets(string name, IDictionary<int, long> partitionOffsets)
    {
        // Validate eagerly, mirroring ConsumerConfiguration's ArgumentNullException checks for the same data.
        this.Name = name ?? throw new ArgumentNullException(nameof(name));
        this.PartitionOffsets = partitionOffsets ?? throw new ArgumentNullException(nameof(partitionOffsets));
    }

    /// <summary>
    /// Gets the topic name
    /// </summary>
    public string Name { get; }

    /// <summary>
    /// Gets the partition offset dictionary [Partition ID, Offset]
    /// </summary>
    public IDictionary<int, long> PartitionOffsets { get; }
}
10 changes: 7 additions & 3 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public ConsumerConfiguration(
Confluent.Kafka.ConsumerConfig consumerConfig,
IReadOnlyList<string> topics,
IReadOnlyList<TopicPartitions> manualAssignPartitions,
IReadOnlyList<TopicPartitionOffsets> manualAssignPartitionOffsets,
string consumerName,
ClusterConfiguration clusterConfiguration,
bool managementDisabled,
Expand Down Expand Up @@ -48,6 +49,7 @@ public ConsumerConfiguration(
this.AutoCommitInterval = autoCommitInterval;
this.Topics = topics ?? throw new ArgumentNullException(nameof(topics));
this.ManualAssignPartitions = manualAssignPartitions ?? throw new ArgumentNullException(nameof(manualAssignPartitions));
this.ManualAssignPartitionOffsets = manualAssignPartitionOffsets ?? throw new ArgumentNullException(nameof(manualAssignPartitionOffsets));
this.ConsumerName = consumerName ?? Guid.NewGuid().ToString();
this.ClusterConfiguration = clusterConfiguration;
this.ManagementDisabled = managementDisabled;
Expand All @@ -60,9 +62,9 @@ public ConsumerConfiguration(
this.PendingOffsetsStatisticsHandlers = pendingOffsetsStatisticsHandlers;
this.CustomFactory = customFactory;

this.BufferSize = bufferSize > 0 ?
bufferSize :
throw new ArgumentOutOfRangeException(
this.BufferSize = bufferSize > 0
? bufferSize
: throw new ArgumentOutOfRangeException(
nameof(bufferSize),
bufferSize,
"The value must be greater than 0");
Expand All @@ -76,6 +78,8 @@ public ConsumerConfiguration(

public IReadOnlyList<TopicPartitions> ManualAssignPartitions { get; }

public IReadOnlyList<TopicPartitionOffsets> ManualAssignPartitionOffsets { get; }

public string ConsumerName { get; }

public ClusterConfiguration ClusterConfiguration { get; }
Expand Down
8 changes: 8 additions & 0 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
{
private readonly List<string> _topics = new();
private readonly List<TopicPartitions> _topicsPartitions = new();
private readonly List<TopicPartitionOffsets> _topicsPartitionOffsets = new();
private readonly List<Action<string>> _statisticsHandlers = new();

private readonly List<PendingOffsetsStatisticsHandler> _pendingOffsetsStatisticsHandlers = new();
Expand Down Expand Up @@ -61,6 +62,12 @@ public IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IE
return this;
}

/// <inheritdoc />
public IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary<int, long> partitionOffsets)
{
// Record the requested (topic, partition -> offset) assignment; it is passed to the
// ConsumerConfiguration when Build() runs and applied by the consumer at startup.
_topicsPartitionOffsets.Add(new TopicPartitionOffsets(topicName, partitionOffsets));
return this;
}

public IConsumerConfigurationBuilder WithConsumerConfig(Confluent.Kafka.ConsumerConfig config)
{
_consumerConfig = config;
Expand Down Expand Up @@ -261,6 +268,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
consumerConfigCopy,
_topics,
_topicsPartitions,
_topicsPartitionOffsets,
_name,
clusterConfiguration,
_disableManagement,
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public interface IConsumerConfiguration
/// </summary>
IReadOnlyList<TopicPartitions> ManualAssignPartitions { get; }

/// <summary>
/// Gets the topic partition offsets to manually assign
/// </summary>
IReadOnlyList<TopicPartitionOffsets> ManualAssignPartitionOffsets { get; }

/// <summary>
/// Gets the consumer name
/// </summary>
Expand Down
76 changes: 44 additions & 32 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ public ConsumerStatus Status
return ConsumerStatus.Running;
}

return this.FlowManager.PausedPartitions.Count == this.Assignment.Count ?
ConsumerStatus.Paused :
ConsumerStatus.PartiallyRunning;
return this.FlowManager.PausedPartitions.Count == this.Assignment.Count ? ConsumerStatus.Paused : ConsumerStatus.PartiallyRunning;
}
}

Expand Down Expand Up @@ -130,14 +128,13 @@ public WatermarkOffsets QueryWatermarkOffsets(Confluent.Kafka.TopicPartition top

public IEnumerable<TopicPartitionLag> GetTopicPartitionsLag()
{
return this.Assignment.Select(
tp =>
{
var offset = Math.Max(0, _currentPartitionsOffsets.GetOrAdd(tp, _ => this.GetPosition(tp)));
var offsetEnd = Math.Max(0, this.GetWatermarkOffsets(tp).High.Value);
return this.Assignment.Select(tp =>
{
var offset = Math.Max(0, _currentPartitionsOffsets.GetOrAdd(tp, _ => this.GetPosition(tp)));
var offsetEnd = Math.Max(0, this.GetWatermarkOffsets(tp).High.Value);

return new TopicPartitionLag(tp.Topic, tp.Partition.Value, offset == 0 ? 0 : offsetEnd - offset);
});
return new TopicPartitionLag(tp.Topic, tp.Partition.Value, offset == 0 ? 0 : offsetEnd - offset);
});
}

public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> offsets)
Expand Down Expand Up @@ -217,27 +214,26 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT

private void RegisterLogErrorHandler()
{
this.OnError(
(_, error) =>
this.OnError((_, error) =>
{
var errorData = new
{
var errorData = new
{
Code = error.Code.ToString(),
error.Reason,
error.IsBrokerError,
error.IsLocalError,
error.IsError,
};

if (error.IsFatal)
{
_logHandler.Error("Kafka Consumer Internal Error", null, errorData);
}
else
{
_logHandler.Warning("Kafka Consumer Internal Warning", errorData);
}
});
Code = error.Code.ToString(),
error.Reason,
error.IsBrokerError,
error.IsLocalError,
error.IsError,
};

if (error.IsFatal)
{
_logHandler.Error("Kafka Consumer Internal Error", null, errorData);
}
else
{
_logHandler.Warning("Kafka Consumer Internal Warning", errorData);
}
});
}

private void EnsureConsumer()
Expand Down Expand Up @@ -277,11 +273,16 @@ private void EnsureConsumer()

if (this.Configuration.ManualAssignPartitions.Any())
{
this.ManualAssign(this.Configuration.ManualAssignPartitions);
this.ManualAssignPartitions(this.Configuration.ManualAssignPartitions);
}

if (this.Configuration.ManualAssignPartitionOffsets.Any())
{
this.ManualAssignPartitionOffsets(this.Configuration.ManualAssignPartitionOffsets);
}
}

private void ManualAssign(IEnumerable<TopicPartitions> topics)
private void ManualAssignPartitions(IEnumerable<TopicPartitions> topics)
{
var partitions = topics
.SelectMany(
Expand All @@ -293,6 +294,17 @@ private void ManualAssign(IEnumerable<TopicPartitions> topics)
this.FirePartitionsAssignedHandlers(_consumer, partitions);
}

// Translates the configured (topic, partition -> offset) pairs into Confluent assignments,
// assigns them to the underlying consumer, and fires the partitions-assigned handlers.
private void ManualAssignPartitionOffsets(IEnumerable<TopicPartitionOffsets> topics)
{
    var assignments = new List<Confluent.Kafka.TopicPartitionOffset>();

    foreach (var topic in topics)
    {
        foreach (var partitionOffset in topic.PartitionOffsets)
        {
            assignments.Add(
                new Confluent.Kafka.TopicPartitionOffset(
                    topic.Name,
                    new Partition(partitionOffset.Key),
                    new Offset(partitionOffset.Value)));
        }
    }

    _consumer.Assign(assignments);
    this.FirePartitionsAssignedHandlers(_consumer, assignments.Select(x => x.TopicPartition).ToList());
}

private void FirePartitionsAssignedHandlers(
IConsumer<byte[], byte[]> consumer,
List<Confluent.Kafka.TopicPartition> partitions)
Expand Down
69 changes: 64 additions & 5 deletions tests/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using AutoFixture;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using Confluent.Kafka;
using KafkaFlow.Consumers;
using KafkaFlow.Serializer;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
Expand Down Expand Up @@ -127,11 +131,10 @@ public async Task PauseResumeHeartbeatTest()

// Act
await Task.WhenAll(
messages.Select(
m => producer.ProduceAsync(
Bootstrapper.PauseResumeTopicName,
m.Id.ToString(),
m)));
messages.Select(m => producer.ProduceAsync(
Bootstrapper.PauseResumeTopicName,
m.Id.ToString(),
m)));

await Task.Delay(40000);

Expand All @@ -157,4 +160,60 @@ public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndepen
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
}

/// <summary>
/// End-to-end check of ManualAssignPartitionOffsets: produces 10 messages, records the
/// final broker offset, then starts a fresh consumer assigned to (last offset - 4) and
/// verifies only the last 5 messages are re-delivered.
/// </summary>
[TestMethod]
public async Task ManualAssignPartitionOffsetsTest()
{
// Arrange
var producer = _provider.GetRequiredService<IMessageProducer<OffsetTrackerProducer>>();
var messages = _fixture
.Build<OffsetTrackerMessage>()
.Without(m => m.Offset)
.CreateMany(10).ToList();

// DeliveryHandler stamps each message with the broker-assigned offset from the delivery report.
messages.ForEach(m => producer.Produce(m.Id.ToString(), m, null, report => DeliveryHandler(report, messages)));

foreach (var message in messages)
{
await MessageStorage.AssertMessageAsync(message);
}

// endOffset is the offset of the last message produced above.
var endOffset = MessageStorage.GetOffsetTrack();
MessageStorage.Clear();

// Act
var serviceProviderHelper = new ServiceProviderHelper();

// Assign partition 0 starting at endOffset - 4, so exactly the last 5 messages should be consumed again.
await serviceProviderHelper.GetServiceProviderAsync(
consumerConfig =>
{
consumerConfig.ManualAssignPartitionOffsets(Bootstrapper.OffsetTrackerTopicName, new Dictionary<int, long> { { 0, endOffset - 4 } })
.WithGroupId("ManualAssignPartitionOffsetsTest")
.WithBufferSize(100)
.WithWorkersCount(10)
.AddMiddlewares(middlewares => middlewares
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(handlers => handlers.AddHandler<OffsetTrackerMessageHandler>()));
}, null);

// Assert
// Messages before the assigned offset must NOT have been re-consumed.
for (var i = 0; i < 5; i++)
{
await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i], false);
}

// Messages at or after the assigned offset must have been re-consumed.
for (var i = 5; i < 10; i++)
{
await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i]);
}

await serviceProviderHelper.StopBusAsync();
}

// Matches the delivery report back to its source message (the report key is the
// UTF-8 encoded message id) and records the broker-assigned offset on it.
private static void DeliveryHandler(DeliveryReport<byte[], byte[]> report, List<OffsetTrackerMessage> messages)
{
    var messageId = Encoding.UTF8.GetString(report.Message.Key);
    var matched = messages.First(candidate => candidate.Id.ToString() == messageId);
    matched.Offset = report.Offset;
}
}
25 changes: 25 additions & 0 deletions tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal static class Bootstrapper
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";
internal const string NullGroupId = "consumer-null";
internal const string OffsetTrackerGroupId = "consumer-offset-tracker";


private const string ProtobufTopicName = "test-protobuf";
Expand All @@ -43,6 +44,7 @@ internal static class Bootstrapper
private const string AvroTopicName = "test-avro";
private const string NullTopicName = "test-null";
private const string DefaultParamsTopicName = "test-default-params";
internal const string OffsetTrackerTopicName = "test-offset-tracker";

private static readonly Lazy<IServiceProvider> s_lazyProvider = new(SetupProvider);

Expand Down Expand Up @@ -207,6 +209,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1)
.CreateTopicIfNotExists(NullTopicName, 1, 1)
.CreateTopicIfNotExists(OffsetTrackerTopicName, 1, 1)
.CreateTopicIfNotExists(DefaultParamsTopicName)
.AddConsumer(
consumer => consumer
Expand Down Expand Up @@ -270,6 +273,22 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares.Add<NullHandlerMiddleware>(MiddlewareLifetime.Singleton)))
.AddConsumer(
consumer => consumer
.Topic(OffsetTrackerTopicName)
.WithGroupId(OffsetTrackerGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<OffsetTrackerMessageHandler>()
)))
.AddConsumer(
consumer => consumer
.Topics(GzipTopicName)
Expand Down Expand Up @@ -326,6 +345,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<OffsetTrackerProducer>(
producer => producer
.DefaultTopic(OffsetTrackerTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()))
.AddProducer<JsonGzipProducer>(
producer => producer
.DefaultTopic(JsonGzipTopicName)
Expand Down
Loading
Loading