diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md index 0f1232ed0c0..59f8920950e 100644 --- a/docs/en/connector-v2/sink/Milvus.md +++ b/docs/en/connector-v2/sink/Milvus.md @@ -39,20 +39,20 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll ## Sink Options -| Name | Type | Required | Default | Description | -|----------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------| -| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | -| token | String | Yes | - | User:password | -| database | String | No | - | Write data to which database, default is source database. | -| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | -| enable_auto_id | boolean | No | false | Primary key column enable autoId. | -| enable_upsert | boolean | No | false | Upsert data not insert. | -| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | -| batch_size | int | No | 1000 | Write batch size. | -| partition_key | String | No | | Milvus partition key field | -| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | -| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | -| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | +| Name | Type | Required | Default | Description | +|------------------------|---------------------|----------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. | +| token | String | Yes | - | User:password | +| database | String | No | - | Write data to which database, default is source database. | +| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. | +| enable_auto_id | boolean | No | false | Primary key column enable autoId. | +| enable_upsert | boolean | No | false | Upsert data not insert. | +| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. | +| batch_size | int | No | 1000 | Write batch size. When the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`, it will trigger a write flush | +| partition_key | String | No | | Milvus partition key field | +| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. | +| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. | +| collection_description | Map | No | {} | Collection descriptions map where key is collection name and value is description. | ## Task Example diff --git a/docs/zh/connector-v2/sink/Milvus.md b/docs/zh/connector-v2/sink/Milvus.md index 37c1d467eed..b992c907fda 100644 --- a/docs/zh/connector-v2/sink/Milvus.md +++ b/docs/zh/connector-v2/sink/Milvus.md @@ -19,7 +19,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能 ##数据类型映射 -| Milvus数据类型 | SeaTunnel 数据类型 | +| Milvus数据类型 | SeaTunnel 数据类型 | |---------------------|---------------------| | INT8 | TINYINT | | INT16 | SMALLINT | @@ -39,20 +39,24 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能 ## Sink 选项 -| 名字 | 类型 | 是否必传 | 默认值 | 描述 | -|----------------------|---------|----------|------------------------------|-----------------------------------------------------------| -| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | -| token | String | 是 | - | 用户:密码 | -| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | -| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | -| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | -| enable_upsert | boolean | 否 | false | 是否启用upsert。 | -| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | -| batch_size | int | 否 | 1000 | 写入批大小。 | -| partition_key | String | 否 | | Milvus分区键字段 | +| 名字 | 类型 | 是否必传 | 默认值 | 描述 | +|------------------------|---------------------|------|------------------------------|---------------------------------------------------------------------| +| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 | +| token | String | 是 | - | 用户:密码 | +| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 | +| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 | +| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 | +| enable_upsert | boolean | 否 | false | 是否启用upsert。 | +| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 | +| batch_size | int | 否 | 1000 | 写入批大小。当缓冲记录数达到 `batch_size` 或时间达到 `checkpoint.interval` 时,将触发一次写入刷新 | +| partition_key | String | 否 | | Milvus分区键字段 | +| create_index | boolean | No | false | 自动为集合创建向量索引以提高查询性能 | +| load_collection | boolean | No | false | 将集合加载到 Milvus 内存中以便立即进行查询 | +| collection_description | Map | No | {} | 集合描述映射,其中键是集合名称,值是描述 | ## 任务示例 +### 基础配置 ```bash sink { Milvus { @@ -63,6 +67,23 @@ sink { } ``` +### 带 Index 和 Loading 的高级配置 +```bash +sink { + Milvus { + url = "http://127.0.0.1:19530" + token = "username:password" + batch_size = 1000 + create_index = true + load_collection = true + collection_description = { + "user_vectors" = "User embedding vectors for recommendation" + "product_vectors" = "Product feature vectors for search" + } + } +} +``` + ## 变更日志 \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java index 1c7d00f3a98..4113be01b8d 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java @@ -82,6 +82,7 @@ public class MilvusSinkOptions extends MilvusBaseOptions { .intType() .defaultValue(1000) .withDescription("writer batch size"); + public static final Option RATE_LIMIT = Options.key("rate_limit") .intType() diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java index 98b2b46c3b4..55402896ace 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java @@ -61,13 +61,7 @@ public MilvusSinkWriter( public void write(SeaTunnelRow element) { batchWriter.addToBatch(element); if (batchWriter.needFlush()) { - try { - // Flush the batch writer - batchWriter.flush(); - } catch (Exception e) { - log.error("flush Milvus sink writer failed", e); - throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); - } + flush(); } } @@ -81,6 +75,7 @@ public void write(SeaTunnelRow element) { */ @Override public Optional prepareCommit() throws IOException { + flush(); return Optional.empty(); } @@ -110,4 +105,14 @@ public void close() throws IOException { throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e); } } + + private void flush() { + try { + // Flush the batch writer + batchWriter.flush(); + } catch (Exception e) { + log.error("flush Milvus sink writer failed", e); + throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index 3aed4f14550..1ad1fc9882c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -58,6 +58,7 @@ import io.milvus.grpc.IndexDescription; import io.milvus.grpc.KeyValuePair; import io.milvus.grpc.MutationResult; +import io.milvus.grpc.QueryResults; import io.milvus.param.ConnectParam; import io.milvus.param.IndexType; import io.milvus.param.MetricType; @@ -69,6 +70,7 @@ import io.milvus.param.collection.HasCollectionParam; import io.milvus.param.collection.LoadCollectionParam; import io.milvus.param.dml.InsertParam; +import io.milvus.param.dml.QueryParam; import io.milvus.param.index.CreateIndexParam; import io.milvus.param.index.DescribeIndexParam; import lombok.extern.slf4j.Slf4j; @@ -82,6 +84,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -715,4 +719,114 @@ private void verifyIndexesExist(String database, String collection) { log.info("Index verification passed for collection: {}.{}", database, collection); } + + @TestTemplate + public void testStreamingFakeToMilvus(TestContainer container) + throws IOException, InterruptedException { + // flush by checkpoint interval + String jobId = "1"; + String database = "streaming_test"; + String collection = "streaming_simple_example"; + String vectorField = "book_intro"; + int checkpointInterval = 30000; + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/streaming-fake-to-milvus.conf", + jobId, + "database=" + database, + "collection=" + collection, + "batch_size=3"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // count write records + waitCollectionReady(database, collection, vectorField); + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until(() -> countCollectionEntities(database, collection) >= 9); + Assertions.assertEquals(9, countCollectionEntities(database, collection)); + TimeUnit.MILLISECONDS.sleep(checkpointInterval); + Assertions.assertEquals(10, countCollectionEntities(database, collection)); + + // cancel jobs + container.cancelJob(jobId); + } + + private void waitCollectionReady( + String databaseName, String collectionName, String vectorFieldName) { + // assert table exist + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + R hasCollectionResponse = + this.milvusClient.hasCollection( + HasCollectionParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); + Assertions.assertEquals( + R.Status.Success.getCode(), + hasCollectionResponse.getStatus(), + Optional.ofNullable(hasCollectionResponse.getException()) + .map(Exception::getMessage) + .orElse("")); + return hasCollectionResponse.getData(); + }); + + // create index + R createIndexResponse = + milvusClient.createIndex( + CreateIndexParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .withFieldName(vectorFieldName) + .withIndexType(IndexType.FLAT) + .withMetricType(MetricType.L2) + .build()); + Assertions.assertEquals( + R.Status.Success.getCode(), + createIndexResponse.getStatus(), + Optional.ofNullable(createIndexResponse.getException()) + .map(Exception::getMessage) + .orElse("")); + + // load collection + R loadCollectionResponse = + milvusClient.loadCollection( + LoadCollectionParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .build()); + Assertions.assertEquals( + R.Status.Success.getCode(), + loadCollectionResponse.getStatus(), + Optional.ofNullable(loadCollectionResponse.getException()) + .map(Exception::getMessage) + .orElse("")); + } + + private long countCollectionEntities(String databaseName, String collectionName) { + R queryResults = + milvusClient.query( + QueryParam.newBuilder() + .withDatabaseName(databaseName) + .withCollectionName(collectionName) + .withOutFields(Collections.singletonList("count(*)")) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), queryResults.getStatus()); + return queryResults + .getData() + .getFieldsData(0) + .getScalars() + .getLongData() + .getDataList() + .get(0); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf new file mode 100644 index 00000000000..4f7a8b9ab2a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 30000 +} + +source { + FakeSource { + row.num = 10 + vector.dimension= 4 + schema = { + table = ${collection} + columns = [ + { + name = book_id + type = bigint + nullable = false + defaultValue = 0 + comment = "primary key id" + }, + { + name = book_intro + type = float_vector + columnScale =4 + comment = "vector" + }, + { + name = book_title + type = string + nullable = true + comment = "topic" + } + ] + primaryKey { + name = book_id + columnNames = [book_id] + } + } + } +} + +sink { + Milvus { + url = "http://milvus-e2e:19530" + token = "root:Milvus" + database = ${database} + enable_upsert = false + batch_size = ${batch_size} + } +} \ No newline at end of file