diff --git a/src/datachain/lib/arrow.py b/src/datachain/lib/arrow.py
index f9d77c23a..abbe30c56 100644
--- a/src/datachain/lib/arrow.py
+++ b/src/datachain/lib/arrow.py
@@ -126,7 +126,16 @@ def _process_record(
             if isinstance(kwargs.get("format"), CsvFileFormat):
                 kwargs["format"] = "csv"
             arrow_file = ArrowRow(file=file, index=index, kwargs=kwargs)
+
+            if self.output_schema and hasattr(vals[0], "source"):
+                # If we are reading a parquet file written by datachain, it might
+                # already have a source inside it, so we should not duplicate it;
+                # instead, we re-create it based on the self.source flag.
+                vals[0].source = arrow_file  # type: ignore[attr-defined]
+
+                return vals
             return [arrow_file, *vals]
+
         return vals
 
     def _process_non_datachain_record(
diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py
index 954c94d28..dcf817f95 100644
--- a/tests/unit/lib/test_datachain.py
+++ b/tests/unit/lib/test_datachain.py
@@ -1760,6 +1760,22 @@ def test_read_parquet(tmp_dir, test_session):
     assert df_equal(df1, df)
 
 
+def test_read_parquet_exported_with_source(test_session, tmp_dir):
+    path = tmp_dir / "df.parquet"
+    path2 = tmp_dir / "df2.parquet"
+    df = pd.DataFrame(DF_DATA)
+
+    df.to_parquet(path)
+    dc.read_parquet(path, source=True).to_parquet(path2)
+    df1 = (
+        dc.read_parquet(path2, source=True)
+        .select("first_name", "age", "city")
+        .to_pandas()
+    )
+
+    assert df_equal(df1, df)
+
+
 @skip_if_not_sqlite
 def test_read_parquet_in_memory(tmp_dir):
     df = pd.DataFrame(DF_DATA)
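
A minimal end-to-end sketch of the round trip this patch fixes, written as a
standalone script rather than a pytest case (the DataFrame contents and file
names are illustrative; the datachain calls mirror the test above):

    import pandas as pd
    import datachain as dc

    df = pd.DataFrame({"first_name": ["Alice"], "age": [30], "city": ["Berlin"]})
    df.to_parquet("df.parquet")

    # Reading with source=True attaches an ArrowRow "source" to each record,
    # so the parquet file exported below embeds that source in its schema.
    dc.read_parquet("df.parquet", source=True).to_parquet("df2.parquet")

    # Re-reading the exported file with source=True now re-creates the
    # embedded source in place instead of prepending a second ArrowRow,
    # which is what _process_record's new branch guards against.
    df1 = (
        dc.read_parquet("df2.parquet", source=True)
        .select("first_name", "age", "city")
        .to_pandas()
    )
    print(df1)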