Skip to content

Commit

Permalink
KAFKA-8862: Improve Producer error message for failed metadata update (
Browse files Browse the repository at this point in the history
…#18587)

We should provide the same informative error message for both timeout
cases.

Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Ismael Juma <ismael@juma.me.uk>
mjsax committed Jan 21, 2025
1 parent c6d452b commit 6612dd5
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1104,8 +1104,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
final String errorMessage = String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs);
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
if (metadata.getError(topic) != null) {
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
@@ -1114,11 +1113,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
final String errorMessage = partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs);
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
@@ -1134,6 +1129,13 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
return new ClusterAndWaitTime(cluster, elapsed);
}

private String getErrorMessage(Integer partitionsCount, String topic, Integer partition, long maxWaitMs) {
return partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs);
}
/**
* Validate that the record size isn't too large
*/

0 comments on commit 6612dd5

Please sign in to comment.