Skip to content

Commit

Permalink
KAFKA-8862: Improve Producer error message for failed metadata update (
Browse files Browse the repository at this point in the history
…#18587)

We should provide the same informative error message for both timeout
cases.

Reviewers: Kirk True <[email protected]>, Andrew Schofield <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
mjsax committed Jan 21, 2025
1 parent 310867a commit 379049f
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1178,8 +1178,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
final String errorMessage = String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs);
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
if (metadata.getError(topic) != null) {
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
Expand All @@ -1188,11 +1187,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
final String errorMessage = partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs);
final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
Expand All @@ -1208,6 +1203,13 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
return new ClusterAndWaitTime(cluster, elapsed);
}

private String getErrorMessage(Integer partitionsCount, String topic, Integer partition, long maxWaitMs) {
return partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs);
}
/**
* Validate that the record size isn't too large
*/
Expand Down

0 comments on commit 379049f

Please sign in to comment.