diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json index 49116c0912..5c2535a2b3 100644 --- a/python/.asv/results/benchmarks.json +++ b/python/.asv/results/benchmarks.json @@ -4278,14 +4278,14 @@ }, "version": 2, "version_chain.IterateVersionChain.time_list_undeleted_versions": { - "code": "class IterateVersionChain:\n def time_list_undeleted_versions(self, num_versions, caching, deleted):\n self.lib.list_versions(symbol=self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", + "code": "class IterateVersionChain:\n def time_list_undeleted_versions(self, num_versions, caching, deleted):\n self.lib.list_versions(symbol=self.symbol(num_versions))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n if deleted:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED)\n else:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED)\n self.lib = ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", "min_run_count": 2, "name": "version_chain.IterateVersionChain.time_list_undeleted_versions", - "number": 13, + "number": 0, "param_names": [ "num_versions", "caching", - "deleted" + "tail_deleted" ], "params": [ [ @@ -4297,29 +4297,29 @@ "'never'" ], [ - "0.0", - "0.99" + "True", + "False" ] ], "repeat": 0, "rounds": 1, - "sample_time": 0.01, - "setup_cache_key": "version_chain:39", + "sample_time": 1, + "setup_cache_key": "version_chain:48", "timeout": 6000, "type": "time", "unit": "seconds", - "version": "8cf2d8b7302ee0311a2bab73cb1ab31134c9676a39bc5e517411e3192a89ead7", + "version": "c994d17e8987906e0964cc8145263dc8ddba0f0e1ec0b8fc729ea437a390605c", "warmup_time": -1 }, "version_chain.IterateVersionChain.time_load_all_versions": { - "code": "class IterateVersionChain:\n def time_load_all_versions(self, num_versions, caching, deleted):\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", + "code": "class IterateVersionChain:\n def time_load_all_versions(self, num_versions, caching, deleted):\n self.load_all(self.symbol(num_versions))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n if deleted:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED)\n else:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED)\n self.lib = ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", "min_run_count": 2, "name": "version_chain.IterateVersionChain.time_load_all_versions", - "number": 13, + "number": 0, "param_names": [ "num_versions", "caching", - "deleted" + "tail_deleted" ], "params": [ [ @@ -4331,29 +4331,29 @@ "'never'" ], [ - "0.0", - "0.99" + "True", + "False" ] ], "repeat": 0, "rounds": 1, - "sample_time": 0.01, - "setup_cache_key": "version_chain:39", + "sample_time": 1, + "setup_cache_key": "version_chain:48", "timeout": 6000, "type": "time", "unit": "seconds", - "version": "32c93e66abfbbbaa80b6cdf40c50fc82e21aa1de964c5962ae200444ff26f252", + "version": "e68f85572eefce903e6c90ac005474274128cf893551579dded0dbfa891eb4bf", "warmup_time": -1 }, "version_chain.IterateVersionChain.time_read_alternating": { - "code": "class IterateVersionChain:\n def time_read_alternating(self, num_versions, caching, deleted):\n self.read_from_epoch(self.symbol(num_versions, deleted))\n self.read_v0(self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", + "code": "class IterateVersionChain:\n def time_read_alternating(self, num_versions, caching, deleted):\n self.read_from_epoch(self.symbol(num_versions))\n self.read_v0(self.symbol(num_versions))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n if deleted:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED)\n else:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED)\n self.lib = ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", "min_run_count": 2, "name": "version_chain.IterateVersionChain.time_read_alternating", - "number": 13, + "number": 0, "param_names": [ "num_versions", "caching", - "deleted" + "tail_deleted" ], "params": [ [ @@ -4365,29 +4365,29 @@ "'never'" ], [ - "0.0", - "0.99" + "True", + "False" ] ], "repeat": 0, "rounds": 1, - "sample_time": 0.01, - "setup_cache_key": "version_chain:39", + "sample_time": 1, + "setup_cache_key": "version_chain:48", "timeout": 6000, "type": "time", "unit": "seconds", - "version": "f1f008d2c2efb9386c21fdd3539bb3601b1078e323613d38f13ddadb066cb004", + "version": "117a257c8033130bab1d83ea4c8993283739842e87dccd898107f8ecbd8fd714", "warmup_time": -1 }, "version_chain.IterateVersionChain.time_read_from_epoch": { - "code": "class IterateVersionChain:\n def time_read_from_epoch(self, num_versions, caching, deleted):\n self.read_from_epoch(self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", + "code": "class IterateVersionChain:\n def time_read_from_epoch(self, num_versions, caching, deleted):\n self.read_from_epoch(self.symbol(num_versions))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n if deleted:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED)\n else:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED)\n self.lib = ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", "min_run_count": 2, "name": "version_chain.IterateVersionChain.time_read_from_epoch", - "number": 13, + "number": 0, "param_names": [ "num_versions", "caching", - "deleted" + "tail_deleted" ], "params": [ [ @@ -4399,29 +4399,29 @@ "'never'" ], [ - "0.0", - "0.99" + "True", + "False" ] ], "repeat": 0, "rounds": 1, - "sample_time": 0.01, - "setup_cache_key": "version_chain:39", + "sample_time": 1, + "setup_cache_key": "version_chain:48", "timeout": 6000, "type": "time", "unit": "seconds", - "version": "ee44eb3fe3eecad30de7d9349e47e68cdeb430326b5713b7ae4bfd7abdb63707", + "version": "65cf6d6226805c2655d981a065b9e345cc8b00424f8f35965407dfdc1b8da504", "warmup_time": -1 }, "version_chain.IterateVersionChain.time_read_v0": { - "code": "class IterateVersionChain:\n def time_read_v0(self, num_versions, caching, deleted):\n self.read_v0(self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", + "code": "class IterateVersionChain:\n def time_read_v0(self, num_versions, caching, deleted):\n self.read_v0(self.symbol(num_versions))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching == \"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching == \"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching == \"default\":\n # Leave the default reload interval\n pass\n \n if deleted:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED)\n else:\n ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED)\n self.lib = ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions))\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")", "min_run_count": 2, "name": "version_chain.IterateVersionChain.time_read_v0", - "number": 13, + "number": 0, "param_names": [ "num_versions", "caching", - "deleted" + "tail_deleted" ], "params": [ [ @@ -4433,18 +4433,18 @@ "'never'" ], [ - "0.0", - "0.99" + "True", + "False" ] ], "repeat": 0, "rounds": 1, - "sample_time": 0.01, - "setup_cache_key": "version_chain:39", + "sample_time": 1, + "setup_cache_key": "version_chain:48", "timeout": 6000, "type": "time", "unit": "seconds", - "version": "4de46121ac1a914c7a5d77c82aa535e29da68e0e2acc7e17e483d168cac49db3", + "version": "e086c5206ded034b9ab56b93a7c774085fd00125211350aee5d2fa3b2a700321", "warmup_time": -1 } } \ No newline at end of file diff --git a/python/benchmarks/version_chain.py b/python/benchmarks/version_chain.py index f0fa297425..0f3f6284da 100644 --- a/python/benchmarks/version_chain.py +++ b/python/benchmarks/version_chain.py @@ -6,6 +6,8 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ +import shutil + from arcticdb import Arctic from arcticdb.exceptions import NoSuchVersionException from arcticdb.config import set_log_level @@ -19,22 +21,29 @@ class IterateVersionChain: - number = 13 timeout = 6000 + sample_time = 1 rounds = 1 - CONNECTION_STRING = "lmdb://version_chain?map_size=20GB" + + DIR_UNDELETED = "version_chain" + DIR_TAIL_DELETED = "version_chain_tail_deleted" + CONNECTION_STRING_UNDELETED = f"lmdb://{DIR_UNDELETED}" + CONNECTION_STRING_TAIL_DELETED = f"lmdb://{DIR_TAIL_DELETED}" + DELETION_POINT = 0.99 # delete the symbol after writing this proportion of the versions LIB_NAME = "lib" - # TODO: Investigate why setup is taking ~50mins with 50k versions on ec2 runners. - # Locally it looks like it shouldn't take more than 15. - params = ([25_000], ["forever", "default", "never"], [0.0, 0.99]) - param_names = ["num_versions", "caching", "deleted"] + params = ([25_000], ["forever", "default", "never"], [True, False]) + + # In the tail_deleted case we delete the symbol after writing the DELETION_POINT fraction of the versions, + # so the tail of the version chain is deleted. + param_names = ["num_versions", "caching", "tail_deleted"] - def symbol(self, num_versions, deleted): - return f"symbol_{num_versions}_{deleted}" + def symbol(self, num_versions): + return f"symbol_{num_versions}" def __init__(self): self.logger = get_logger() + self.lib = None def setup_cache(self): start = time.time() @@ -42,41 +51,66 @@ def setup_cache(self): self.logger.info(f"SETUP_CACHE TIME: {time.time() - start}") def _setup_cache(self): - self.ac = Arctic(IterateVersionChain.CONNECTION_STRING) + ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED) num_versions_list, caching_list, deleted_list = IterateVersionChain.params - self.ac.delete_library(IterateVersionChain.LIB_NAME) - lib = self.ac.create_library(IterateVersionChain.LIB_NAME) + ac.delete_library(IterateVersionChain.LIB_NAME) + lib = ac.create_library(IterateVersionChain.LIB_NAME) small_df = generate_random_floats_dataframe(2, 2) - # Pre-calculate delete points to avoid repeated math.floor calls delete_points = {} for num_versions in num_versions_list: - for deleted in deleted_list: - symbol = self.symbol(num_versions, deleted) - delete_points[symbol] = math.floor(deleted * num_versions) + delete_points[num_versions] = math.floor(IterateVersionChain.DELETION_POINT * num_versions) + # To save setup time we populate the two libraries by: + # + # Step 1 - write 99% of the versions to one library + # Step 2 - copy the library directory + # Step 3 - delete the symbol on the copy. Write the remaining versions to both source and copy. start_time = time.time() adb._ext.set_config_int("VersionMap.ReloadInterval", sys.maxsize) - # Batch operations by symbol to reduce overhead + for num_versions in num_versions_list: - for deleted in deleted_list: - symbol = self.symbol(num_versions, deleted) - delete_point = delete_points[symbol] + symbol = self.symbol(num_versions) + deletion_point = delete_points[num_versions] + for i in range(deletion_point): + lib.write(symbol, small_df) - # Write all versions in a single loop - for i in range(num_versions): - lib.write(symbol, small_df) - # Only check for deletion once per iteration - if i == delete_point: - lib.delete(symbol) + del lib + del ac - adb._ext.unset_config_int("VersionMap.ReloadInterval") + shutil.rmtree(IterateVersionChain.DIR_TAIL_DELETED, ignore_errors=True) + shutil.copytree(IterateVersionChain.DIR_UNDELETED, IterateVersionChain.DIR_TAIL_DELETED) - print("IterateVersionChain: Setup cache took (s) :", time.time() - start_time) + ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED) + lib = ac[IterateVersionChain.LIB_NAME] - del self.ac + for num_versions in num_versions_list: + symbol = self.symbol(num_versions) + deletion_point = delete_points[num_versions] + for i in range(deletion_point, num_versions): + lib.write(symbol, small_df) + # reasonableness check + assert lib.read(symbol).version == num_versions - 1 + + del lib + del ac + + ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED) + lib = ac[IterateVersionChain.LIB_NAME] + for num_versions in num_versions_list: + symbol = self.symbol(num_versions) + lib.delete(symbol) + deletion_point = delete_points[num_versions] + for i in range(deletion_point, num_versions): + lib.write(symbol, small_df) + # reasonableness checks + assert lib.read(symbol).version == num_versions - 1 + # Only versions that have not been deleted are returned by list_versions + assert len(lib.list_versions(symbol)) == num_versions - deletion_point + + adb._ext.unset_config_int("VersionMap.ReloadInterval") def load_all(self, symbol): # Getting tombstoned versions requires a LOAD_ALL @@ -108,30 +142,32 @@ def setup(self, num_versions, caching, deleted): # Leave the default reload interval pass - self.ac = Arctic(IterateVersionChain.CONNECTION_STRING) - self.lib = self.ac[IterateVersionChain.LIB_NAME] + if deleted: + ac = Arctic(IterateVersionChain.CONNECTION_STRING_TAIL_DELETED) + else: + ac = Arctic(IterateVersionChain.CONNECTION_STRING_UNDELETED) + self.lib = ac[IterateVersionChain.LIB_NAME] if caching != "never": # Pre-load the cache - self.load_all(self.symbol(num_versions, deleted)) + self.load_all(self.symbol(num_versions)) def teardown(self, num_versions, caching, deleted): adb._ext.unset_config_int("VersionMap.ReloadInterval") del self.lib - del self.ac def time_load_all_versions(self, num_versions, caching, deleted): - self.load_all(self.symbol(num_versions, deleted)) + self.load_all(self.symbol(num_versions)) def time_list_undeleted_versions(self, num_versions, caching, deleted): - self.lib.list_versions(symbol=self.symbol(num_versions, deleted)) + self.lib.list_versions(symbol=self.symbol(num_versions)) def time_read_v0(self, num_versions, caching, deleted): - self.read_v0(self.symbol(num_versions, deleted)) + self.read_v0(self.symbol(num_versions)) def time_read_from_epoch(self, num_versions, caching, deleted): - self.read_from_epoch(self.symbol(num_versions, deleted)) + self.read_from_epoch(self.symbol(num_versions)) def time_read_alternating(self, num_versions, caching, deleted): - self.read_from_epoch(self.symbol(num_versions, deleted)) - self.read_v0(self.symbol(num_versions, deleted)) + self.read_from_epoch(self.symbol(num_versions)) + self.read_v0(self.symbol(num_versions))