Skip to content

Commit 58035ea

Browse files
authored
[FLINK-38423][table-api] Add VECTOR_SEARCH connector API (#27037)
1 parent 78f6e77 commit 58035ea

File tree

5 files changed

+327
-0
lines changed

5 files changed

+327
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.connector.source;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.configuration.ReadableConfig;
23+
import org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
24+
import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
25+
import org.apache.flink.types.RowKind;
26+
27+
import java.io.Serializable;
28+
29+
/**
 * A {@link DynamicTableSource} that searches rows of an external storage system by one or more
 * vectors during runtime.
 *
 * <p>Compared to {@link ScanTableSource}, the source does not have to read the entire table and can
 * lazily fetch individual values from a (possibly continuously changing) external table when
 * necessary.
 *
 * <p>Note: Compared to {@link ScanTableSource}, a {@link VectorSearchTableSource} only supports
 * emitting insert-only changes (see also {@link RowKind}).
 *
 * <p>In the last step, the planner will call {@link #getSearchRuntimeProvider(VectorSearchContext)}
 * to obtain a provider of runtime implementation. The search fields that are required to perform a
 * search are derived from a query by the planner and will be provided in the given {@link
 * VectorSearchTableSource.VectorSearchContext#getSearchColumns()}. The values for those search
 * fields are passed at runtime.
 */
@PublicEvolving
public interface VectorSearchTableSource extends DynamicTableSource {

    /**
     * Returns a {@link VectorSearchRuntimeProvider}.
     *
     * <p>{@link VectorSearchRuntimeProvider} is a base interface that is extended by child
     * interfaces for specialized vector searches. There exist different interfaces for runtime
     * implementation, which is why {@link VectorSearchRuntimeProvider} serves as the base
     * interface.
     *
     * <p>Independent of the provider interface, a source implementation can work on either
     * arbitrary objects or internal data structures (see {@link org.apache.flink.table.data} for
     * more information).
     *
     * <p>The given {@link VectorSearchContext} offers utilities for the planner to create runtime
     * implementation with minimal dependencies to internal data structures.
     *
     * @param context the context supplied by the planner for creating the runtime implementation
     * @return the provider of the runtime implementation
     * @see VectorSearchFunctionProvider
     * @see AsyncVectorSearchFunctionProvider
     */
    VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context);

    // --------------------------------------------------------------------------------------------
    // Helper interfaces
    // --------------------------------------------------------------------------------------------

    /**
     * Context for creating runtime implementation via a {@link VectorSearchRuntimeProvider}.
     *
     * <p>It offers utilities for the planner to create runtime implementation with minimal
     * dependencies to internal data structures.
     *
     * <p>Methods should be called in {@link #getSearchRuntimeProvider(VectorSearchContext)}.
     * Returned instances that are {@link Serializable} can be directly passed into the runtime
     * implementation class.
     */
    @PublicEvolving
    interface VectorSearchContext extends DynamicTableSource.Context {

        /**
         * Returns an array of column index paths that should be used during the search. The
         * indices are 0-based and support composite keys within (possibly nested) structures.
         *
         * <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2
         * INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and
         * {@code s2} are used for performing a search.
         *
         * @return array of column index paths
         */
        int[][] getSearchColumns();

        /**
         * Runtime config provided to the provider. The config can be used by the planner or vector
         * search provider at runtime. For example, async options can be used by the planner to
         * choose asynchronous search. Other config such as http timeout or retry can be used to
         * configure search functions.
         *
         * @return the runtime configuration
         */
        ReadableConfig runtimeConfig();
    }

    /**
     * Provides actual runtime implementation for reading the data.
     *
     * <p>There exist different interfaces for runtime implementation, which is why {@link
     * VectorSearchRuntimeProvider} serves as the base interface.
     *
     * @see VectorSearchFunctionProvider
     * @see AsyncVectorSearchFunctionProvider
     */
    @PublicEvolving
    interface VectorSearchRuntimeProvider {}
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.connector.source.search;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.connector.source.VectorSearchTableSource;
23+
import org.apache.flink.table.functions.AsyncVectorSearchFunction;
24+
25+
/** A provider for creating {@link AsyncVectorSearchFunction}. */
26+
@PublicEvolving
27+
public interface AsyncVectorSearchFunctionProvider
28+
extends VectorSearchTableSource.VectorSearchRuntimeProvider {
29+
30+
/** Helper function for creating a static provider. */
31+
static AsyncVectorSearchFunctionProvider of(
32+
AsyncVectorSearchFunction asyncVectorSearchFunction) {
33+
return () -> asyncVectorSearchFunction;
34+
}
35+
36+
/** Creates an {@link AsyncVectorSearchFunction} instance. */
37+
AsyncVectorSearchFunction createAsyncVectorSearchFunction();
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.connector.source.search;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.connector.source.VectorSearchTableSource;
23+
import org.apache.flink.table.functions.VectorSearchFunction;
24+
25+
/** A provider for creating {@link VectorSearchFunction}. */
26+
@PublicEvolving
27+
public interface VectorSearchFunctionProvider
28+
extends VectorSearchTableSource.VectorSearchRuntimeProvider {
29+
30+
/** Helper function for creating a static provider. */
31+
static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) {
32+
return () -> searchFunction;
33+
}
34+
35+
/** Creates an {@link VectorSearchFunction} instance. */
36+
VectorSearchFunction createVectorSearchFunction();
37+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.functions;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.api.TableException;
23+
import org.apache.flink.table.data.GenericRowData;
24+
import org.apache.flink.table.data.RowData;
25+
26+
import java.util.Collection;
27+
import java.util.concurrent.CompletableFuture;
28+
29+
/**
30+
* A wrapper class of {@link AsyncTableFunction} for asynchronous vector search.
31+
*
32+
* <p>The output type of this table function is fixed as {@link RowData}.
33+
*/
34+
@PublicEvolving
35+
public abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> {
36+
37+
/**
38+
* Asynchronously search result based on input row to find topK matched rows.
39+
*
40+
* @param topK - The number of topK matched rows to return.
41+
* @param queryData - A {@link RowData} that wraps input for search function.
42+
* @return A collection of all searched results.
43+
*/
44+
public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(
45+
int topK, RowData queryData);
46+
47+
/** Invokes {@link #asyncVectorSearch} and chains futures. */
48+
public void eval(CompletableFuture<Collection<RowData>> future, Object... args) {
49+
int topK = (int) args[0];
50+
GenericRowData argsData = new GenericRowData(args.length - 1);
51+
for (int i = 1; i < args.length; ++i) {
52+
argsData.setField(i, args[i]);
53+
}
54+
asyncVectorSearch(topK, argsData)
55+
.whenComplete(
56+
(result, exception) -> {
57+
if (exception != null) {
58+
future.completeExceptionally(
59+
new TableException(
60+
String.format(
61+
"Failed to execute asynchronously search with input row %s.",
62+
argsData),
63+
exception));
64+
return;
65+
}
66+
future.complete(result);
67+
});
68+
}
69+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.functions;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.data.GenericRowData;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.util.FlinkRuntimeException;
25+
26+
import java.io.IOException;
27+
import java.util.Collection;
28+
29+
/**
30+
* A wrapper class of {@link TableFunction} for synchronous vector search.
31+
*
32+
* <p>The output type of this table function is fixed as {@link RowData}.
33+
*/
34+
@PublicEvolving
35+
public abstract class VectorSearchFunction extends TableFunction<RowData> {
36+
37+
/**
38+
* Synchronously search result based on input row to find topK matched rows.
39+
*
40+
* @param topK - The number of topK results to return.
41+
* @param queryData - A {@link RowData} that wraps input for vector search function.
42+
* @return A collection of predicted results.
43+
*/
44+
public abstract Collection<RowData> vectorSearch(int topK, RowData queryData)
45+
throws IOException;
46+
47+
/** Invoke {@link #vectorSearch} and handle exceptions. */
48+
public final void eval(Object... args) {
49+
int topK = (int) args[0];
50+
GenericRowData argsData = new GenericRowData(args.length - 1);
51+
for (int i = 1; i < args.length; ++i) {
52+
argsData.setField(i, args[i]);
53+
}
54+
try {
55+
Collection<RowData> results = vectorSearch(topK, argsData);
56+
if (results == null) {
57+
return;
58+
}
59+
results.forEach(this::collect);
60+
} catch (Exception e) {
61+
throw new FlinkRuntimeException(
62+
String.format("Failed to execute search with input row %s.", argsData), e);
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)