52 changes: 47 additions & 5 deletions python/.asv/results/benchmarks.json
@@ -4204,8 +4204,49 @@
"version": "a2dfcb01776d52d843c514b129d2256a8031360caa00bc24b890b6dad9246191",
"warmup_time": -1
},
"resample.Resample.peakmem_resample": {
"code": "class Resample:\n @skip_for_params(TIME_PARAMS)\n def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n if (\n col_type == \"datetime\"\n and aggregation == \"sum\"\n or col_type == \"str\"\n and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n ):\n self.skipped = True\n raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n \n self.skipped = False\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
"name": "resample.Resample.peakmem_resample",
"param_names": [
"num_rows",
"downsampling_factor",
"col_type",
"aggregation"
],
"params": [
[
"3000000",
"10000000"
],
[
"10",
"100",
"100000"
],
[
"'bool'",
"'int'",
"'float'",
"'datetime'",
"'str'"
],
[
"'sum'",
"'mean'",
"'min'",
"'max'",
"'first'",
"'last'",
"'count'"
]
],
"setup_cache_key": "resample:52",
"type": "peakmemory",
"unit": "bytes",
"version": "20835daa6c6fd2af13420212c35b87cdac7153f782b2bc02cb2535131bd654d8"
},
"resample.Resample.time_resample": {
"code": "class Resample:\n def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n if (\n col_type == \"datetime\"\n and aggregation == \"sum\"\n or col_type == \"str\"\n and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n ):\n self.skipped = True\n raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n \n self.skipped = False\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
"code": "class Resample:\n @skip_for_params(PEAKMEM_PARAMS)\n def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n if (\n col_type == \"datetime\"\n and aggregation == \"sum\"\n or col_type == \"str\"\n and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]\n ):\n self.skipped = True\n raise NotImplementedError(f\"{aggregation} not supported on columns of type {col_type}\")\n \n self.skipped = False\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n start = time.time()\n self._setup_cache()\n self.logger.info(f\"SETUP_CACHE TIME: {time.time() - start}\")",
"min_run_count": 2,
"name": "resample.Resample.time_resample",
"number": 5,
@@ -4217,6 +4258,7 @@
],
"params": [
[
"3000000",
"10000000"
],
[
@@ -4244,18 +4286,18 @@
"repeat": 0,
"rounds": 2,
"sample_time": 0.01,
"setup_cache_key": "resample:45",
"setup_cache_key": "resample:52",
"type": "time",
"unit": "seconds",
"version": "ce482344e844d70c65f71aebd7486d303d5f5c6370e1603234ffd7bb12d49435",
"version": "5a8f9ae968dbf11481dbe381bcb072c86f46190c9e10951ab2ec80ad9c9f116e",
"warmup_time": -1
},
"resample.ResampleWide.peakmem_resample_wide": {
"code": "class ResampleWide:\n def peakmem_resample_wide(self):\n self.lib.read(self.SYM, query_builder=self.query_builder)\n\n def setup(self):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n aggs = dict()\n for col in self.COLS:\n aggs[col] = \"last\"\n self.query_builder = QueryBuilder().resample(\"30us\").agg(aggs)\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n num_rows = 3000\n index = pd.date_range(pd.Timestamp(0, unit=\"us\"), freq=\"us\", periods=num_rows)\n data = dict()\n for col in self.COLS:\n data[col] = 100 * rng.random(num_rows, dtype=np.float64)\n df = pd.DataFrame(data, index=index)\n lib.write(self.SYM, df)",
"name": "resample.ResampleWide.peakmem_resample_wide",
"param_names": [],
"params": [],
"setup_cache_key": "resample:118",
"setup_cache_key": "resample:126",
"type": "peakmemory",
"unit": "bytes",
"version": "53f042192048c92d282637c1bbcee9e52dacec9086c534782de30d7ff67e77eb"
@@ -4270,7 +4312,7 @@
"repeat": 0,
"rounds": 2,
"sample_time": 0.01,
"setup_cache_key": "resample:118",
"setup_cache_key": "resample:126",
"type": "time",
"unit": "seconds",
"version": "ece714f981e8de31ee8296644624bf8f5fb895e6bf48d64a6ae2a9c50c5db7a2",
14 changes: 11 additions & 3 deletions python/benchmarks/resample.py
@@ -11,12 +11,13 @@
import time
import numpy as np
import pandas as pd
import itertools

from arcticdb import Arctic
from arcticdb import QueryBuilder
from arcticdb.util.logger import get_logger
from arcticdb.util.test import random_strings_of_length
from asv_runner.benchmarks.mark import skip_benchmark
from asv_runner.benchmarks.mark import skip_for_params


class Resample:
@@ -33,12 +34,18 @@ class Resample:
"aggregation",
]
params = [
[10_000_000], # num_rows
[3_000_000, 10_000_000], # num_rows
[10, 100, 100_000], # downsampling factor
["bool", "int", "float", "datetime", "str"], # col_type
["sum", "mean", "min", "max", "first", "last", "count"], # aggregation
]

# Peakmem params are tuned for the machine that runs the tests. It has 16 CPU threads and 24 IO threads. Having too
# many segments leads to ~20% variability in the processing pipeline because of the scheduling. 3_000_000 rows
# correspond to 30 segments, a bit more than the number of IO threads but still not enough to cause large variability.
PEAKMEM_PARAMS = list(filter(lambda x: x[0] == 3_000_000, itertools.product(*params)))
TIME_PARAMS = list(filter(lambda x: x[0] != 3_000_000, itertools.product(*params)))

def __init__(self):
self.logger = get_logger()

@@ -100,10 +107,11 @@ def setup(self, num_rows, downsampling_factor, col_type, aggregation):
self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit="us"))
self.query_builder = QueryBuilder().resample(f"{downsampling_factor}us").agg({"col": aggregation})

@skip_for_params(PEAKMEM_PARAMS)
def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):
self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)

@skip_benchmark
@skip_for_params(TIME_PARAMS)
def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):
self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)
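A minimal sketch of what the PEAKMEM_PARAMS / TIME_PARAMS split in the diff above does (illustrative only, not part of the PR): itertools.product expands the asv params grid into every combination, and each filtered list is handed to skip_for_params so that the decorated benchmark skips those combinations. peakmem_resample therefore runs only the 3_000_000-row configurations, while time_resample runs only the 10_000_000-row ones.

import itertools

params = [
    [3_000_000, 10_000_000],                                  # num_rows
    [10, 100, 100_000],                                       # downsampling factor
    ["bool", "int", "float", "datetime", "str"],              # col_type
    ["sum", "mean", "min", "max", "first", "last", "count"],  # aggregation
]

# asv runs one invocation per combination: 2 * 3 * 5 * 7 = 210 tuples in total.
all_combos = list(itertools.product(*params))

# Combinations reserved for peakmem_resample (these are skipped by time_resample) ...
peakmem_params = [c for c in all_combos if c[0] == 3_000_000]
# ... and the remainder, run only by time_resample (skipped by peakmem_resample).
time_params = [c for c in all_combos if c[0] != 3_000_000]

assert len(peakmem_params) == 105 and len(time_params) == 105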
