diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
index 365c73ed..54a62935 100644
--- a/digital_land/package/dataset_parquet.py
+++ b/digital_land/package/dataset_parquet.py
@@ -54,11 +54,16 @@ def __init__(self, dataset, path, duckdb_path=None, **kwargs):
         self.typology = self.specification.schema[dataset]["typology"]
 
         # set up key file paths
-        self.fact_path = self.path / f"dataset={self.dataset}" / "fact.parquet"
+        self.fact_path = self.path / "fact" / f"dataset={self.dataset}" / "fact.parquet"
         self.fact_resource_path = (
-            self.path / f"dataset={self.dataset}" / "fact_resource.parquet"
+            self.path
+            / "fact-resource"
+            / f"dataset={self.dataset}"
+            / "fact-resource.parquet"
+        )
+        self.entity_path = (
+            self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet"
         )
-        self.entity_path = self.path / f"dataset={self.dataset}" / "entity.parquet"
 
     def get_schema(self):
         schema = {}
@@ -124,7 +129,7 @@ def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files
         """
-        output_path = self.path / f"dataset={self.dataset}" / "fact.parquet"
+        output_path = self.fact_path
         output_path.parent.mkdir(parents=True, exist_ok=True)
 
         logging.info("loading facts from temp table")
@@ -151,7 +156,7 @@ def load_facts(self, transformed_parquet_dir):
 
     def load_fact_resource(self, transformed_parquet_dir):
         logging.info(f"loading fact resources from {str(transformed_parquet_dir)}")
-        output_path = self.path / f"dataset={self.dataset}" / "fact_resource.parquet"
+        output_path = self.fact_resource_path
         output_path.parent.mkdir(parents=True, exist_ok=True)
         fact_resource_fields = self.specification.schema["fact-resource"]["fields"]
         fields_str = ", ".join(
@@ -172,32 +177,6 @@
             """
         )
 
-    # def combine_parquet_files(input_path,output_path):
-    #     """
-    #     This method combines multiple parquet files into a single parquet file
-    #     """
-    #     # check input path is a directory using Path
-    #     if not Path(input_path).is_dir():
-    #         raise ValueError("Input path must be a directory")
-
-    #     # check output_path is a file that doesn't exist
-    #     if not Path(output_path).is_file():
-    #         raise ValueError("Output path must be a file")
-
-    #     # use self.conn to use duckdb to combine files
-    #     sql = f"""
-    #     COPY (select * from parquet_scan('{input_path}/*.parquet')) TO '{output_path}' (FORMAT PARQUET);
-    #     """
-    #     self.conn.execute(sql)
-
-    #     # Combine all the parquet files into a single parquet file
-    #     combined_df = pd.concat(
-    #         [pd.read_parquet(f"{input_path}/{file}") for file in parquet_files]
-    #     )
-
-    #     # Save the combined dataframe to a parquet file
-    #     combined_df.to_parquet(output_path, index=False)
-
     def load_entities_range(
         self,
         transformed_parquet_dir,
@@ -398,7 +377,7 @@ def combine_parquet_files(self, input_path, output_path):
         self.conn.execute(sql)
 
     def load_entities(self, transformed_parquet_dir, resource_path, organisation_path):
-        output_path = self.path / f"dataset={self.dataset}" / "entity.parquet"
+        output_path = self.entity_path
         output_path.parent.mkdir(parents=True, exist_ok=True)
 
         # retrieve entity counnts including and minimum
diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py
index 0f004295..f94244dd 100644
--- a/tests/integration/package/test_dataset_parquet.py
+++ b/tests/integration/package/test_dataset_parquet.py
@@ -380,7 +380,11 @@ def test_load_facts_single_file(data: dict, expected: int, tmp_path):
     package.load_facts(transformed_parquet_dir=transformed_parquet_dir)
 
     output_file = (
-        tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
+        tmp_path
+        / "conservation-area"
+        / "fact"
+        / "dataset=conservation-area"
+        / "fact.parquet"
     )
 
     assert os.path.exists(output_file), "fact.parquet file does not exist"
@@ -421,7 +425,11 @@ def test_load_facts_multiple_files(data1, data2, expected, tmp_path):
     package.load_facts(transformed_parquet_dir=transformed_parquet_dir)
 
     output_file = (
-        tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
+        tmp_path
+        / "conservation-area"
+        / "fact"
+        / "dataset=conservation-area"
+        / "fact.parquet"
     )
 
     assert os.path.exists(output_file), "fact.parquet file does not exist"
@@ -472,7 +480,11 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path):
     package.load_facts(transformed_parquet_dir=transformed_parquet_dir)
 
     output_file = (
-        tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
+        tmp_path
+        / "conservation-area"
+        / "fact"
+        / "dataset=conservation-area"
+        / "fact.parquet"
     )
 
     assert os.path.exists(output_file), "fact.parquet file does not exist"
@@ -504,8 +516,9 @@ def test_load_fact_resource_single_file(data, expected, tmp_path):
     output_file = (
         tmp_path
         / "conservation-area"
+        / "fact-resource"
         / "dataset=conservation-area"
-        / "fact_resource.parquet"
+        / "fact-resource.parquet"
     )
 
     assert os.path.exists(output_file), "fact-resource.parquet file does not exist"
@@ -544,8 +557,9 @@ def test_load_fact_resource_two_filea(data_1, data_2, expected, tmp_path):
     output_file = (
         tmp_path
         / "conservation-area"
+        / "fact-resource"
         / "dataset=conservation-area"
-        / "fact_resource.parquet"
+        / "fact-resource.parquet"
     )
 
     assert os.path.exists(output_file), "fact-resource.parquet file does not exist"
@@ -596,8 +610,9 @@ def test_load_fact_resource_empty_file_with_another(data, expected, tmp_path):
     output_file = (
         tmp_path
         / "conservation-area"
+        / "fact-resource"
         / "dataset=conservation-area"
-        / "fact_resource.parquet"
+        / "fact-resource.parquet"
     )
 
     assert os.path.exists(output_file), "fact-resource.parquet file does not exist"
@@ -662,7 +677,11 @@ def test_load_entities_single_file(
     package.load_entities(transformed_parquet_dir, resource_path, org_path)
 
     output_file = (
-        tmp_path / "conservation-area" / "dataset=conservation-area" / "entity.parquet"
+        tmp_path
+        / "conservation-area"
+        / "entity"
+        / "dataset=conservation-area"
+        / "entity.parquet"
     )
 
     assert os.path.exists(output_file), "entity.parquet file does not exist"
@@ -737,15 +756,32 @@ def test_load_pq_to_sqlite_basic(
     fact_resource_df = pd.DataFrame.from_dict(fact_resource_data)
     entity_df = pd.DataFrame.from_dict(entity_data)
 
+    (dataset_parquet_path / "fact" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+    (dataset_parquet_path / "fact-resource" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+    (dataset_parquet_path / "entity" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+
     fact_df.to_parquet(
-        dataset_parquet_path / "dataset=conservation-area" / "fact.parquet", index=False
+        dataset_parquet_path / "fact" / "dataset=conservation-area" / "fact.parquet",
+        index=False,
    )
     fact_resource_df.to_parquet(
-        dataset_parquet_path / "dataset=conservation-area" / "fact_resource.parquet",
+        dataset_parquet_path
+        / "fact-resource"
+        / "dataset=conservation-area"
+        / "fact-resource.parquet",
         index=False,
     )
     entity_df.to_parquet(
-        dataset_parquet_path / "dataset=conservation-area" / "entity.parquet",
+        dataset_parquet_path
+        / "entity"
+        / "dataset=conservation-area"
+        / "entity.parquet",
         index=False,
     )
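For context on the path changes above: a minimal sketch (not part of the diff) of the layout the reworked `__init__` now produces, assuming the `conservation-area` output root used in the tests. Each table gets its own top-level directory, with the dataset as a hive-style `dataset=` partition beneath it, and the underscore in `fact_resource.parquet` becomes a hyphen:

from pathlib import Path

# Hypothetical stand-ins for self.path and self.dataset
# (the tests use tmp_path / "conservation-area" and "conservation-area").
base = Path("conservation-area")
dataset = "conservation-area"

fact_path = base / "fact" / f"dataset={dataset}" / "fact.parquet"
fact_resource_path = (
    base / "fact-resource" / f"dataset={dataset}" / "fact-resource.parquet"
)
entity_path = base / "entity" / f"dataset={dataset}" / "entity.parquet"

One payoff of this layout, sketched here on the assumption that several datasets are written side by side under the same root, is that DuckDB (which the package already drives through `self.conn`) can scan one table across every dataset with a single glob, exposing the partition as a `dataset` column:

import duckdb

conn = duckdb.connect()
# hive_partitioning=true surfaces the "dataset=..." path segment as a column.
rows = conn.execute(
    "SELECT dataset, count(*) AS facts "
    "FROM read_parquet('fact/*/*.parquet', hive_partitioning=true) "
    "GROUP BY dataset"
).fetchall()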