Fix for reading exported parquet #1071

Merged
11 commits merged on Jul 9, 2025
src/datachain/lib/arrow.py (9 additions, 0 deletions)
@@ -126,7 +126,16 @@ def _process_record(
            if isinstance(kwargs.get("format"), CsvFileFormat):
                kwargs["format"] = "csv"
            arrow_file = ArrowRow(file=file, index=index, kwargs=kwargs)

            if self.output_schema and hasattr(vals[0], "source"):
                # If we are reading a parquet file written by datachain, it
                # may already contain `source`, so we should not duplicate it;
                # instead, we re-create it regardless of the self.source flag.
                vals[0].source = arrow_file  # type: ignore[attr-defined]
                return vals
            return [arrow_file, *vals]

        return vals

Review comment (Member): clean up the code comment a bit (no extra blank line is needed).

Review comment (Member): let's also add to the docs for source=True that, if it is enabled and the file already has a source field, it will be rewritten (e.g. when the file was previously generated by datachain).
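For context, a minimal sketch of the behavior the second review comment asks to document. It assumes datachain is imported as dc (as in the test below), that a session is available implicitly as in that test, and uses hypothetical file names:

import pandas as pd
import datachain as dc

# Write a plain parquet file, then export it through datachain; the exported
# file now carries a `source` field describing where each record came from.
pd.DataFrame({"x": [1, 2]}).to_parquet("df.parquet")
dc.read_parquet("df.parquet", source=True).to_parquet("df2.parquet")

# With this fix, reading the exported file back with source=True re-creates
# the existing `source` field instead of duplicating it.
chain = dc.read_parquet("df2.parquet", source=True)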

    def _process_non_datachain_record(
tests/unit/lib/test_datachain.py (16 additions, 0 deletions)
@@ -1686,6 +1686,22 @@ def test_read_parquet(tmp_dir, test_session):
    assert df_equal(df1, df)


def test_read_parquet_exported_with_source(test_session, tmp_dir):
    path = tmp_dir / "df.parquet"
    path2 = tmp_dir / "df2.parquet"
    df = pd.DataFrame(DF_DATA)

    # Write a plain parquet file, then export it through datachain so that
    # the second file already contains a `source` field.
    df.to_parquet(path)
    dc.read_parquet(path, source=True).to_parquet(path2)

    # Reading the exported file back with source=True must not duplicate
    # the existing `source` field.
    df1 = (
        dc.read_parquet(path2, source=True)
        .select("first_name", "age", "city")
        .to_pandas()
    )

    assert df_equal(df1, df)


@skip_if_not_sqlite
def test_read_parquet_in_memory(tmp_dir):
    df = pd.DataFrame(DF_DATA)
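To try the new test in isolation, an invocation like the following should work, assuming a development environment with the project's test dependencies installed:

pytest tests/unit/lib/test_datachain.py -k test_read_parquet_exported_with_source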