Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better documentation of behaviour with irregular source chunks #33

Open
mjwillson opened this issue Aug 18, 2021 · 5 comments
Open

Better documentation of behaviour with irregular source chunks #33

mjwillson opened this issue Aug 18, 2021 · 5 comments

Comments

@mjwillson
Copy link

mjwillson commented Aug 18, 2021

Hiya!

In some source datasets the original chunks have irregular sizes. For example one netcdf file per year, where some years are leap years hence shorter than others.

Although the data model would seem to support this in principle, I couldn't get Rechunk or ConsolidateChunks to work without regular source chunk sizes.

Is this a fundamental limitation or something that could be in scope to address? And do you have any recommendations for getting data like this into regular chunks? In the leap year case, first splitting the source chunks into one chunk per day using SplitChunks could be an option. Although in general this would require splitting into chunks of gcd(*all_possible_source_chunk_sizes) which could be too small.

@shoyer
Copy link
Member

shoyer commented Aug 18, 2021

This should totally work! This happens internally in rechunking all the time, if the source and destination chunks are not a multiple of the other.

If you can provide an reproducible example, that would be very helpful for narrowing this down.

And do you have any recommendations for getting data like this into regular chunks? In the leap year case, first splitting the source chunks into one chunk per day using SplitChunks could be an option. Although in general this would require splitting into chunks of gcd(*all_possible_source_chunk_sizes) which could be too small.

You can do this via SplitChunks followed by ConsolidateChunks with the same chunks, as described this section of the docs:
https://xarray-beam.readthedocs.io/en/latest/rechunking.html#low-level-rechunking

Xarray-Beam uses a smarter algorithm than splitting into the GCD of all source chunk sizes. Instead, it should only make just the right irregular chunks (in SplitChunks) such that they can be joined together into the new regular chunks (in ConsolidateChunks).

@shoyer
Copy link
Member

shoyer commented Aug 18, 2021

To give a concrete example, here's what it would look like to rechunk "per year" daily data into a chunk-size of exactly 365 days:

import apache_beam as beam
import pandas as pd
import numpy as np
import xarray
import xarray_beam as xbeam

def make_chunks():
  offset = 0
  for year in range(2010, 2020):
    time = pd.date_range(f'{year}-01-01', f'{year}-12-31', freq='D')
    key = xbeam.Key({'time': offset})
    chunk = xarray.Dataset({'foo': ('time', np.zeros(time.size)), 'time': time})
    yield key, chunk
    offset += time.size

print(
    make_chunks()
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 366, 365, 365, 365, 366, 365, 365, 365]

print(
    make_chunks()
    | xbeam.SplitChunks({'time': 365})
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 365, 1, 364, 1, 364, 1, 364, 1, 364, 2, 363, 2, 363, 2, 363, 2]

print(
    make_chunks()
    | xbeam.SplitChunks({'time': 365})
    | xbeam.ConsolidateChunks({'time': 365})
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 365, 365, 365, 365, 365, 365, 365, 365, 2]

@mjwillson
Copy link
Author

Ah that's great, thanks! This solves my problem, but just to give a bit more detail why I was confused about this:

With Rechunk, it explicitly asks for a dict of source_chunks: sizes of source chunks so not clear at all what you would pass for this if the source chunks are irregular, I tried passing None, {} and {"time": None} none of which worked. I now see that if I pass source_chunks={"time": some_arbitrary_incorrect_number} it seems to work, although I'd be wary relying on this?

With ConsolidateChunks: I now see that this does work provided that each consecutive grouping of chunks adds up to the exact requested target chunk size. I was trying to use it without first splitting the chunks into this form, and got a rather opaque AssertionError. A simplified example of what I was attempting:

inputs = [
  (xb.Key({"time": 0}, {"foo"}), xa.Dataset({"foo": (("time",), np.zeros(2))})),
  (xb.Key({"time": 2}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(7))})),
  (xb.Key({"time": 9}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(3))})),
]
with beam.Pipeline() as p:
    p | beam.Create(inputs) | xb.ConsolidateChunks(target_chunks={"time": 7})

=> AssertionError: (Key(offsets={'time': 7}, vars={'foo'}), Key(offsets={'time': 9}, vars={'foo'}))

My mistake although a better error would help here.

I also wasn't aware that SplitChunks would work the way it does with irregular chunks (with consecutive groupings in its output summing to the requested chunk size), I had assumed it would split each input chunk independently in a way that wouldn't guarantee this invariant, or that it would only split into an exact divisor of the source chunk size. Although now I see the example under low-level rechunking which clarifies things a bit.

It would help if these functions (SplitChunks, ConsolidateChunks, Rechunk) had some explanation in their docstrings about what invariants they require in their input and guarantee in their output. And for Rechunk, some guidance on whether irregular chunk sizes are supported and if so, what to pass for source_chunks.

Your example above would be great to include in the docs too, to demonstrate how to work with irregularly-chunked source data :)

Thanks again!

@mjwillson mjwillson changed the title Support for irregular source chunks Better documentation of behaviour with irregular source chunks Aug 19, 2021
@shoyer
Copy link
Member

shoyer commented Aug 20, 2021

With Rechunk, it explicitly asks for a dict of source_chunks: sizes of source chunks so not clear at all what you would pass for this if the source chunks are irregular, I tried passing None, {} and {"time": None} none of which worked. I now see that if I pass source_chunks={"time": some_arbitrary_incorrect_number} it seems to work, although I'd be wary relying on this?

Indeed, Rechunk can safely handle arbitrary source chunks. They are only used by the heuristic for determining intermediate chunks, so it's OK if they are not exact. The algorithm should handle small descrepancies (like 365 vs 366 elements) just fine.

I agree that this would all be well worth documenting!

@shoyer
Copy link
Member

shoyer commented Aug 21, 2021

With ConsolidateChunks: I now see that this does work provided that each consecutive grouping of chunks adds up to the exact requested target chunk size. I was trying to use it without first splitting the chunks into this form, and got a rather opaque AssertionError. A simplified example of what I was attempting:

inputs = [
  (xb.Key({"time": 0}, {"foo"}), xa.Dataset({"foo": (("time",), np.zeros(2))})),
  (xb.Key({"time": 2}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(7))})),
  (xb.Key({"time": 9}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(3))})),
]
with beam.Pipeline() as p:
    p | beam.Create(inputs) | xb.ConsolidateChunks(target_chunks={"time": 7})

=> AssertionError: (Key(offsets={'time': 7}, vars={'foo'}), Key(offsets={'time': 9}, vars={'foo'}))

My mistake although a better error would help here.

Thinking about this case a little more, I think we could actually probably safely remove this assertion error (after adding suitable documentation).

The behavior of ConsolidateChunks would then be simply to group together all chunks starting within the given multiple of the origin. If existing chunks don't exactly align with the multiples of the desired chunks, then you'll end up with irregular groups, but they'll be roughly the same size. In this example, you'd have one chunk of size 9 and another of size 3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants