Commit 6fd1077

Author: smallrain.xuxy
Commit message: add dashvector datastore
1 parent: b02139d

File tree: 7 files changed, +516 −19 lines


README.md

Lines changed: 19 additions & 6 deletions
@@ -48,6 +48,7 @@ This README provides detailed information on how to set up, develop, and deploy
 - [Supabase](#supabase)
 - [Postgres](#postgres)
 - [AnalyticDB](#analyticdb)
+- [DashVector](#dashvector)
 - [Running the API Locally](#running-the-api-locally)
 - [Testing a Localhost Plugin in ChatGPT](#testing-a-localhost-plugin-in-chatgpt)
 - [Personalization](#personalization)
@@ -184,6 +185,10 @@ Follow these steps to quickly set up and run the ChatGPT Retrieval Plugin:
 export ELASTICSEARCH_INDEX=<elasticsearch_index_name>
 export ELASTICSEARCH_REPLICAS=<elasticsearch_replicas>
 export ELASTICSEARCH_SHARDS=<elasticsearch_shards>
+
+# DashVector
+export DASHVECTOR_API_KEY=<your_dashvector_api_key>
+export DASHVECTOR_COLLECTION=<your_dashvector_collection>
 ```
 
 10. Run the API locally: `poetry run start`
@@ -295,11 +300,11 @@ poetry install
 
 The API requires the following environment variables to work:
 
-| Name             | Required | Description |
-| ---------------- | -------- | ----------- |
-| `DATASTORE`      | Yes      | This specifies the vector database provider you want to use to store and query embeddings. You can choose from `elasticsearch`, `chroma`, `pinecone`, `weaviate`, `zilliz`, `milvus`, `qdrant`, `redis`, `azuresearch`, `supabase`, `postgres`, `analyticdb`. |
-| `BEARER_TOKEN`   | Yes      | This is a secret token that you need to authenticate your requests to the API. You can generate one using any tool or method you prefer, such as [jwt.io](https://jwt.io/). |
-| `OPENAI_API_KEY` | Yes      | This is your OpenAI API key that you need to generate embeddings using the `text-embedding-ada-002` model. You can get an API key by creating an account on [OpenAI](https://openai.com/). |
+| Name             | Required | Description |
+| ---------------- | -------- | ----------- |
+| `DATASTORE`      | Yes      | This specifies the vector database provider you want to use to store and query embeddings. You can choose from `elasticsearch`, `chroma`, `pinecone`, `weaviate`, `zilliz`, `milvus`, `qdrant`, `redis`, `azuresearch`, `supabase`, `postgres`, `analyticdb`, `dashvector`. |
+| `BEARER_TOKEN`   | Yes      | This is a secret token that you need to authenticate your requests to the API. You can generate one using any tool or method you prefer, such as [jwt.io](https://jwt.io/). |
+| `OPENAI_API_KEY` | Yes      | This is your OpenAI API key that you need to generate embeddings using the `text-embedding-ada-002` model. You can get an API key by creating an account on [OpenAI](https://openai.com/). |
 
 ### Using the plugin with Azure OpenAI
 
@@ -377,6 +382,10 @@ For detailed setup instructions, refer to [`/docs/providers/llama/setup.md`](/do
 
 [Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html) currently supports storing vectors through the `dense_vector` field type and uses them to calculate document scores. Elasticsearch 8.0 builds on this functionality to support fast, approximate nearest neighbor search (ANN). This represents a much more scalable approach, allowing vector search to run efficiently on large datasets. For detailed setup instructions, refer to [`/docs/providers/elasticsearch/setup.md`](/docs/providers/elasticsearch/setup.md).
 
+#### DashVector
+
+[DashVector](https://help.aliyun.com/document_detail/2510225.html) is a fully-managed vector database service that supports high-dimensional dense and sparse vectors, real-time insertion, and filtered search. It is built to scale automatically and can adapt to different application requirements. For detailed setup instructions, refer to [`/docs/providers/dashvector/setup.md`](/docs/providers/dashvector/setup.md).
+
 ### Running the API locally
 
 To run the API locally, you first need to set the requisite environment variables with the `export` command:
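The datastore added later in this commit drives DashVector through a small set of client calls: create/get a collection, upsert `Doc` objects, query by vector with a filter expression, and delete by id. A condensed sketch of that flow, with a placeholder API key and a hypothetical collection name, might look like this (the call shapes mirror the provider code below):

```python
# Minimal sketch of the DashVector client calls the new provider relies on.
# "<your_dashvector_api_key>" and "retrieval-plugin-demo" are placeholders.
from dashvector import Client, Doc

client = Client(api_key="<your_dashvector_api_key>")

# Create a collection sized for text-embedding-ada-002 vectors (1536 dims),
# then fetch a handle to it.
client.create("retrieval-plugin-demo", dimension=1536,
              fields_schema={"source": str, "created_at": int})
collection = client.get("retrieval-plugin-demo")

# Upsert a document chunk: the embedding goes in `vector`, everything else
# (including the raw text) is stored as fields.
collection.upsert(docs=[
    Doc(id="doc1_chunk1", vector=[0.1] * 1536,
        fields={"text": "hello dashvector", "source": "file", "created_at": 0})
])

# Vector query with an optional filter expression.
resp = collection.query(vector=[0.1] * 1536, topk=3, filter="created_at >= 0")
for doc in resp:
    print(doc.id, doc.score, doc.fields.get("text"))

# Delete by id.
collection.delete(["doc1_chunk1"])
```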
@@ -559,7 +568,7 @@ feature/advanced-chunking-strategy-123
 
 While the ChatGPT Retrieval Plugin is designed to provide a flexible solution for semantic search and retrieval, it does have some limitations:
 
-- **Keyword search limitations**: The embeddings generated by the `text-embedding-ada-002` model may not always be effective at capturing exact keyword matches. As a result, the plugin might not return the most relevant results for queries that rely heavily on specific keywords. Some vector databases, like Elasticsearch, Pinecone, Weaviate and Azure Cognitive Search, use hybrid search and might perform better for keyword searches.
+- **Keyword search limitations**: The embeddings generated by the `text-embedding-ada-002` model may not always be effective at capturing exact keyword matches. As a result, the plugin might not return the most relevant results for queries that rely heavily on specific keywords. Some vector databases, like DashVector, Elasticsearch, Pinecone, Weaviate and Azure Cognitive Search, use hybrid search and might perform better for keyword searches.
 - **Sensitive data handling**: The plugin does not automatically detect or filter sensitive data. It is the responsibility of the developers to ensure that they have the necessary authorization to include content in the Retrieval Plugin and that the content complies with data privacy requirements.
 - **Scalability**: The performance of the plugin may vary depending on the chosen vector database provider and the size of the dataset. Some providers may offer better scalability and performance than others.
 - **Language support**: The plugin currently uses OpenAI's `text-embedding-ada-002` model, which is optimized for use in English. However, it is still robust enough to generate good results for a variety of languages.
@@ -613,3 +622,7 @@ We would like to extend our gratitude to the following contributors for their co
 - [mmmaia](https://github.com/mmmaia)
 - [Elasticsearch](https://www.elastic.co/)
 - [joemcelroy](https://github.com/joemcelroy)
+- [DashVector](https://help.aliyun.com/document_detail/2510225.html)
+- [yingdachen](http://github.com/yingdachen)
+- [yurnom](https://github.com/yurnom)
+- [xiaoyuxee](https://github.com/xiaoyuxee)

datastore/factory.py

Lines changed: 5 additions & 1 deletion
@@ -66,8 +66,12 @@ async def get_datastore() -> DataStore:
             )
 
             return ElasticsearchDataStore()
+        case "dashvector":
+            from datastore.providers.dashvector_datastore import DashVectorDataStore
+
+            return DashVectorDataStore()
         case _:
             raise ValueError(
                 f"Unsupported vector database: {datastore}. "
-                f"Try one of the following: llama, elasticsearch, pinecone, weaviate, milvus, zilliz, redis, azuresearch, or qdrant"
+                f"Try one of the following: llama, elasticsearch, pinecone, weaviate, milvus, zilliz, redis, azuresearch, qdrant, or dashvector"
             )
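With the factory wired up, selecting the new backend is a matter of setting `DATASTORE=dashvector` (plus the two DashVector variables from the README) before the app starts. A rough sketch of exercising the factory directly, assuming a configured DashVector account (the placeholder values would need to be real for this to run):

```python
# Rough sketch: resolving the DashVector datastore through the factory.
# Assumes DASHVECTOR_API_KEY points at a real DashVector account and the
# named collection either exists or can be created.
import asyncio
import os

os.environ["DATASTORE"] = "dashvector"
os.environ["DASHVECTOR_API_KEY"] = "<your_dashvector_api_key>"
os.environ["DASHVECTOR_COLLECTION"] = "<your_dashvector_collection>"

from datastore.factory import get_datastore


async def main() -> None:
    datastore = await get_datastore()
    # Expected: DashVectorDataStore
    print(type(datastore).__name__)


asyncio.run(main())
```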
datastore/providers/dashvector_datastore.py

Lines changed: 267 additions & 0 deletions
@@ -0,0 +1,267 @@
import os
from typing import Any, Dict, List, Optional

import dashvector
from dashvector import Client, Doc

from tenacity import retry, wait_random_exponential, stop_after_attempt
import asyncio
from loguru import logger

from datastore.datastore import DataStore
from models.models import (
    DocumentChunk,
    DocumentChunkMetadata,
    DocumentChunkWithScore,
    DocumentMetadataFilter,
    QueryResult,
    QueryWithEmbedding,
    Source,
)
from services.date import to_unix_timestamp

# Read environment variables for DashVector configuration
DASHVECTOR_API_KEY = os.environ.get("DASHVECTOR_API_KEY")
DASHVECTOR_COLLECTION = os.environ.get("DASHVECTOR_COLLECTION")
assert DASHVECTOR_API_KEY is not None
assert DASHVECTOR_COLLECTION is not None

# Set the batch size for vector upsert to DashVector
UPSERT_BATCH_SIZE = 100

# Set the dimension for embedding
VECTOR_DIMENSION = 1536


class DashVectorDataStore(DataStore):
    def __init__(self):
        # Init dashvector client
        client = Client(api_key=DASHVECTOR_API_KEY)
        self._client = client

        # Get the collection in DashVector
        collection = client.get(DASHVECTOR_COLLECTION)

        # Check if the collection exists in DashVector
        if collection:
            logger.info(f"Connected to existing collection {DASHVECTOR_COLLECTION}.")
            self._collection = collection
        else:
            self._create_collection()

    @retry(wait=wait_random_exponential(min=1, max=20),
           stop=stop_after_attempt(3))
    async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
        """
        Takes in a dict from document id to list of document chunks and inserts them into the collection.
        Return a list of document ids.
        """
        # Initialize a list of ids to return
        doc_ids: List[str] = []
        # Initialize a list of vectors to upsert
        docs = []
        # Loop through the dict items
        for doc_id, chunk_list in chunks.items():
            # Append the id to the ids list
            doc_ids.append(doc_id)
            logger.info(f"Upserting document_id: {doc_id}")
            for chunk in chunk_list:
                fields = self._get_dashvector_fields(chunk.metadata)
                # Add the text to the fields
                fields["text"] = chunk.text
                docs.append(
                    Doc(id=chunk.id, vector=chunk.embedding, fields=fields)
                )

        # Split the vectors list into batches of the specified size
        batches = [
            docs[i: i + UPSERT_BATCH_SIZE]
            for i in range(0, len(docs), UPSERT_BATCH_SIZE)
        ]

        # Upsert each batch to DashVector
        for batch in batches:
            logger.info(f"Upserting batch of size {len(batch)}")
            resp = self._collection.upsert(docs=batch)
            if resp:
                logger.info("Upserted batch successfully")
            else:
                raise Exception(f"Failed to upsert batch, error: {resp}")

        return doc_ids

    @retry(wait=wait_random_exponential(min=1, max=20),
           stop=stop_after_attempt(3))
    async def _query(
        self,
        queries: List[QueryWithEmbedding],
    ) -> List[QueryResult]:
        """
        Takes in a list of queries with embeddings and filters and returns a list of query results with matching document chunks and scores.
        """

        # Define a helper coroutine that performs a single query and returns a QueryResult
        async def _single_query(query: QueryWithEmbedding) -> QueryResult:
            logger.debug(f"Query: {query.query}")

            # Convert the metadata filter object to a dict with dashvector filter expressions
            dashvector_filter = self._get_dashvector_filter(query.filter)

            resp = self._collection.query(vector=query.embedding,
                                          topk=query.top_k,
                                          filter=dashvector_filter)
            if not resp:
                raise Exception(f"Error querying in collection: {resp}")

            query_results: List[DocumentChunkWithScore] = []
            for doc in resp:
                score = doc.score
                metadata = doc.fields
                text = metadata.pop("text")

                # Create a document chunk with score object with the result data
                result = DocumentChunkWithScore(
                    id=doc.id,
                    score=score,
                    text=text,
                    metadata=metadata,
                )
                query_results.append(result)
            return QueryResult(query=query.query, results=query_results)

        # Use asyncio.gather to run multiple _single_query coroutines concurrently and collect their results
        results: List[QueryResult] = await asyncio.gather(
            *[_single_query(query) for query in queries]
        )

        return results

    @retry(wait=wait_random_exponential(min=1, max=20),
           stop=stop_after_attempt(3))
    async def delete(
        self,
        ids: Optional[List[str]] = None,
        filter: Optional[DocumentMetadataFilter] = None,
        delete_all: Optional[bool] = None,
    ) -> bool:
        """
        Removes vectors by ids, filter, or everything from the collection.
        """

        # Delete all vectors from the collection if delete_all is True
        if delete_all:
            logger.info("Deleting all vectors from collection")
            resp = self._collection.delete(delete_all=True)
            if not resp:
                raise Exception(
                    f"Error deleting all vectors, error: {resp.message}"
                )
            logger.info("Deleted all vectors successfully")
            return True

        # Delete vectors by filter
        if filter:
            # Query the docs by filter
            resp = self._collection.query(topk=1024, filter=self._get_dashvector_filter(filter))
            if not resp:
                raise Exception(
                    f"Error deleting vectors with filter, error: {resp.message}"
                )
            if ids is not None:
                ids += [doc.id for doc in resp]
            else:
                ids = [doc.id for doc in resp]

        # Delete vectors that match the document ids from the collection if the ids list is not empty
        if ids is not None and len(ids) > 0:
            logger.info(f"Deleting vectors with ids {ids}")
            resp = self._collection.delete(ids)
            if not resp:
                raise Exception(
                    f"Error deleting vectors with ids, error: {resp.message}"
                )
            logger.info("Deleted vectors with ids successfully")

        return True

    def _get_dashvector_filter(
        self, filter: Optional[DocumentMetadataFilter] = None
    ) -> Optional[str]:
        if filter is None:
            return None

        dashvector_filter = []
        for field, value in filter.dict().items():
            if value is not None:
                if field == "start_date":
                    dashvector_filter.append(f"created_at >= {to_unix_timestamp(value)}")
                elif field == "end_date":
                    dashvector_filter.append(f"created_at <= {to_unix_timestamp(value)}")
                else:
                    if isinstance(value, str):
                        dashvector_filter.append(f"{field} = '{value}'")
                    else:
                        dashvector_filter.append(f"{field} = {value}")

        return " and ".join(dashvector_filter)

    def _get_dashvector_fields(
        self, metadata: Optional[DocumentChunkMetadata] = None
    ) -> Dict[str, Any]:
        dashvector_fields = {}
        # For each field in the metadata, check if it has a value and add it to the dashvector fields
        for field, value in metadata.dict().items():
            if value is not None:
                if field == "created_at":
                    dashvector_fields[field] = to_unix_timestamp(value)
                elif field == "source":
                    dashvector_fields[field] = value.name
                else:
                    dashvector_fields[field] = value
        return dashvector_fields

    def _delete_collection(self) -> None:
        resp = self._client.delete(DASHVECTOR_COLLECTION)
        if not resp:
            raise Exception(
                f"Error deleting collection, error: {resp.message}"
            )

    def _create_collection(self) -> None:
        """
        Create a DashVector collection for vector management.
        """

        # Get all fields in the metadata object in a list
        fields_schema = {
            field: str for field in DocumentChunkMetadata.__fields__.keys()
            if field != "created_at"
        }
        # used to compare created time
        fields_schema["created_at"] = int

        logger.info(
            f"Creating collection {DASHVECTOR_COLLECTION} with metadata config {fields_schema}."
        )

        # Create new collection
        resp = self._client.create(
            DASHVECTOR_COLLECTION,
            dimension=VECTOR_DIMENSION,
            fields_schema=fields_schema
        )
        if not resp:
            raise Exception(
                f"Fail to create collection {DASHVECTOR_COLLECTION}. "
                f"Error: {resp.message}"
            )

        # Set self collection
        collection = self._client.get(DASHVECTOR_COLLECTION)
        if not collection:
            raise Exception(
                f"Fail to get collection {DASHVECTOR_COLLECTION}. "
                f"Error: {collection}"
            )
        self._collection = collection
        logger.info(f"Collection {DASHVECTOR_COLLECTION} created successfully.")
