|
85 | 85 | import java.util.List; |
86 | 86 | import java.util.Map; |
87 | 87 | import java.util.Optional; |
| 88 | +import java.util.concurrent.CompletableFuture; |
88 | 89 | import java.util.concurrent.TimeUnit; |
89 | 90 | import java.util.stream.Collectors; |
90 | 91 | import java.util.stream.Stream; |
@@ -728,56 +729,56 @@ public void testStreamingFakeToMilvus(TestContainer container) |
728 | 729 | String collection = "streaming_simple_example"; |
729 | 730 | String vectorField = "book_intro"; |
730 | 731 | int checkpointInterval = 30000; |
731 | | - new Thread( |
732 | | - () -> { |
733 | | - try { |
734 | | - container.executeJob( |
735 | | - "/streaming-fake-to-milvus.conf", |
736 | | - jobId, |
737 | | - "database=" + database, |
738 | | - "collection=" + collection, |
739 | | - "batch_size=3"); |
740 | | - } catch (IOException | InterruptedException e) { |
741 | | - throw new RuntimeException(e); |
742 | | - } |
743 | | - }) |
744 | | - .start(); |
| 732 | + CompletableFuture.runAsync( |
| 733 | + () -> { |
| 734 | + try { |
| 735 | + container.executeJob( |
| 736 | + "/streaming-fake-to-milvus.conf", |
| 737 | + jobId, |
| 738 | + "database=" + database, |
| 739 | + "collection=" + collection, |
| 740 | + "batch_size=3"); |
| 741 | + } catch (IOException | InterruptedException e) { |
| 742 | + throw new RuntimeException(e); |
| 743 | + } |
| 744 | + }); |
745 | 745 |
|
746 | 746 | // count write records |
747 | | - long count; |
748 | 747 | waitCollectionReady(database, collection, vectorField); |
749 | | - do { |
750 | | - count = countCollectionEntities(database, collection); |
751 | | - } while (count < 9); |
752 | | - Assertions.assertEquals(9, count); |
| 748 | + Awaitility.await() |
| 749 | + .atMost(60, TimeUnit.SECONDS) |
| 750 | + .pollInterval(2, TimeUnit.SECONDS) |
| 751 | + .until(() -> countCollectionEntities(database, collection) >= 9); |
| 752 | + Assertions.assertEquals(9, countCollectionEntities(database, collection)); |
753 | 753 | TimeUnit.MILLISECONDS.sleep(checkpointInterval); |
754 | | - count = countCollectionEntities(database, collection); |
755 | | - Assertions.assertEquals(10, count); |
| 754 | + Assertions.assertEquals(10, countCollectionEntities(database, collection)); |
756 | 755 |
|
757 | 756 | // cancel jobs |
758 | 757 | container.cancelJob(jobId); |
759 | 758 | } |
760 | 759 |
|
761 | 760 | private void waitCollectionReady( |
762 | | - String databaseName, String collectionName, String vectorFieldName) |
763 | | - throws InterruptedException { |
| 761 | + String databaseName, String collectionName, String vectorFieldName) { |
764 | 762 | // assert table exist |
765 | | - R<Boolean> hasCollectionResponse; |
766 | | - do { |
767 | | - TimeUnit.SECONDS.sleep(1); |
768 | | - hasCollectionResponse = |
769 | | - this.milvusClient.hasCollection( |
770 | | - HasCollectionParam.newBuilder() |
771 | | - .withDatabaseName(databaseName) |
772 | | - .withCollectionName(collectionName) |
773 | | - .build()); |
774 | | - Assertions.assertEquals( |
775 | | - R.Status.Success.getCode(), |
776 | | - hasCollectionResponse.getStatus(), |
777 | | - Optional.ofNullable(hasCollectionResponse.getException()) |
778 | | - .map(Exception::getMessage) |
779 | | - .orElse("")); |
780 | | - } while (!hasCollectionResponse.getData()); |
| 763 | + Awaitility.await() |
| 764 | + .atMost(60, TimeUnit.SECONDS) |
| 765 | + .pollInterval(2, TimeUnit.SECONDS) |
| 766 | + .until( |
| 767 | + () -> { |
| 768 | + R<Boolean> hasCollectionResponse = |
| 769 | + this.milvusClient.hasCollection( |
| 770 | + HasCollectionParam.newBuilder() |
| 771 | + .withDatabaseName(databaseName) |
| 772 | + .withCollectionName(collectionName) |
| 773 | + .build()); |
| 774 | + Assertions.assertEquals( |
| 775 | + R.Status.Success.getCode(), |
| 776 | + hasCollectionResponse.getStatus(), |
| 777 | + Optional.ofNullable(hasCollectionResponse.getException()) |
| 778 | + .map(Exception::getMessage) |
| 779 | + .orElse("")); |
| 780 | + return hasCollectionResponse.getData(); |
| 781 | + }); |
781 | 782 |
|
782 | 783 | // create index |
783 | 784 | R<RpcStatus> createIndexResponse = |
|
0 commit comments