Skip to content

Commit

Permalink
Let Exception bubble out of consumeRecords()
Browse files Browse the repository at this point in the history
Signed-off-by: Josh Crean <[email protected]>
Signed-off-by: Josh Crean <[email protected]>
  • Loading branch information
Josh Crean committed Jan 16, 2025
1 parent e0a9171 commit be6b750
Showing 1 changed file with 3 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,13 @@ public void testBufferOverflowPauseResume() throws InterruptedException, Excepti
}

@Test
public void testKafkaMetadata() {
public void testKafkaMetadata() throws Exception {
String topic = topicConfig.getName();
consumerRecords = createPlainTextRecords(topic, 0L);
when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
consumer = createObjectUnderTest("plaintext", false);

try {
consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition)));
consumer.consumeRecords();
} catch (Exception e){}
consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition)));
consumer.consumeRecords();
final Map.Entry<Collection<Record<Event>>, CheckpointState> bufferRecords = buffer.read(1000);
ArrayList<Record<Event>> bufferedRecords = new ArrayList<>(bufferRecords.getKey());
Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size());
Expand Down

0 comments on commit be6b750

Please sign in to comment.