@@ -17,7 +17,7 @@ internal class Consumer : IConsumer
1717 private readonly ILogHandler _logHandler ;
1818 private readonly bool _stopTheWorldStrategy ;
1919
20- private readonly List < Action < IDependencyResolver , IConsumer < byte [ ] , byte [ ] > , List < TopicPartition > > >
20+ private readonly List < Action < IDependencyResolver , IConsumer < byte [ ] , byte [ ] > , List < Confluent . Kafka . TopicPartition > > >
2121 _partitionsAssignedHandlers = new ( ) ;
2222
2323 private readonly List < Action < IDependencyResolver , IConsumer < byte [ ] , byte [ ] > ,
@@ -26,7 +26,7 @@ private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, Lis
2626
2727 private readonly List < Action < IConsumer < byte [ ] , byte [ ] > , Error > > _errorsHandlers = new ( ) ;
2828 private readonly List < Action < IConsumer < byte [ ] , byte [ ] > , string > > _statisticsHandlers = new ( ) ;
29- private readonly ConcurrentDictionary < TopicPartition , long > _currentPartitionsOffsets = new ( ) ;
29+ private readonly ConcurrentDictionary < Confluent . Kafka . TopicPartition , long > _currentPartitionsOffsets = new ( ) ;
3030 private readonly ConsumerFlowManager _flowManager ;
3131 private readonly Event _maxPollIntervalExceeded ;
3232
@@ -71,7 +71,7 @@ public Consumer(
7171
7272 public IReadOnlyList < string > Subscription { get ; private set ; } = new List < string > ( ) ;
7373
74- public IReadOnlyList < TopicPartition > Assignment { get ; private set ; } = new List < TopicPartition > ( ) ;
74+ public IReadOnlyList < Confluent . Kafka . TopicPartition > Assignment { get ; private set ; } = new List < Confluent . Kafka . TopicPartition > ( ) ;
7575
7676 public IConsumerFlowManager FlowManager => _flowManager ;
7777
@@ -99,7 +99,7 @@ public ConsumerStatus Status
9999 }
100100 }
101101
102- public void OnPartitionsAssigned ( Action < IDependencyResolver , IConsumer < byte [ ] , byte [ ] > , List < TopicPartition > > handler ) =>
102+ public void OnPartitionsAssigned ( Action < IDependencyResolver , IConsumer < byte [ ] , byte [ ] > , List < Confluent . Kafka . TopicPartition > > handler ) =>
103103 _partitionsAssignedHandlers . Add ( handler ) ;
104104
105105 public void OnPartitionsRevoked (
@@ -112,13 +112,13 @@ public void OnError(Action<IConsumer<byte[], byte[]>, Error> handler) =>
112112 public void OnStatistics ( Action < IConsumer < byte [ ] , byte [ ] > , string > handler ) =>
113113 _statisticsHandlers . Add ( handler ) ;
114114
115- public Offset GetPosition ( TopicPartition topicPartition ) =>
115+ public Offset GetPosition ( Confluent . Kafka . TopicPartition topicPartition ) =>
116116 _consumer . Position ( topicPartition ) ;
117117
118- public WatermarkOffsets GetWatermarkOffsets ( TopicPartition topicPartition ) =>
118+ public WatermarkOffsets GetWatermarkOffsets ( Confluent . Kafka . TopicPartition topicPartition ) =>
119119 _consumer . GetWatermarkOffsets ( topicPartition ) ;
120120
121- public WatermarkOffsets QueryWatermarkOffsets ( TopicPartition topicPartition , TimeSpan timeout ) =>
121+ public WatermarkOffsets QueryWatermarkOffsets ( Confluent . Kafka . TopicPartition topicPartition , TimeSpan timeout ) =>
122122 _consumer . QueryWatermarkOffsets ( topicPartition , timeout ) ;
123123
124124 public List < Confluent . Kafka . TopicPartitionOffset > OffsetsForTimes (
@@ -285,7 +285,9 @@ private void EnsureConsumer()
285285 private void ManualAssignPartitions ( IEnumerable < TopicPartitions > topics )
286286 {
287287 var partitions = topics
288- . SelectMany ( topic => topic . Partitions . Select ( partition => new TopicPartition ( topic . Name , new Partition ( partition ) ) ) )
288+ . SelectMany (
289+ topic => topic . Partitions . Select (
290+ partition => new Confluent . Kafka . TopicPartition ( topic . Name , new Partition ( partition ) ) ) )
289291 . ToList ( ) ;
290292
291293 _consumer . Assign ( partitions ) ;
@@ -305,7 +307,7 @@ private void ManualAssignPartitionOffsets(IEnumerable<TopicPartitionOffsets> top
305307
306308 private void FirePartitionsAssignedHandlers (
307309 IConsumer < byte [ ] , byte [ ] > consumer ,
308- List < TopicPartition > partitions )
310+ List < Confluent . Kafka . TopicPartition > partitions )
309311 {
310312 if ( _stopTheWorldStrategy )
311313 {
@@ -333,7 +335,7 @@ private void FirePartitionRevokedHandlers(IConsumer<byte[], byte[]> consumer, Li
333335 if ( _stopTheWorldStrategy )
334336 {
335337 _partitionsRevokedHandlers . ForEach ( handler => handler ( _dependencyResolver , consumer , partitions ) ) ;
336- this . Assignment = new List < TopicPartition > ( ) ;
338+ this . Assignment = new List < Confluent . Kafka . TopicPartition > ( ) ;
337339 this . Subscription = new List < string > ( ) ;
338340 _currentPartitionsOffsets . Clear ( ) ;
339341 _flowManager . Stop ( ) ;
0 commit comments