
Commit f331fb0

Rechunker script (AxFoundation#686)
* Rechunker script
* Update rechunker
* update tool
* fix codefactor
* update testss
* fix typo
* fix
* add a docstring
* Add reference
* also point to more info
* Add backend_key
* add to bin
* update target size
* disable codefactor
* move
* fix iterator
* update documentation
* fix docs
1 parent 72af4ca commit f331fb0

File tree: 7 files changed, +356 −4 lines

bin/rechunker

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
#!/usr/bin/env python
import os.path
import argparse
import strax
import pandas as pd


def parse_args():
    parser = argparse.ArgumentParser(
        description="Rechunker for FileSytemBackend. Interfaces with strax.rechunker. "
                    "Please see the documentation of strax.rechunker for more information: "
                    "github.com/AxFoundation/strax/blob/31c114c5f8329e53289d5127fb2125e71c3d6aae/strax/storage/files.py#L371")
    parser.add_argument(
        '--source',
        type=str,
        help="Target directory to rechunk, should be a folder in a "
             "strax.DataDirectory (one datatype)")
    parser.add_argument(
        '--dest', '--destination',
        default=None,
        dest='dest',
        type=str,
        help='Where to store rechunked data. If nothing is specified, replace the source.',
    )
    parser.add_argument(
        '--compressor',
        choices=list(strax.io.COMPRESSORS.keys()),
        help="Recompress using one of these compressors. If nothing is specified, "
             "use the same compressor as the source")
    parser.add_argument(
        '--rechunk',
        default=True,
        choices=[True, False],
        type=bool,
        help="Rechunk the data")
    parser.add_argument(
        '--target_size_mb', '--target-size-mb',
        dest='target_size_mb',
        type=int,
        default=strax.default_chunk_size_mb,
        help="Target size MB (uncompressed) of the rechunked data")
    parser.add_argument(
        '--write_stats_to', '--write-stats-to',
        dest='write_stats_to',
        type=str,
        default=None,
        help="Write some information to this file (csv format)")
    args = parser.parse_args()
    return args


def main():
    args = parse_args()
    source_mb = strax.utils.dir_size_mb(args.source)
    report = strax.rechunker(source_directory=args.source,
                             dest_directory=args.dest,
                             replace=args.dest is None,
                             compressor=args.compressor,
                             target_size_mb=args.target_size_mb,
                             rechunk=args.rechunk,
                             )
    if args.dest is not None:
        recompressed_mb = strax.utils.dir_size_mb(args.dest)
    else:
        recompressed_mb = strax.utils.dir_size_mb(args.source)
    report.update(dict(source_mb=source_mb,
                       dest_mb=recompressed_mb))
    if args.write_stats_to:
        if os.path.exists(args.write_stats_to):
            df = pd.read_csv(args.write_stats_to)
        else:
            df = pd.DataFrame()
        df_new = pd.concat(
            [df,
             pd.DataFrame({k: [v] for k, v in report.items()})])
        df_new.to_csv(args.write_stats_to, index=False)

    print(f'Re-compressed {args.source}')
    for k, v in report.items():
        print(f'\t{k:16}\t{v}')


if __name__ == '__main__':
    main()
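The script is a thin wrapper around ``strax.rechunker`` (added in strax/storage/files.py below); a minimal sketch of the equivalent direct call from Python, mirroring ``main()`` above (the source path is a hypothetical example):

import strax

# Rewrite one datatype folder in place, recompressing with zstd.
# dest_directory=None plus replace=True makes strax.rechunker go via a
# temporary directory, exactly as the CLI does when --dest is omitted.
report = strax.rechunker(
    source_directory='./009104-raw_records_aqmon-rfzvpzj4mf',  # hypothetical
    dest_directory=None,
    replace=True,
    compressor='zstd',
    target_size_mb=strax.default_chunk_size_mb,
    rechunk=True,
)
for key, value in report.items():
    print(f'\t{key:16}\t{value}')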

docs/source/advanced/recompression.rst

Lines changed: 66 additions & 2 deletions
@@ -1,8 +1,12 @@
Recompressing & moving data
===========================
There are two options for recompressing data:

- via the context method :py:func:`context.copy_to_frontend` (sketched below)
- via the dedicated ``rechunker`` script, which works only for filesystem backends and operates outside of a context.

In order to recompress data with another compression algorithm, the
:py:func:`context.copy_to_frontend` function can be used.
The function works on a per-run_id, per-datatype basis. In the example
below, peaks data is copied to a second frontend.
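A minimal sketch of the context-based route. This assumes a context ``st`` on which the plugins providing ``peaks`` are registered and for which the data is already stored in the first frontend; the keyword names ``target_frontend_id`` and ``target_compressor`` follow :py:func:`context.copy_to_frontend`.

.. code-block:: python

    import strax

    st = strax.Context(storage=[strax.DataDirectory('./folder_A'),
                                strax.DataDirectory('./folder_B')])
    # st.register(...)  # plugins providing 'peaks' must be registered

    # Copy 'peaks' of run '009104' to the second frontend (index 1 in
    # st.storage), recompressing it with zstd on the way.
    st.copy_to_frontend('009104', 'peaks',
                        target_frontend_id=1,
                        target_compressor='zstd')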
@@ -134,4 +138,64 @@ re-writing the data to disk (as done folder_C in the example above).
As such, for further use, it does not matter if the data is coming from
either of folders folder_A-folder_C as the metadata will tell strax
which compressor to use. Different compressors may have different
performance for loading/writing data.

Rechunker script
================
From strax v1.2.2 onwards, a ``rechunker`` script is automatically installed with strax.
It can be used to re-write data in the ``FileSystem`` backend.

For example:

.. code-block:: bash

    rechunker --source 009104-raw_records_aqmon-rfzvpzj4mf --compressor zstd

will output:

.. code-block:: rst

    Will write to /tmp/tmpoj0xpr78 and make sub-folder 009104-raw_records_aqmon-rfzvpzj4mf
    Rechunking 009104-raw_records_aqmon-rfzvpzj4mf to /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf
    move /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf to 009104-raw_records_aqmon-rfzvpzj4mf
    Re-compressed 009104-raw_records_aqmon-rfzvpzj4mf
        backend_key         009104-raw_records_aqmon-rfzvpzj4mf
        load_time           0.4088103771209717
        write_time          0.07699322700500488
        uncompressed_mb     1.178276
        source_compressor   zstd
        dest_compressor     zstd
        source_mb           0.349217
        dest_mb             0.349218

Using the script to profile write/read rates for compressors
-------------------------------------------------------------
This script can easily be used to profile different compressors:

.. code-block:: bash

    for COMPRESSOR in zstd bz2 lz4 blosc zstd; \
    do echo $COMPRESSOR; \
    rechunker \
        --source 009104-raw_records-rfzvpzj4mf \
        --write_stats_to test.csv \
        --compressor $COMPRESSOR; \
    done

We can check the output in python using:

.. code-block:: python

    >>> import pandas as pd
    >>> df = pd.read_csv('test.csv')
    >>> df['read_mbs'] = df['uncompressed_mb'] / df['load_time']
    >>> df['write_mbs'] = df['uncompressed_mb'] / df['write_time']
    >>> print(df[['source_compressor', 'read_mbs', 'dest_compressor', 'write_mbs']].to_string())
      source_compressor    read_mbs dest_compressor   write_mbs
    0              zstd  313.922890            zstd  298.429123
    1              zstd  284.530054             bz2    8.932259
    2               bz2   20.289876             lz4  228.932498
    3               lz4  372.491150           blosc  433.494794
    4             blosc  725.154966            zstd  215.765177
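The same CSV also contains the on-disk sizes recorded by the script (``source_mb``/``dest_mb``), so compression ratios can be compared alongside the rates; a small sketch continuing from the dataframe above:

.. code-block:: python

    >>> # Ratio of input size to output size for each rewrite step
    >>> df['compression_ratio'] = df['source_mb'] / df['dest_mb']
    >>> print(df[['source_compressor', 'dest_compressor', 'compression_ratio']].to_string())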

setup.py

Lines changed: 3 additions & 0 deletions
@@ -34,6 +34,9 @@ def open_requirements(path):
     extras_require={
         'docs': docs_requires
     },
+    scripts=[
+        'bin/rechunker',
+    ],
     long_description_content_type="text/markdown",
     packages=setuptools.find_packages() + ['extra_requirements'],
     package_dir={'extra_requirements': 'extra_requirements'},

strax/context.py

Lines changed: 14 additions & 1 deletion
@@ -1855,11 +1855,24 @@ def copy_to_frontend(self,
         # Need to load a new loader each time since it's a generator
         # and will be exhausted otherwise.
         loader = s_be.loader(s_be_key)
+
+        def wrapped_loader():
+            """Wrapped loader for changing the target_size_mb"""
+            while True:
+                try:
+                    # pylint: disable=cell-var-from-loop
+                    data = next(loader)
+                    # Update target chunk size for re-chunking
+                    data.target_size_mb = md['chunk_target_size_mb']
+                except StopIteration:
+                    return
+                yield data
+
         # Fill the target buffer
         t_be_str, t_be_key = t_sf.find(data_key, write=True)
         target_be = t_sf._get_backend(t_be_str)
         saver = target_be._saver(t_be_key, md)
-        saver.save_from(loader, rechunk=rechunk)
+        saver.save_from(wrapped_loader(), rechunk=rechunk)
     except NotImplementedError:
         # Target is not susceptible
         continue
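The ``wrapped_loader`` above is an instance of a generic pattern: wrap a generator to adjust each item in flight, without materializing the stream. A self-contained toy version (the names are illustrative, not strax API):

class Chunk:
    """Stand-in for a strax chunk (toy class, not strax.Chunk)."""
    def __init__(self, n):
        self.n = n
        self.target_size_mb = 200


def wrapped(gen, target_size_mb):
    """Yield items from `gen`, retargeting each chunk's size before handing it on."""
    while True:
        try:
            chunk = next(gen)
            chunk.target_size_mb = target_size_mb  # mutate in flight
        except StopIteration:
            return
        yield chunk


chunks = (Chunk(i) for i in range(3))
for c in wrapped(chunks, target_size_mb=500):
    print(c.n, c.target_size_mb)  # prints 0 500, 1 500, 2 500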

strax/storage/files.py

Lines changed: 110 additions & 1 deletion
@@ -3,7 +3,8 @@
 import tempfile
 import os
 import os.path as osp
-
+import typing
+import time
 from bson import json_util
 import shutil

@@ -364,3 +365,111 @@ def _close(self):
@export
class InvalidFolderNameFormat(Exception):
    pass


@export
def rechunker(source_directory: str,
              dest_directory: typing.Optional[str] = None,
              replace: bool = False,
              compressor: typing.Optional[str] = None,
              target_size_mb: typing.Optional[int] = None,
              rechunk: bool = True,
              ) -> dict:
    """
    Rechunk/recompress a strax datatype saved in a FileSystemBackend
    outside of a strax.Context. For a user-friendly, context-centered
    alternative, see strax.Context.copy_to_frontend:
    github.com/AxFoundation/strax/blob/a0d51fdd3bea52c228c8f74c614fc77bb7cf1bc5/strax/context.py#L1792 # noqa

    One can either specify a destination directory where to store a new
    copy of this data with <dest_directory>, or replace the input folder
    with its rechunked version.

    This function can either:
     - rechunk (if <rechunk> is True); increasing <target_size_mb> is
       then probably also useful (creates larger chunks)
     - recompress (if a <compressor> is specified)

    One can also rechunk and recompress simultaneously.

    :param source_directory: Path to a folder containing a single
        strax.DataType.
    :param dest_directory: Head of a folder whereto write new data. If
        nothing is specified, write to a temporary directory.
    :param replace: Delete the source_directory and replace it with its
        rechunked version
    :param compressor: Compressor to be used in saving the rechunked
        data
    :param target_size_mb: Target size of chunks (uncompressed). As long
        as a chunk is smaller than this many MB, keep adding new MBs
        until the chunk is at least target_size_mb or we run out of
        chunks.
    :param rechunk: Do we want to rechunk?
    :return: Dictionary with some information on the write/load times
        involved.
    """
    if not os.path.exists(source_directory):
        raise FileNotFoundError(f'No file at {source_directory}')
    if not replace and dest_directory is None:
        raise ValueError('Specify a destination path <dest_directory> when '
                         'not replacing the original path')
    backend_key = os.path.basename(os.path.normpath(source_directory))

    if dest_directory is None and replace:
        _temp_dir = tempfile.TemporaryDirectory()
        dest_directory = _temp_dir.name
    else:
        _temp_dir = False

    if os.path.basename(os.path.normpath(dest_directory)) != backend_key:
        # New key should be correct! If there is not an exact match,
        # we want to make sure that we append the backend_key correctly
        print(f'Will write to {dest_directory} and make sub-folder {backend_key}')
        dest_directory = os.path.join(dest_directory, backend_key)
    backend = strax.FileSytemBackend()
    meta_data = backend.get_metadata(source_directory)
    source_compressor = meta_data['compressor']

    if compressor is not None:
        meta_data['compressor'] = compressor
    if target_size_mb is not None:
        meta_data['chunk_target_size_mb'] = target_size_mb

    data_loader = backend.loader(source_directory)

    load_time_seconds = []

    def loader():
        """Wrapped loader for bookkeeping load time"""
        while True:
            try:
                t0 = time.time()
                data = next(data_loader)
                # Update target chunk size for re-chunking
                data.target_size_mb = meta_data['chunk_target_size_mb']
                load_time_seconds.append(time.time() - t0)
            except StopIteration:
                return
            yield data
    print(f'Rechunking {source_directory} to {dest_directory}')
    saver = backend._saver(dest_directory, metadata=meta_data)

    write_time_start = time.time()
    saver.save_from(loader(), rechunk=rechunk)
    load_time = sum(load_time_seconds)
    write_time = time.time() - write_time_start - load_time

    if replace:
        print(f'move {dest_directory} to {source_directory}')
        shutil.rmtree(source_directory)
        shutil.move(dest_directory, source_directory)
        if _temp_dir:
            _temp_dir.cleanup()

    return dict(backend_key=backend_key,
                load_time=load_time,
                write_time=write_time,
                uncompressed_mb=sum([x['nbytes'] for x in meta_data['chunks']]) / 1e6,
                source_compressor=source_compressor,
                dest_compressor=meta_data['compressor'],
                )
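One detail worth noting above: ``save_from`` drives the ``loader()`` generator, so the wall time measured around it includes the time spent loading; subtracting the summed per-chunk load times isolates the write time. A toy illustration of that bookkeeping (made-up sleeps, not strax code):

import time

load_time_seconds = []


def toy_loader():
    """Toy stand-in for the wrapped loader: records per-chunk load time."""
    for _ in range(3):
        t0 = time.time()
        time.sleep(0.01)  # pretend to read and decompress a chunk
        load_time_seconds.append(time.time() - t0)
        yield b'chunk'


t_start = time.time()
for chunk in toy_loader():  # the consumer drives the generator,
    time.sleep(0.02)        # pretend to compress and write the chunk
load_time = sum(load_time_seconds)
# Wall time around the loop includes loading; subtract it to isolate writing
write_time = time.time() - t_start - load_time
print(f'load: {load_time:.3f} s, write: {write_time:.3f} s')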

strax/utils.py

Lines changed: 20 additions & 0 deletions
@@ -18,6 +18,7 @@
 import pandas as pd
 from collections.abc import Mapping
 from warnings import warn
+import os

 # Change numba's caching backend from pickle to dill
@@ -696,3 +697,22 @@ def apply_selection(x,
        del x2

    return x


def dir_size_mb(start_path='.'):
    """
    Calculate the total size of all files in start_path
    Thanks https://stackoverflow.com/a/1392549/18280620
    """
    if not os.path.exists(start_path):
        return 0

    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # Skip if it is a symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)

    return total_size / 1e6
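A quick usage sketch (hypothetical path; this is the same helper that ``bin/rechunker`` uses to fill the ``source_mb``/``dest_mb`` entries of its report):

import strax

# Total on-disk size of one strax datatype folder, in MB
print(strax.utils.dir_size_mb('./009104-raw_records_aqmon-rfzvpzj4mf'))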
