Skip to content

Commit f3d05f8

Browse files
committed
fix some test
1 parent f80d5bd commit f3d05f8

File tree

3 files changed

+43
-37
lines changed

3 files changed

+43
-37
lines changed

clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -182,43 +182,47 @@ public void testJoinGroupRequestVersions() throws Exception {
182182
testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId"));
183183
}
184184

185-
@Test
186-
public void testListOffsetsRequestVersions() throws Exception {
187-
List<ListOffsetsTopic> v = Collections.singletonList(new ListOffsetsTopic()
188-
.setName("topic")
189-
.setPartitions(Collections.singletonList(new ListOffsetsPartition()
190-
.setPartitionIndex(0)
191-
.setTimestamp(123L))));
192-
Supplier<ListOffsetsRequestData> newRequest = () -> new ListOffsetsRequestData()
193-
.setTopics(v)
194-
.setReplicaId(0);
195-
testAllMessageRoundTrips(newRequest.get());
196-
testAllMessageRoundTripsFromVersion((short) 2, newRequest.get().setIsolationLevel(IsolationLevel.READ_COMMITTED.id()));
185+
@ParameterizedTest
186+
@ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS)
187+
public void testListOffsetsRequestVersions(short version) throws Exception {
188+
ListOffsetsRequestData request = new ListOffsetsRequestData()
189+
.setReplicaId(0)
190+
.setIsolationLevel(version >= 2 ? IsolationLevel.READ_COMMITTED.id() : 0)
191+
.setTopics(singletonList(
192+
new ListOffsetsTopic()
193+
.setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
194+
.setName(version < 12 ? "topic" : "")
195+
.setPartitions(singletonList(
196+
new ListOffsetsPartition()
197+
.setPartitionIndex(0)
198+
.setCurrentLeaderEpoch(version >= 4 ? 10 : -1)
199+
.setTimestamp(123L)
200+
))
201+
));
202+
203+
testMessageRoundTrip(version, request, request);
197204
}
198205

199-
@Test
200-
public void testListOffsetsResponseVersions() throws Exception {
201-
ListOffsetsPartitionResponse partition = new ListOffsetsPartitionResponse()
202-
.setErrorCode(Errors.NONE.code())
203-
.setPartitionIndex(0);
204-
List<ListOffsetsTopicResponse> topics = Collections.singletonList(new ListOffsetsTopicResponse()
205-
.setName("topic")
206-
.setPartitions(Collections.singletonList(partition)));
207-
Supplier<ListOffsetsResponseData> response = () -> new ListOffsetsResponseData()
208-
.setTopics(topics);
209-
for (short version = ApiKeys.LIST_OFFSETS.oldestVersion(); version <= ApiKeys.LIST_OFFSETS.latestVersion(); ++version) {
210-
ListOffsetsResponseData responseData = response.get();
211-
responseData.topics().get(0).partitions().get(0)
212-
.setOffset(456L)
213-
.setTimestamp(123L);
214-
if (version > 1) {
215-
responseData.setThrottleTimeMs(1000);
216-
}
217-
if (version > 3) {
218-
partition.setLeaderEpoch(1);
219-
}
220-
testEquivalentMessageRoundTrip(version, responseData);
221-
}
206+
@ParameterizedTest
207+
@ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS)
208+
public void testListOffsetsResponseVersions(short version) throws Exception {
209+
ListOffsetsResponseData response = new ListOffsetsResponseData()
210+
.setThrottleTimeMs(version >= 2 ? 1000 : 0)
211+
.setTopics(singletonList(
212+
new ListOffsetsTopicResponse()
213+
.setTopicId(version >= 12 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
214+
.setName(version < 12 ? "topic" : "")
215+
.setPartitions(singletonList(
216+
new ListOffsetsPartitionResponse()
217+
.setPartitionIndex(0)
218+
.setErrorCode(Errors.NONE.code())
219+
.setTimestamp(123L)
220+
.setOffset(456L)
221+
.setLeaderEpoch(version >= 4 ? 1 : -1)
222+
))
223+
));
224+
225+
testMessageRoundTrip(version, response, response);
222226
}
223227

224228
@Test

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,9 @@ public short fetchRequestVersion() {
274274
}
275275

276276
public short listOffsetRequestVersion() {
277-
if (this.isAtLeast(IBP_4_2_IV1)) {
277+
if (this.isAtLeast(IBP_4_3_IV0)) {
278+
return 12;
279+
} else if (this.isAtLeast(IBP_4_2_IV1)) {
278280
return 11;
279281
} else if (this.isAtLeast(IBP_4_0_IV3)) {
280282
return 10;

storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testResponseOnPartialError() {
193193
Map<TopicIdPartition, ListOffsetsPartitionStatus> statusByPartition = Map.of(
194194
new TopicIdPartition(Uuid.randomUuid(), 0, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
195195
new TopicIdPartition(Uuid.randomUuid(), 1, "test"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build(),
196-
new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(holder)).build()
196+
new TopicIdPartition(Uuid.randomUuid(), 0, "test1"), ListOffsetsPartitionStatus.builder().futureHolderOpt(Optional.of(errorFutureHolder)).build()
197197
);
198198

199199
DelayedRemoteListOffsets delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, 5, statusByPartition, partitionOrException, responseCallback);

0 commit comments

Comments
 (0)