diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json
index 49116c0912..a481045246 100644
--- a/python/.asv/results/benchmarks.json
+++ b/python/.asv/results/benchmarks.json
@@ -4204,8 +4204,49 @@
         "version": "a2dfcb01776d52d843c514b129d2256a8031360caa00bc24b890b6dad9246191",
         "warmup_time": -1
     },
+    "resample.Resample.peakmem_resample": {
+        "code": "class Resample:\n    @skip_for_params(TIME_PARAMS)\n    def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n        self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n    def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n        if (\n            col_type == \"datetime\"\n            and aggregation == \"sum\"\n            or col_type == \"str\"\n            and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n        ):\n            self.skipped = True\n            raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n\n        self.skipped = False\n        self.ac = Arctic(self.CONNECTION_STRING)\n        self.lib = self.ac[self.LIB_NAME]\n        self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n        self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n    def setup_cache(self):\n        start = time.time()\n        self._setup_cache()\n        self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
+        "name": "resample.Resample.peakmem_resample",
+        "param_names": [
+            "num_rows",
+            "downsampling_factor",
+            "col_type",
+            "aggregation"
+        ],
+        "params": [
+            [
+                "3000000",
+                "10000000"
+            ],
+            [
+                "10",
+                "100",
+                "100000"
+            ],
+            [
+                "'bool'",
+                "'int'",
+                "'float'",
+                "'datetime'",
+                "'str'"
+            ],
+            [
+                "'sum'",
+                "'mean'",
+                "'min'",
+                "'max'",
+                "'first'",
+                "'last'",
+                "'count'"
+            ]
+        ],
+        "setup_cache_key": "resample:53",
+        "type": "peakmemory",
+        "unit": "bytes",
+        "version": "20835daa6c6fd2af13420212c35b87cdac7153f782b2bc02cb2535131bd654d8"
+    },
     "resample.Resample.time_resample": {
-        "code": "class Resample:\n    def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n        self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n    def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n        if (\n            col_type == \"datetime\"\n            and aggregation == \"sum\"\n            or col_type == \"str\"\n            and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n        ):\n            self.skipped = True\n            raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n\n        self.skipped = False\n        self.ac = Arctic(self.CONNECTION_STRING)\n        self.lib = self.ac[self.LIB_NAME]\n        self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n        self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n    def setup_cache(self):\n        start = time.time()\n        self._setup_cache()\n        self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
+        "code": "class Resample:\n    @skip_for_params(PEAKMEM_PARAMS)\n    def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n        self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n    def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n        if (\n            col_type == \"datetime\"\n            and aggregation == \"sum\"\n            or col_type == \"str\"\n            and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n        ):\n            self.skipped = True\n            raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n\n        self.skipped = False\n        self.ac = Arctic(self.CONNECTION_STRING)\n        self.lib = self.ac[self.LIB_NAME]\n        self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n        self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n    def setup_cache(self):\n        start = time.time()\n        self._setup_cache()\n        self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
         "min_run_count": 2,
         "name": "resample.Resample.time_resample",
         "number": 5,
@@ -4217,6 +4258,7 @@
         ],
         "params": [
             [
+                "3000000",
                 "10000000"
             ],
             [
@@ -4244,10 +4286,10 @@
         "repeat": 0,
         "rounds": 2,
         "sample_time": 0.01,
-        "setup_cache_key": "resample:45",
+        "setup_cache_key": "resample:53",
         "type": "time",
         "unit": "seconds",
-        "version": "ce482344e844d70c65f71aebd7486d303d5f5c6370e1603234ffd7bb12d49435",
+        "version": "5a8f9ae968dbf11481dbe381bcb072c86f46190c9e10951ab2ec80ad9c9f116e",
         "warmup_time": -1
     },
     "resample.ResampleWide.peakmem_resample_wide": {
@@ -4255,7 +4297,7 @@
         "name": "resample.ResampleWide.peakmem_resample_wide",
         "param_names": [],
         "params": [],
-        "setup_cache_key": "resample:118",
+        "setup_cache_key": "resample:129",
         "type": "peakmemory",
         "unit": "bytes",
         "version": "53f042192048c92d282637c1bbcee9e52dacec9086c534782de30d7ff67e77eb"
@@ -4270,7 +4312,7 @@
         "repeat": 0,
         "rounds": 2,
         "sample_time": 0.01,
-        "setup_cache_key": "resample:118",
+        "setup_cache_key": "resample:129",
         "type": "time",
         "unit": "seconds",
         "version": "ece714f981e8de31ee8296644624bf8f5fb895e6bf48d64a6ae2a9c50c5db7a2",
diff --git a/python/benchmarks/resample.py b/python/benchmarks/resample.py
index 54acc48fc5..df7c8d40d6 100644
--- a/python/benchmarks/resample.py
+++ b/python/benchmarks/resample.py
@@ -11,12 +11,14 @@
 import time
 import numpy as np
 import pandas as pd
+import itertools
+import random
 
 from arcticdb import Arctic
 from arcticdb import QueryBuilder
 from arcticdb.util.logger import get_logger
 from arcticdb.util.test import random_strings_of_length
-from asv_runner.benchmarks.mark import skip_benchmark
+from asv_runner.benchmarks.mark import skip_for_params
 
 
 class Resample:
@@ -33,12 +35,18 @@
         "aggregation",
     ]
     params = [
-        [10_000_000],  # num_rows
+        [3_000_000, 10_000_000],  # num_rows
         [10, 100, 100_000],  # downsampling factor
         ["bool", "int", "float", "datetime", "str"],  # col_type
         ["sum", "mean", "min", "max", "first", "last", "count"],  # aggregation
     ]
 
+    # Peakmem params are tuned for the machine that runs the tests. It has 16 CPU threads and 24 IO threads. Having
+    # too many segments leads to variability of ~20% in the processing pipeline because of the scheduling. 3_000_000
+    # rows are 30 segments, a bit more than the number of IO threads, but still not enough to cause large variability.
+    PEAKMEM_PARAMS = list(filter(lambda x: x[0] == 3_000_000, itertools.product(*params)))
+    TIME_PARAMS = list(filter(lambda x: x[0] != 3_000_000, itertools.product(*params)))
+
     def __init__(self):
         self.logger = get_logger()
 
@@ -48,6 +56,8 @@ def setup_cache(self):
         self.logger.info(f"SETUP_CACHE TIME: {time.time() - start}")
 
     def _setup_cache(self):
+        random.seed(42)
+        np.random.seed(42)
         ac = Arctic(self.CONNECTION_STRING)
         ac.delete_library(self.LIB_NAME)
         lib = ac.create_library(self.LIB_NAME)
@@ -100,10 +110,11 @@ def setup(self, num_rows, downsampling_factor, col_type, aggregation):
         self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit="us"))
         self.query_builder = QueryBuilder().resample(f"{downsampling_factor}us").agg({"col": aggregation})
 
+    @skip_for_params(PEAKMEM_PARAMS)
     def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):
         self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)
 
-    @skip_benchmark
+    @skip_for_params(TIME_PARAMS)
     def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):
         self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)
 