diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 7b061a28bd50e..bc8146910ad21 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -526,7 +526,6 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK) return offsetSnapshot.highWatermark(); else return offsetSnapshot.lastStableOffset(); - } private LinkedHashMap readFromLog(LinkedHashMap topicPartitionFetchOffsets, diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 9585714deb85f..0f7a82f9e158c 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -42,7 +42,6 @@ import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; -import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; @@ -88,7 +87,6 @@ import static kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS; import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult; import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch; -import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -98,11 +96,11 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; @@ -212,17 +210,14 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); - when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); // We are testing the case when the share partition is getting fetched for the first time, so for the first time - // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be + // the fetchOffsetMetadata will return empty. Post the first readFromLog call, the fetchOffsetMetadata will be // populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2). when(sp0.fetchOffsetMetadata(anyLong())) .thenReturn(Optional.empty()) .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1); - mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata); doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); BiConsumer exceptionHandler = mockExceptionHandler(); @@ -230,6 +225,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0)); Partition p0 = mock(Partition.class); + mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0); when(p0.isLeader()).thenReturn(true); Partition p1 = mock(Partition.class); @@ -258,7 +254,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { assertFalse(delayedShareFetch.isCompleted()); - // Since sp1 cannot be acquired, tryComplete should return false. + // Since minBytes(2) is not satisfied (only 1 byte available from sp0), and sp1 cannot be acquired, tryComplete should return false. assertFalse(delayedShareFetch.tryComplete()); assertFalse(delayedShareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); @@ -270,7 +266,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId)); delayedShareFetch.lock().unlock(); - Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); } @Test @@ -296,18 +292,16 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); - when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); // We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff // functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2). LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class); when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1); when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class))); - mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata); BiConsumer exceptionHandler = mockExceptionHandler(); Partition p0 = mock(Partition.class); + mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0); when(p0.isLeader()).thenReturn(true); Partition p1 = mock(Partition.class); @@ -336,7 +330,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() { Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); - Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); } @Test @@ -360,8 +354,6 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); - when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); @@ -373,6 +365,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); Uuid fetchId = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) @@ -381,6 +374,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { .withShareGroupMetrics(shareGroupMetrics) .withTime(time) .withFetchId(fetchId) + .withExceptionHandler(exceptionHandler) .build()); when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); @@ -388,17 +382,23 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { assertFalse(delayedShareFetch.isCompleted()); - // Since sp1 can be acquired, tryComplete should return true. - assertTrue(delayedShareFetch.tryComplete()); - assertTrue(delayedShareFetch.isCompleted()); - Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); - assertTrue(delayedShareFetch.lock().tryLock()); - assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); - assertEquals(20, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); - assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); - assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of(tp0, mock(ShareFetchResponseData.PartitionData.class))); - delayedShareFetch.lock().unlock(); + // Since sp0 can be acquired, tryComplete should return true. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); + assertTrue(delayedShareFetch.lock().tryLock()); + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(20, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + + delayedShareFetch.lock().unlock(); + } } @Test @@ -479,8 +479,6 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { when(sp0.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(false); - when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0)); @@ -489,6 +487,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); Uuid fetchId = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) @@ -497,29 +496,36 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { .withShareGroupMetrics(shareGroupMetrics) .withTime(time) .withFetchId(fetchId) + .withExceptionHandler(exceptionHandler) .build()); when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true); - assertFalse(delayedShareFetch.isCompleted()); - delayedShareFetch.forceComplete(); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of(tp0, mock(ShareFetchResponseData.PartitionData.class))); - // Since we can acquire records from sp0, replicaManager.readFromLog should be called once and only for sp0. - Mockito.verify(replicaManager, times(1)).readFromLog( + assertFalse(delayedShareFetch.isCompleted()); + delayedShareFetch.forceComplete(); + + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + // Since we can acquire records from sp0, replicaManager.readFromLog should be called once and only for sp0. + Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); - Mockito.verify(sp0, times(1)).nextFetchOffset(); - Mockito.verify(sp1, times(0)).nextFetchOffset(); - assertTrue(delayedShareFetch.isCompleted()); - assertTrue(shareFetch.isCompleted()); - Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); - assertTrue(delayedShareFetch.lock().tryLock()); - assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); - assertEquals(130, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); - assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); - assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); + Mockito.verify(sp0, times(1)).nextFetchOffset(); + Mockito.verify(sp1, times(0)).nextFetchOffset(); + assertTrue(delayedShareFetch.isCompleted()); + assertTrue(shareFetch.isCompleted()); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); + assertTrue(delayedShareFetch.lock().tryLock()); + assertEquals(1, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count()); + assertEquals(130, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum()); + assertEquals(1, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count()); + assertEquals(50, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum()); - delayedShareFetch.lock().unlock(); + delayedShareFetch.lock().unlock(); + } } @Test @@ -658,33 +664,38 @@ public void testForceCompleteTriggersDelayedActionsQueue() { sharePartitions2.put(tp2, sp2); Uuid fetchId2 = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch2 = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch2) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions2) .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) .withFetchId(fetchId2) + .withExceptionHandler(exceptionHandler) .build()); // sp1 can be acquired now when(sp1.maybeAcquireFetchLock(fetchId2)).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(true); - when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - - // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch - // requests, it should add a "check and complete" action for request key tp1 on the purgatory. - delayedShareFetch2.forceComplete(); - assertTrue(delayedShareFetch2.isCompleted()); - assertTrue(shareFetch2.isCompleted()); - Mockito.verify(replicaManager, times(1)).readFromLog( - any(), any(), any(ReplicaQuota.class), anyBoolean()); - assertFalse(delayedShareFetch1.isCompleted()); - Mockito.verify(replicaManager, times(1)).addToActionQueue(any()); - Mockito.verify(replicaManager, times(0)).tryCompleteActions(); - Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any()); - assertTrue(delayedShareFetch2.lock().tryLock()); - delayedShareFetch2.lock().unlock(); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of(tp0, mock(ShareFetchResponseData.PartitionData.class))); + + // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch + // requests, it should add a "check and complete" action for request key tp1 on the purgatory. + delayedShareFetch2.forceComplete(); + assertTrue(delayedShareFetch2.isCompleted()); + assertTrue(shareFetch2.isCompleted()); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + assertFalse(delayedShareFetch1.isCompleted()); + Mockito.verify(replicaManager, times(1)).addToActionQueue(any()); + Mockito.verify(replicaManager, times(0)).tryCompleteActions(); + Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any()); + assertTrue(delayedShareFetch2.lock().tryLock()); + delayedShareFetch2.lock().unlock(); + } } @Test @@ -763,8 +774,6 @@ public void testExceptionInMinBytesCalculation() { BROKER_TOPIC_STATS); when(sp0.canAcquireRecords()).thenReturn(true); - when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); // Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot @@ -1033,17 +1042,6 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS); - when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - // All 5 partitions are acquirable. doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet().stream().toList())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); @@ -1060,12 +1058,14 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp4, 1); Uuid fetchId = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .withFetchId(fetchId) + .withExceptionHandler(exceptionHandler) .build()); when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); @@ -1074,29 +1074,40 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true); when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(true); - assertTrue(delayedShareFetch.tryComplete()); - assertTrue(delayedShareFetch.isCompleted()); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of( + tp0, mock(ShareFetchResponseData.PartitionData.class), + tp1, mock(ShareFetchResponseData.PartitionData.class), + tp2, mock(ShareFetchResponseData.PartitionData.class), + tp3, mock(ShareFetchResponseData.PartitionData.class), + tp4, mock(ShareFetchResponseData.PartitionData.class))); - // Since all partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5) - int expectedPartitionMaxBytes = 1024 * 1020 / 5; - LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); - sharePartitions.keySet().forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, - new FetchRequest.PartitionData( - topicIdPartition.topicId(), - 0, - 0, - expectedPartitionMaxBytes, - Optional.empty() - ))); + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); - Mockito.verify(replicaManager, times(1)).readFromLog( - shareFetch.fetchParams(), - CollectionConverters.asScala( - sharePartitions.keySet().stream().map(topicIdPartition -> - new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) - ), - QuotaFactory.UNBOUNDED_QUOTA, - true); + // Since all partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5) + int expectedPartitionMaxBytes = 1024 * 1020 / 5; + LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); + sharePartitions.keySet().forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + 0, + 0, + expectedPartitionMaxBytes, + Optional.empty() + ))); + + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + Mockito.verify(replicaManager, times(1)).readFromLog( + shareFetch.fetchParams(), + CollectionConverters.asScala( + sharePartitions.keySet().stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) + ), + QuotaFactory.UNBOUNDED_QUOTA, + true); + } } @Test @@ -1132,11 +1143,6 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS); - when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); - // Only 2 out of 5 partitions are acquirable. Set acquirableTopicPartitions = new LinkedHashSet<>(); acquirableTopicPartitions.add(tp0); @@ -1150,12 +1156,14 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1); Uuid fetchId = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .withFetchId(fetchId) + .withExceptionHandler(exceptionHandler) .build()); when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); @@ -1164,29 +1172,37 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true); when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(false); - assertTrue(delayedShareFetch.tryComplete()); - assertTrue(delayedShareFetch.isCompleted()); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of( + tp0, mock(ShareFetchResponseData.PartitionData.class), + tp1, mock(ShareFetchResponseData.PartitionData.class))); - // Since only 2 partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2) - int expectedPartitionMaxBytes = 1024 * 1024 / 2; - LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); - acquirableTopicPartitions.forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, - new FetchRequest.PartitionData( - topicIdPartition.topicId(), - 0, - 0, - expectedPartitionMaxBytes, - Optional.empty() - ))); + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); - Mockito.verify(replicaManager, times(1)).readFromLog( - shareFetch.fetchParams(), - CollectionConverters.asScala( - acquirableTopicPartitions.stream().map(topicIdPartition -> - new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) - ), - QuotaFactory.UNBOUNDED_QUOTA, - true); + // Since only 2 partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2) + int expectedPartitionMaxBytes = 1024 * 1024 / 2; + LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); + acquirableTopicPartitions.forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + 0, + 0, + expectedPartitionMaxBytes, + Optional.empty() + ))); + + Mockito.verify(replicaManager, times(1)).readFromLog( + shareFetch.fetchParams(), + CollectionConverters.asScala( + acquirableTopicPartitions.stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) + ), + QuotaFactory.UNBOUNDED_QUOTA, + true); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + } } @Test @@ -1665,34 +1681,39 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0); when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1); + BiConsumer exceptionHandler = mockExceptionHandler(); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) .withFetchId(fetchId) + .withExceptionHandler(exceptionHandler) .build()); // sp0 is acquirable, sp1 is not acquirable. when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(false); - when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( - createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of(tp0, new ShareFetchResponseData.PartitionData().setErrorCode(Errors.REQUEST_TIMED_OUT.code()))); - assertFalse(delayedShareFetch.isCompleted()); - assertTrue(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); - assertTrue(delayedShareFetch.isCompleted()); - // Pending remote fetch object gets created for delayed share fetch. - assertNotNull(delayedShareFetch.pendingRemoteFetches()); - // Verify the locks are released for tp0. - Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); - assertTrue(shareFetch.isCompleted()); - assertEquals(Set.of(tp0), future.join().keySet()); - assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); - assertTrue(delayedShareFetch.lock().tryLock()); - delayedShareFetch.lock().unlock(); + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.pendingRemoteFetches()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } } @Test @@ -1702,7 +1723,6 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfull SharePartition sp0 = mock(SharePartition.class); - when(sp0.canAcquireRecords()).thenReturn(true); LinkedHashMap sharePartitions = new LinkedHashMap<>(); @@ -2151,12 +2171,10 @@ static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager repli when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition); } - private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) { + private void mockTopicIdPartitionFetchBytes(LogOffsetMetadata hwmOffsetMetadata, Partition partition) { LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), hwmOffsetMetadata, mock(LogOffsetMetadata.class)); - Partition partition = mock(Partition.class); when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenReturn(endOffsetSnapshot); - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition); } private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set partitions) {