Merge pull request #50 from boutproject/squashoutput-time-split
Option to split output in time into several files
johnomotani authored Aug 4, 2021
2 parents 3a66e6c + 3a34a8c commit 07a2473
Showing 3 changed files with 386 additions and 59 deletions.
17 changes: 17 additions & 0 deletions boutdata/scripts/bout_squashoutput.py
@@ -81,6 +81,23 @@ def int_or_none(string):
        help="Read data in parallel. Value is the number of processes to use, pass 0 "
        "to use as many as there are physical cores.",
    )
    parser.add_argument(
        "-t",
        "--time_split_size",
        type=int,
        default=None,
        help="By default no splitting is done. If an integer value is passed, the "
        "output is split into files with length in the t-dimension equal to that "
        "value. The outputs are labelled by inserting a counter (starting by default "
        "at 0, but see time_split_first_label) into the file name, just before the "
        ".nc suffix.",
    )
    parser.add_argument(
        "--time_split_first_label",
        type=int,
        default=0,
        help="Value at which to start the counter labelling output files when "
        "time_split_size is used.",
    )

    if argcomplete:
        argcomplete.autocomplete(parser)
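For illustration, a minimal standalone sketch of how the two new arguments parse (a hypothetical parser that mirrors only the add_argument calls above; the counter-labelled file names assume the default output name BOUT.dmp.nc):

import argparse

# Mirror of the two new add_argument calls, for demonstration only.
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--time_split_size", type=int, default=None)
parser.add_argument("--time_split_first_label", type=int, default=0)

args = parser.parse_args(["-t", "50"])
# Splitting every 50 time-points with the default first label 0 would
# produce BOUT.dmp0.nc, BOUT.dmp1.nc, ...
assert args.time_split_size == 50
assert args.time_split_first_label == 0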
136 changes: 106 additions & 30 deletions boutdata/squashoutput.py
Expand Up @@ -30,6 +30,8 @@ def squashoutput(
delete=False,
tind_auto=False,
parallel=False,
time_split_size=None,
time_split_first_label=0,
):
"""
Collect all data from BOUT.dmp.* files and create a single output file.
@@ -89,7 +91,16 @@
        If set to True or 0, use the multiprocessing library to read data in parallel
        with the maximum number of available processors. If set to an int, use that
        many processes.
    time_split_size : int, optional
        By default no splitting is done. If an integer value is passed, the output is
        split into files with length in the t-dimension equal to that value. The
        outputs are labelled by inserting a counter (starting by default at 0, but
        see time_split_first_label) into the file name, just before the .nc suffix.
    time_split_first_label : int, default 0
        Value at which to start the counter labelling output files when
        time_split_size is used.
    """
    # use local imports to allow fast import for tab-completion
    from boutdata.data import BoutOutputs
    from boututils.datafile import DataFile
    from boututils.boutarray import BoutArray
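For context, a call exercising the new keywords might look like the following sketch. The "data" directory is a placeholder, and passing datadir positionally with outputname="BOUT.dmp.nc" is an assumption, since the full signature is not shown in this diff:

from boutdata.squashoutput import squashoutput

# Squash BOUT.dmp.* files from "data", splitting the result every 100
# time-points; with time_split_first_label=1 the outputs are labelled
# BOUT.dmp1.nc, BOUT.dmp2.nc, ...
squashoutput(
    "data",
    outputname="BOUT.dmp.nc",
    time_split_size=100,
    time_split_first_label=1,
)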
@@ -114,20 +125,16 @@
    fullpath = os.path.join(datadir, outputname)

    if append:
        if time_split_size is not None:
            raise ValueError("'time_split_size' is not compatible with append=True")
        datadirnew = tempfile.mkdtemp(dir=datadir)
        for f in glob.glob(datadir + "/BOUT.dmp.*.??"):
        for f in glob.glob(os.path.join(datadir, "BOUT.dmp.*.??")):
            if not quiet:
                print("moving", f, flush=True)
            shutil.move(f, datadirnew)
        oldfile = datadirnew + "/" + outputname
        datadir = datadirnew

    if os.path.isfile(fullpath) and not append:
        raise ValueError(
            "{} already exists. Collect may try to read from this file, which is "
            "presumably not desired behaviour.".format(fullpath)
        )

    # useful object from BOUT pylib to access output data
    outputs = BoutOutputs(
        datadir,
@@ -141,7 +148,23 @@
        tind_auto=tind_auto,
        parallel=parallel,
    )

    # Create file(s) for output and write data
    filenames, t_slices = _get_filenames_t_slices(
        time_split_size, time_split_first_label, fullpath, outputs.tind
    )

    if not append:
        for f in filenames:
            if os.path.isfile(f):
                raise ValueError(
                    "{} already exists, squashoutput() will not overwrite. Also, "
                    "for some filenames collect may try to read from this file, "
                    "which is presumably not desired behaviour.".format(f)
                )

    outputvars = outputs.keys()

    # Read a value to cache the files
    outputs[outputvars[0]]
@@ -173,29 +196,38 @@
                "For some reason t_array has some duplicated entries in the new "
                "and old file."
            )
    # Create single file for output and write data
    with DataFile(fullpath, create=True, write=True, format=format, **kwargs) as f:
        for varname in outputvars:
            if not quiet:
                print(varname, flush=True)

            var = outputs[varname]
            if append:
                dims = outputs.dimensions[varname]
                if "t" in dims:
                    var = var[cropnew:, ...]
                    varold = old[varname]
                    var = BoutArray(numpy.append(varold, var, axis=0), var.attributes)

            if singleprecision:
                if not isinstance(var, int):
                    var = BoutArray(numpy.float32(var), var.attributes)

            f.write(varname, var)
            # Write changes, free memory
            f.sync()
            var = None
            gc.collect()
    kwargs["format"] = format

    files = [DataFile(name, create=True, write=True, **kwargs) for name in filenames]

    for varname in outputvars:
        if not quiet:
            print(varname, flush=True)

        var = outputs[varname]
        dims = outputs.dimensions[varname]
        if append:
            if "t" in dims:
                var = var[cropnew:, ...]
                varold = old[varname]
                var = BoutArray(numpy.append(varold, var, axis=0), var.attributes)

        if singleprecision:
            if not isinstance(var, int):
                var = BoutArray(numpy.float32(var), var.attributes)

        if "t" in dims:
            for f, t_slice in zip(files, t_slices):
                f.write(varname, var[t_slice])
        else:
            for f in files:
                f.write(varname, var)

        var = None
        gc.collect()

    for f in files:
        f.close()

    del outputs
    gc.collect()
@@ -215,3 +247,47 @@
    # Note that get_chunk_cache() returns a tuple, so we have to unpack it when
    # passing to set_chunk_cache.
    set_chunk_cache(*netcdf4_chunk_cache)


def _get_filenames_t_slices(time_split_size, time_split_first_label, fullpath, tind):
    """
    Create the filenames and slices used for splitting output in time. If not
    splitting, 'do nothing'.

    Parameters
    ----------
    time_split_size : int or None
        See docstring of squashoutput().
    time_split_first_label : int
        See docstring of squashoutput().
    fullpath : str
        Path of the single output file that would be written if the output were
        not split in time; used as the base for the split file names.
    tind : slice
        Slice applied to the time dimension when reading data. Used to
        calculate the length of the time dimension when time_split_size is set.

    Returns
    -------
    filenames : list of str
        File names to write output to.
    t_slices : list of slice
        Slices to be applied to the time dimension to select data for each
        output file.
    """
    if time_split_size is None:
        return [fullpath], [slice(None)]
    else:
        # tind.stop - tind.start is the total number of t-indices ignoring the step.
        # Adding tind.step - 1 and integer-dividing by tind.step converts to the
        # total number accounting for the step.
        nt = (tind.stop - tind.start + tind.step - 1) // tind.step
        n_outputs = (nt + time_split_size - 1) // time_split_size
        filenames = []
        t_slices = []
        for i in range(n_outputs):
            parts = fullpath.split(".")
            parts[-2] += str(time_split_first_label + i)
            filename = ".".join(parts)
            filenames.append(filename)
            t_slices.append(slice(i * time_split_size, (i + 1) * time_split_size))
        return filenames, t_slices
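A worked example of the helper's output (a standalone sketch, not part of the commit, assuming a 25-point time dimension read with unit step):

# 25 time-points split every 10 gives ceil(25 / 10) = 3 output files.
filenames, t_slices = _get_filenames_t_slices(
    time_split_size=10,
    time_split_first_label=0,
    fullpath="BOUT.dmp.nc",
    tind=slice(0, 25, 1),
)
assert filenames == ["BOUT.dmp0.nc", "BOUT.dmp1.nc", "BOUT.dmp2.nc"]
assert t_slices == [slice(0, 10), slice(10, 20), slice(20, 30)]
# The last slice extends past the 25 available points; numpy slicing
# truncates, so the final file simply holds the remaining 5 time-points.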