diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 48f3133d..0b06b4de 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -97,6 +97,20 @@ jobs: first: ${{ needs.real-version-in-tag.outputs.real_version }} second: "1.26.0" operator: ">=" + newer-or-equal-than-1_29: + name: "Check if the version is newer than 1.29" + needs: real-version-in-tag + runs-on: ubuntu-latest + outputs: + check: ${{ steps.semver_compare.outputs.result }} + steps: + - name: Semver Compare + id: semver_compare + uses: fabriziocacicia/semver-compare-action@v0.1.0 + with: + first: ${{ needs.real-version-in-tag.outputs.real_version }} + second: "1.29.0" + operator: ">=" filter-memory-leak: name: Filter (cache) memory leak when querying while importing if: ${{ github.event.inputs.test_to_run == 'filter-memory-leak' || github.event.inputs.test_to_run == '' }} @@ -341,6 +355,7 @@ jobs: DISTANCE: l2-squared REQUIRED_RECALL: 0.999 PERSISTENCE_LSM_ACCESS_STRATEGY: ${{inputs.lsm_access_strategy}} + BOOT_DISK_SIZE: 20GB steps: - uses: actions/checkout@v3 - name: Login to Docker Hub @@ -374,6 +389,7 @@ jobs: DISTANCE: l2-squared REQUIRED_RECALL: 0.992 PERSISTENCE_LSM_ACCESS_STRATEGY: ${{inputs.lsm_access_strategy}} + BOOT_DISK_SIZE: 20GB steps: - uses: actions/checkout@v3 - name: Login to Docker Hub @@ -397,6 +413,42 @@ jobs: path: 'results' destination: 'ann-pipelines/github-action-runs' glob: '*.json' + ann-benchmarks-multivector-gcp: + name: "[bench GCP] MULTIVECTOR3M" + needs: [newer-or-equal-than-1_29] + if: ${{ (needs.newer-or-equal-than-1_29.outputs.check == 'true') && (github.event.inputs.test_to_run == 'ann-benchmarks-multivector-gcp' || github.event.inputs.test_to_run == '') }} + runs-on: ubuntu-latest + timeout-minutes: 60 + env: + DATASET: lotte-recreation-reduced-vl + DISTANCE: dot + REQUIRED_RECALL: 0.992 + PERSISTENCE_LSM_ACCESS_STRATEGY: ${{inputs.lsm_access_strategy}} + BOOT_DISK_SIZE: 20GB + MULTIVECTOR_DATASET: true + steps: + - 
uses: actions/checkout@v3 + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{secrets.DOCKER_USERNAME}} + password: ${{secrets.DOCKER_PASSWORD}} + - id: 'gcs_auth' + name: 'Authenticate to Google Cloud' + uses: 'google-github-actions/auth@v1' + with: + credentials_json: ${{secrets.GCP_SERVICE_ACCOUNT_BENCHMARKS}} + - name: 'Set up Cloud SDK' + uses: 'google-github-actions/setup-gcloud@v1' + - name: Run chaos test + if: always() + run: ./ann_benchmark_gcp.sh + - id: 'upload-files' + uses: 'google-github-actions/upload-cloud-storage@v1' + with: + path: 'results' + destination: 'ann-pipelines/github-action-runs' + glob: '*.json' batch-import-many-classes: name: One class receives long and expensive batches, user tries to create and delete 100s of classes in parallel if: ${{ github.event.inputs.test_to_run == 'batch-import-many-classes' || github.event.inputs.test_to_run == '' }} diff --git a/ann_benchmark.sh b/ann_benchmark.sh index 21a18aee..36eac5dd 100755 --- a/ann_benchmark.sh +++ b/ann_benchmark.sh @@ -4,6 +4,7 @@ set -e dataset=${DATASET:-"sift-128-euclidean"} distance=${DISTANCE:-"l2-squared"} +multivector=${MULTIVECTOR_DATASET:-"false"} function wait_weaviate() { echo "Wait for Weaviate to be ready" @@ -37,11 +38,25 @@ mkdir -p datasets echo "Datasets exists locally" else echo "Downloading dataset" - curl -LO http://ann-benchmarks.com/${dataset}.hdf5 + if [ "$multivector" = true ]; then + + echo "Downloading multivector dataset" + curl -LO https://storage.googleapis.com/ann-datasets/custom/Multivector/${dataset}.hdf5 + else + echo "Downloading single vector dataset" + curl -LO http://ann-benchmarks.com/${dataset}.hdf5 + fi fi ) -docker run --network host -t -v "$PWD/results:/workdir/results" -v "$PWD/datasets:/datasets" ann_benchmarks python3 run.py -v /datasets/${dataset}.hdf5 -d $distance -m 32 --labels 
"pq=false,after_restart=false,weaviate_version=$WEAVIATE_VERSION,cloud_provider=$CLOUD_PROVIDER,machine_type=$MACHINE_TYPE,os=$OS" + +if [ "$multivector" = true ]; then + multivector_flag="-mv" +else + multivector_flag="" +fi + +docker run --network host -t -v "$PWD/results:/workdir/results" -v "$PWD/datasets:/datasets" ann_benchmarks python3 run.py $multivector_flag -v /datasets/${dataset}.hdf5 -d $distance -m 32 --labels "pq=false,after_restart=false,weaviate_version=$WEAVIATE_VERSION,cloud_provider=$CLOUD_PROVIDER,machine_type=$MACHINE_TYPE,os=$OS" echo "Initial run complete, now restart Weaviate" @@ -53,7 +68,7 @@ echo "Weaviate ready, wait 30s for caches to be hot" sleep 30 echo "Second run (query only)" -docker run --network host -t -v "$PWD/results:/workdir/results" -v "$PWD/datasets:/datasets" ann_benchmarks python3 run.py -v /datasets/${dataset}.hdf5 -d $distance -m 32 --query-only --labels "pq=false,after_restart=true,weaviate_version=$WEAVIATE_VERSION,cloud_provider=$CLOUD_PROVIDER,machine_type=$MACHINE_TYPE,os=$OS" +docker run --network host -t -v "$PWD/results:/workdir/results" -v "$PWD/datasets:/datasets" ann_benchmarks python3 run.py $multivector_flag -v /datasets/${dataset}.hdf5 -d $distance -m 32 --query-only --labels "pq=false,after_restart=true,weaviate_version=$WEAVIATE_VERSION,cloud_provider=$CLOUD_PROVIDER,machine_type=$MACHINE_TYPE,os=$OS" docker run --network host -t -v "$PWD/datasets:/datasets" \ -v "$PWD/results:/workdir/results" \ diff --git a/ann_benchmark_gcp.sh b/ann_benchmark_gcp.sh index 90d275ef..1d4901a6 100755 --- a/ann_benchmark_gcp.sh +++ b/ann_benchmark_gcp.sh @@ -4,14 +4,17 @@ set -e ZONE=${ZONE:-"us-central1-a"} MACHINE_TYPE=${MACHINE_TYPE:-"n2-standard-8"} +BOOT_DISK_SIZE=${BOOT_DISK_SIZE:-"10GB"} CLOUD_PROVIDER="gcp" OS="ubuntu-2204-lts" +MULTIVECTOR_DATASET=${MULTIVECTOR_DATASET:-"false"} instance="benchmark-$(uuidgen | tr [:upper:] [:lower:])" gcloud compute instances create $instance \ --image-family=$OS 
--image-project=ubuntu-os-cloud \ - --machine-type=$MACHINE_TYPE --zone $ZONE + --machine-type=$MACHINE_TYPE --zone $ZONE \ + --boot-disk-size=$BOOT_DISK_SIZE function cleanup { gcloud compute instances delete $instance --zone $ZONE --quiet @@ -43,6 +46,6 @@ gcloud compute ssh --zone $ZONE $instance -- "mkdir -p ~/apps/" gcloud compute scp --zone $ZONE --recurse apps/ann-benchmarks "$instance:~/apps/" gcloud compute scp --zone $ZONE --recurse apps/weaviate-no-restart-on-crash/ "$instance:~/apps/" gcloud compute scp --zone $ZONE --recurse ann_benchmark.sh "$instance:~" -gcloud compute ssh --zone $ZONE $instance -- "DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh" +gcloud compute ssh --zone $ZONE $instance -- "MULTIVECTOR_DATASET=$MULTIVECTOR_DATASET DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh" mkdir -p results gcloud compute scp --zone $ZONE --recurse "$instance:~/results/*.json" results/ diff --git a/ann_benchmark_quantization_gcp.sh b/ann_benchmark_quantization_gcp.sh index dd2e5f51..ef2a2cab 100755 --- a/ann_benchmark_quantization_gcp.sh +++ b/ann_benchmark_quantization_gcp.sh @@ -4,6 +4,8 @@ set -e ZONE=${ZONE:-"us-central1-a"} MACHINE_TYPE=${MACHINE_TYPE:-"n2-standard-8"} +BOOT_DISK_SIZE=${BOOT_DISK_SIZE:-"10GB"} +MULTIVECTOR_DATASET=${MULTIVECTOR_DATASET:-"false"} export CLOUD_PROVIDER="gcp" export OS="ubuntu-2204-lts" @@ -11,7 +13,8 @@ instance="benchmark-$(uuidgen | tr [:upper:] [:lower:])" gcloud compute instances create $instance \ --image-family=$OS --image-project=ubuntu-os-cloud \ - --machine-type=$MACHINE_TYPE --zone $ZONE + --machine-type=$MACHINE_TYPE --zone $ZONE \ + --boot-disk-size=$BOOT_DISK_SIZE function cleanup { gcloud compute instances delete $instance 
--quiet --zone $ZONE @@ -43,6 +46,6 @@ gcloud compute ssh --zone $ZONE $instance -- "mkdir -p ~/apps/" gcloud compute scp --zone $ZONE --recurse apps/ann-benchmarks "$instance:~/apps/" gcloud compute scp --zone $ZONE --recurse apps/weaviate-no-restart-on-crash/ "$instance:~/apps/" gcloud compute scp --zone $ZONE --recurse ann_benchmark_quantization.sh "$instance:~" -gcloud compute ssh --zone $ZONE $instance -- "DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh" +gcloud compute ssh --zone $ZONE $instance -- "MULTIVECTOR_DATASET=$MULTIVECTOR_DATASET DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh" mkdir -p results gcloud compute scp --zone $ZONE --recurse "$instance:~/results/*.json" results/ diff --git a/apps/ann-benchmarks/analyze.py b/apps/ann-benchmarks/analyze.py index 0f8760fc..a4031fd3 100644 --- a/apps/ann-benchmarks/analyze.py +++ b/apps/ann-benchmarks/analyze.py @@ -37,6 +37,19 @@ def test_recall_before_after(self): f"allowed delta for recall before and after restart is {allowed_delta}, got before={mean_recall_before}, after={mean_recall_after}", ) + def test_qps_before_after(self): + allowed_delta = 0.25 + mean_qps_before = self.df.loc[self.df["after_restart"] == "false", "qps"].mean() + mean_qps_after = self.df.loc[self.df["after_restart"] == "true", "qps"].mean() + + min_val, max_val = min(mean_qps_before, mean_qps_after), max( + mean_qps_before, mean_qps_after + ) + self.assertTrue( + min_val > max_val * (1 - allowed_delta), + f"qps before and after restart are not within the allowed delta of {allowed_delta}, got before={mean_qps_before}, after={mean_qps_after}", + ) + if __name__ == "__main__": 
unittest.main() diff --git a/apps/ann-benchmarks/requirements.txt b/apps/ann-benchmarks/requirements.txt index cc8ebf03..9f2117d3 100644 --- a/apps/ann-benchmarks/requirements.txt +++ b/apps/ann-benchmarks/requirements.txt @@ -1,5 +1,6 @@ -weaviate-client==4.7.0-rc.0 +weaviate-client>=4.11.0 loguru==0.5.3 seaborn==0.12.2 -h5py==3.11.0 -pandas==2.2.2 +h5py==3.13.0 +pandas==2.2.3 +torch==2.6.0 \ No newline at end of file diff --git a/apps/ann-benchmarks/run.py b/apps/ann-benchmarks/run.py index cdaaeea9..09028fa1 100644 --- a/apps/ann-benchmarks/run.py +++ b/apps/ann-benchmarks/run.py @@ -3,6 +3,7 @@ import sys from loguru import logger import h5py +import torch import grpc import pathlib import time @@ -30,6 +31,7 @@ "quantization": False, "dim_to_segment_ratio": 4, "override": False, + "multivector": False, } pathlib.Path("./results").mkdir(parents=True, exist_ok=True) @@ -48,6 +50,7 @@ parser.add_argument("-q", "--query-only", action=argparse.BooleanOptionalAction) parser.add_argument("-o", "--override", action=argparse.BooleanOptionalAction) parser.add_argument("-s", "--dim-to-segment-ratio") +parser.add_argument("-mv", "--multivector", action=argparse.BooleanOptionalAction, default=False) args = parser.parse_args() @@ -80,9 +83,15 @@ values["dim_to_segment_ratio"] = int(args.dim_to_segment_ratio) values["labels"]["dim_to_segment_ratio"] = values["dim_to_segment_ratio"] +values["multivector"] = args.multivector + f = h5py.File(args.vectors) values["labels"]["dataset_file"] = os.path.basename(args.vectors) vectors = f["train"] +if values["multivector"]: + vector_dim: int = 128 + vectors = [torch.from_numpy(sample.reshape(-1, vector_dim)) for sample in vectors] + efC = values["efC"] distance = args.distance @@ -94,13 +103,14 @@ quantization = values["quantization"] override = values["override"] dim_to_seg_ratio = values["dim_to_segment_ratio"] + multivector = values["multivector"] before_import = time.time() logger.info( f"Starting import with efC={efC}, m={m}, 
shards={shards}, distance={distance}"
 )
 if override == False:
-    reset_schema(client, efC, m, shards, distance)
-    load_records(client, vectors, quantization, dim_to_seg_ratio, override)
+    reset_schema(client, efC, m, shards, distance, multivector)
+    load_records(client, vectors, quantization, dim_to_seg_ratio, override, multivector)
 elapsed = time.time() - before_import
 logger.info(
     f"Finished import with efC={efC}, m={m}, shards={shards} in {str(timedelta(seconds=elapsed))}"
 )
@@ -109,11 +119,5 @@
 time.sleep(30)
 
 logger.info(f"Starting querying for efC={efC}, m={m}, shards={shards}")
-query(
-    client,
-    stub,
-    f,
-    values["ef"],
-    values["labels"],
-)
+query(client, stub, f, values["ef"], values["labels"], values["multivector"])
 logger.info(f"Finished querying for efC={efC}, m={m}, shards={shards}")
diff --git a/apps/ann-benchmarks/weaviate_import.py b/apps/ann-benchmarks/weaviate_import.py
index ac5bee51..db0cc7d3 100644
--- a/apps/ann-benchmarks/weaviate_import.py
+++ b/apps/ann-benchmarks/weaviate_import.py
@@ -11,16 +11,34 @@
 
 class_name = "Vector"
 
-def reset_schema(client: weaviate.WeaviateClient, efC, m, shards, distance):
+def reset_schema(client: weaviate.WeaviateClient, efC, m, shards, distance, multivector=False):
     client.collections.delete_all()
     client.collections.create(
         name=class_name,
-        vectorizer_config=wvc.Configure.Vectorizer.none(),
-        vector_index_config=wvc.Configure.VectorIndex.hnsw(
-            ef_construction=efC,
-            max_connections=m,
-            ef=-1,
-            distance_metric=wvc.VectorDistances(distance),
+        vectorizer_config=(
+            wvc.Configure.Vectorizer.none()
+            if not multivector
+            else [
+                wvc.Configure.NamedVectors.none(
+                    name="multivector",
+                    vector_index_config=wvc.Configure.VectorIndex.hnsw(
+                        ef_construction=efC,
+                        max_connections=m,
+                        ef=-1, distance_metric=wvc.VectorDistances(distance),
+                        multi_vector=wvc.Configure.VectorIndex.MultiVector.multi_vector(),
+                    ),
+                )
+            ]
+        ),
+        vector_index_config=(
+            wvc.Configure.VectorIndex.hnsw(
+                ef_construction=efC,
+                max_connections=m,
+                ef=-1,
+                
distance_metric=wvc.VectorDistances(distance), + ) + if not multivector + else None ), properties=[ wvc.Property( @@ -34,13 +52,18 @@ def reset_schema(client: weaviate.WeaviateClient, efC, m, shards, distance): def load_records( - client: weaviate.WeaviateClient, vectors, quantization, dim_to_seg_ratio, override + client: weaviate.WeaviateClient, + vectors, + quantization, + dim_to_seg_ratio, + override, + multivector=False, ): collection = client.collections.get(class_name) i = 0 if vectors == None: vectors = [None] * 10_000_000 - batch_size = 1000 + batch_size = 100 len_objects = len(vectors) with client.batch.fixed_size(batch_size=batch_size) as batch: @@ -55,11 +78,14 @@ def load_records( data_object = { "i": i, } + multivector_object = {} + if multivector: + multivector_object["multivector"] = vector batch.add_object( properties=data_object, - vector=vector, - collection=class_name, + vector=vector if multivector is False else multivector_object, uuid=uuid.UUID(int=i), + collection=class_name, ) i += 1 @@ -67,7 +93,6 @@ def load_records( logger.error(err.message) if quantization in ["pq", "sq"] and override == False: - if quantization == "pq": collection.config.update( vector_index_config=wvc.Reconfigure.VectorIndex.hnsw( @@ -83,6 +108,7 @@ def load_records( ) ) + check_shards_readonly(collection) wait_for_all_shards_ready(collection) i = 100000 @@ -107,23 +133,34 @@ def load_records( for err in client.batch.failed_objects: logger.error(err.message) + logger.info("Waiting for vector indexing to finish") + collection.batch.wait_for_vector_indexing() + logger.info("Vector indexing finished") + logger.info(f"Finished writing {len_objects} records") -def wait_for_all_shards_ready(collection: weaviate.collections.Collection): +def check_shards_readonly(collection: weaviate.collections.Collection): status = [s.status for s in collection.config.get_shards()] if not all(s == "READONLY" for s in status): raise Exception(f"shards are not READONLY at beginning: 
{status}")
+
+def wait_for_all_shards_ready(collection: weaviate.collections.Collection):
     max_wait = 300
     before = time.time()
     while True:
-        time.sleep(3)
-        status = [s.status for s in collection.config.get_shards()]
+        try:
+            status = [s.status for s in collection.config.get_shards()]
+        except Exception as e:
+            logger.error(f"Error getting shards status: {e}")
+            status = ["ERROR"]
+
         if all(s == "READY" for s in status):
-            logger.info(f"finished in {time.time()-before}s")
+            logger.debug(f"finished in {time.time()-before}s")
             return
         if time.time() - before > max_wait:
             raise Exception(f"after {max_wait}s not all shards READY: {status}")
+        time.sleep(3)
diff --git a/apps/ann-benchmarks/weaviate_query.py b/apps/ann-benchmarks/weaviate_query.py
index 2402255f..e5061d8a 100644
--- a/apps/ann-benchmarks/weaviate_query.py
+++ b/apps/ann-benchmarks/weaviate_query.py
@@ -6,23 +6,35 @@ import weaviate.classes.config as wvc
 from weaviate.exceptions import WeaviateQueryException
 import h5py
+import torch
 import json
 from loguru import logger
 from weaviate_pprof import obtain_heap_profile
+from weaviate_import import wait_for_all_shards_ready
 
 limit = 10
 class_name = "Vector"
 results = []
 
 
-def search_grpc(collection: weaviate.collections.Collection, dataset, i, input_vec):
+def search_grpc(
+    collection: weaviate.collections.Collection, dataset, i, input_vec, multivector=False
+):
     out = {}
     before = time.time()
     try:
-        objs = collection.query.near_vector(
-            near_vector=input_vec, limit=limit, return_properties=[]
-        ).objects
+        if not multivector:
+            objs = collection.query.near_vector(
+                near_vector=input_vec, limit=limit, return_properties=[]
+            ).objects
+        else:
+            objs = collection.query.near_vector(
+                near_vector=input_vec,
+                limit=limit,
+                target_vector="multivector",
+                return_properties=[],
+            ).objects
     except WeaviateQueryException as e:
         logger.error(e.message)
         objs = []
@@ -36,27 +48,47 @@ def search_grpc(collection: weaviate.collections.Collection, dataset, i, input_v
     return out
 
 
-def query(client: weaviate.WeaviateClient, stub, dataset, ef_values, labels):
+def query(client: weaviate.WeaviateClient, stub, dataset, ef_values, labels, multivector=False):
     collection = client.collections.get(class_name)
     schema = collection.config.get()
     shards = schema.sharding_config.actual_count
-    efC = schema.vector_index_config.ef_construction
-    m = schema.vector_index_config.max_connections
+    if not multivector:
+        efC = schema.vector_index_config.ef_construction
+        m = schema.vector_index_config.max_connections
+    else:
+        efC = schema.vector_config["multivector"].vector_index_config.ef_construction
+        m = schema.vector_config["multivector"].vector_index_config.max_connections
     logger.info(f"build params: shards={shards}, efC={efC}, m={m} labels={labels}")
 
-    vectors = dataset["test"]
+    vectors = dataset["test"]
+    if multivector:
+        vectors = [torch.from_numpy(sample.reshape(-1, 128)) for sample in vectors]
 
     run_id = f"{int(time.time())}"
     for ef in ef_values:
         for api in ["grpc"]:
-            collection.config.update(vector_index_config=wvc.Reconfigure.VectorIndex.hnsw(ef=ef))
+            if not multivector:
+                collection.config.update(
+                    vector_index_config=wvc.Reconfigure.VectorIndex.hnsw(ef=ef)
+                )
+            else:
+                collection.config.update(
+                    vectorizer_config=[
+                        wvc.Reconfigure.NamedVectors.update(
+                            name="multivector",
+                            vector_index_config=wvc.Reconfigure.VectorIndex.hnsw(ef=ef),
+                        )
+                    ]
+                )
+
+            wait_for_all_shards_ready(collection)
 
             took = 0
             recall = 0
             for i, vec in enumerate(vectors):
                 res = {}
                 if api == "grpc":
-                    res = search_grpc(collection, dataset, i, vec.tolist())
+                    res = search_grpc(collection, dataset, i, vec.tolist(), multivector)
                 elif api == "grpc_clientless":
                     res = search_grpc_clientless(stub, dataset, i, vec)
                 elif api == "graphql":
diff --git a/apps/weaviate-no-restart-on-crash/docker-compose.yml b/apps/weaviate-no-restart-on-crash/docker-compose.yml
index aa9604aa..d94697d6 100644
--- a/apps/weaviate-no-restart-on-crash/docker-compose.yml
+++ 
b/apps/weaviate-no-restart-on-crash/docker-compose.yml @@ -29,4 +29,5 @@ services: - DISABLE_TELEMETRY - PROMETHEUS_MONITORING_ENABLED - PERSISTENCE_LSM_MAX_SEGMENT_SIZE + - HNSW_STARTUP_WAIT_FOR_VECTOR_CACHE=true ...