
Commit

change package structure
eveleighoj committed Jan 24, 2025
1 parent 0183d9b commit 4a605ad
Showing 2 changed files with 57 additions and 42 deletions.
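For orientation: this commit moves each table's parquet output out of a single flat dataset=<dataset> directory and into a per-table, hive-partitioned layout (fact/, fact-resource/, entity/), and renames fact_resource.parquet to fact-resource.parquet to match. A minimal sketch of the old and new paths, using an illustrative output root; the new path attributes mirror those set in __init__ in the diff below:

    from pathlib import Path

    path = Path("conservation-area")  # illustrative output root
    dataset = "conservation-area"

    # old layout (before this commit): everything under one dataset directory
    old_fact = path / f"dataset={dataset}" / "fact.parquet"
    old_fact_resource = path / f"dataset={dataset}" / "fact_resource.parquet"
    old_entity = path / f"dataset={dataset}" / "entity.parquet"

    # new layout (after this commit): one directory per table,
    # each with a hive-style dataset=<dataset> partition inside it
    fact_path = path / "fact" / f"dataset={dataset}" / "fact.parquet"
    fact_resource_path = path / "fact-resource" / f"dataset={dataset}" / "fact-resource.parquet"
    entity_path = path / "entity" / f"dataset={dataset}" / "entity.parquet"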
43 changes: 11 additions & 32 deletions digital_land/package/dataset_parquet.py
@@ -54,11 +54,16 @@ def __init__(self, dataset, path, duckdb_path=None, **kwargs):
self.typology = self.specification.schema[dataset]["typology"]

# set up key file paths
self.fact_path = self.path / f"dataset={self.dataset}" / "fact.parquet"
self.fact_path = self.path / "fact" / f"dataset={self.dataset}" / "fact.parquet"
self.fact_resource_path = (
self.path / f"dataset={self.dataset}" / "fact_resource.parquet"
self.path
/ "fact-resource"
/ f"dataset={self.dataset}"
/ "fact-resource.parquet"
)
self.entity_path = (
self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet"
)
self.entity_path = self.path / f"dataset={self.dataset}" / "entity.parquet"

def get_schema(self):
schema = {}
@@ -124,7 +129,7 @@ def load_facts(self, transformed_parquet_dir):
"""
This method loads facts into a fact table from a directory containing all transformed files as parquet files
"""
output_path = self.path / f"dataset={self.dataset}" / "fact.parquet"
output_path = self.fact_path
output_path.parent.mkdir(parents=True, exist_ok=True)
logging.info("loading facts from temp table")

@@ -151,7 +156,7 @@ def load_facts(self, transformed_parquet_dir):

def load_fact_resource(self, transformed_parquet_dir):
logging.info(f"loading fact resources from {str(transformed_parquet_dir)}")
output_path = self.path / f"dataset={self.dataset}" / "fact_resource.parquet"
output_path = self.fact_resource_path
output_path.parent.mkdir(parents=True, exist_ok=True)
fact_resource_fields = self.specification.schema["fact-resource"]["fields"]
fields_str = ", ".join(
@@ -172,32 +177,6 @@ def load_fact_resource(self, transformed_parquet_dir):
"""
)

# def combine_parquet_files(input_path,output_path):
# """
# This method combines multiple parquet files into a single parquet file
# """
# # check input path is a directory using Path
# if not Path(input_path).is_dir():
# raise ValueError("Input path must be a directory")

# # check output_path is a file that doesn't exist
# if not Path(output_path).is_file():
# raise ValueError("Output path must be a file")

# # use self.conn to use duckdb to combine files
# sql = f"""
# COPY (select * from parquet_scan('{input_path}/*.parquet')) TO '{output_path}' (FORMAT PARQUET);
# """
# self.conn.execute(sql)

# # Combine all the parquet files into a single parquet file
# combined_df = pd.concat(
# [pd.read_parquet(f"{input_path}/{file}") for file in parquet_files]
# )

# # Save the combined dataframe to a parquet file
# combined_df.to_parquet(output_path, index=False)

def load_entities_range(
self,
transformed_parquet_dir,
@@ -398,7 +377,7 @@ def combine_parquet_files(self, input_path, output_path):
self.conn.execute(sql)

def load_entities(self, transformed_parquet_dir, resource_path, organisation_path):
output_path = self.path / f"dataset={self.dataset}" / "entity.parquet"
output_path = self.entity_path
output_path.parent.mkdir(parents=True, exist_ok=True)

# retrieve entity counts including the minimum
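Both load_facts and load_fact_resource now write to the paths set up in __init__ by having DuckDB scan the transformed parquet files and COPY the result to a single output file, the same pattern the removed commented-out combine_parquet_files draft above used. A rough standalone sketch of that pattern, assuming a local DuckDB connection in place of the package's self.conn:

    import duckdb
    from pathlib import Path

    def combine_parquet_files(input_dir: Path, output_file: Path) -> None:
        """Combine all parquet files in input_dir into a single parquet file."""
        if not input_dir.is_dir():
            raise ValueError("Input path must be a directory")
        output_file.parent.mkdir(parents=True, exist_ok=True)
        conn = duckdb.connect()  # the package would reuse self.conn here
        conn.execute(
            f"""
            COPY (SELECT * FROM parquet_scan('{input_dir}/*.parquet'))
            TO '{output_file}' (FORMAT PARQUET);
            """
        )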
56 changes: 46 additions & 10 deletions tests/integration/package/test_dataset_parquet.py
@@ -380,7 +380,11 @@ def test_load_facts_single_file(data: dict, expected: int, tmp_path):
package.load_facts(transformed_parquet_dir=transformed_parquet_dir)

output_file = (
tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
tmp_path
/ "conservation-area"
/ "fact"
/ "dataset=conservation-area"
/ "fact.parquet"
)
assert os.path.exists(output_file), "fact.parquet file does not exist"

@@ -421,7 +425,11 @@ def test_load_facts_multiple_files(data1, data2, expected, tmp_path):
package.load_facts(transformed_parquet_dir=transformed_parquet_dir)

output_file = (
tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
tmp_path
/ "conservation-area"
/ "fact"
/ "dataset=conservation-area"
/ "fact.parquet"
)
assert os.path.exists(output_file), "fact.parquet file does not exist"

@@ -472,7 +480,11 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path):
package.load_facts(transformed_parquet_dir=transformed_parquet_dir)

output_file = (
tmp_path / "conservation-area" / "dataset=conservation-area" / "fact.parquet"
tmp_path
/ "conservation-area"
/ "fact"
/ "dataset=conservation-area"
/ "fact.parquet"
)
assert os.path.exists(output_file), "fact.parquet file does not exist"

@@ -504,8 +516,9 @@ def test_load_fact_resource_single_file(data, expected, tmp_path):
output_file = (
tmp_path
/ "conservation-area"
/ "fact-resource"
/ "dataset=conservation-area"
/ "fact_resource.parquet"
/ "fact-resource.parquet"
)
assert os.path.exists(output_file), "fact-resource.parquet file does not exist"

@@ -544,8 +557,9 @@ def test_load_fact_resource_two_filea(data_1, data_2, expected, tmp_path):
output_file = (
tmp_path
/ "conservation-area"
/ "fact-resource"
/ "dataset=conservation-area"
/ "fact_resource.parquet"
/ "fact-resource.parquet"
)
assert os.path.exists(output_file), "fact-resource.parquet file does not exist"

@@ -596,8 +610,9 @@ def test_load_fact_resource_empty_file_with_another(data, expected, tmp_path):
output_file = (
tmp_path
/ "conservation-area"
/ "fact-resource"
/ "dataset=conservation-area"
/ "fact_resource.parquet"
/ "fact-resource.parquet"
)
assert os.path.exists(output_file), "fact-resource.parquet file does not exist"

@@ -662,7 +677,11 @@ def test_load_entities_single_file(
package.load_entities(transformed_parquet_dir, resource_path, org_path)

output_file = (
tmp_path / "conservation-area" / "dataset=conservation-area" / "entity.parquet"
tmp_path
/ "conservation-area"
/ "entity"
/ "dataset=conservation-area"
/ "entity.parquet"
)
assert os.path.exists(output_file), "entity.parquet file does not exist"

@@ -737,15 +756,32 @@ def test_load_pq_to_sqlite_basic(
fact_resource_df = pd.DataFrame.from_dict(fact_resource_data)
entity_df = pd.DataFrame.from_dict(entity_data)

(dataset_parquet_path / "fact" / "dataset=conservation-area").mkdir(
parents=True, exist_ok=True
)
(dataset_parquet_path / "fact-resource" / "dataset=conservation-area").mkdir(
parents=True, exist_ok=True
)
(dataset_parquet_path / "entity" / "dataset=conservation-area").mkdir(
parents=True, exist_ok=True
)

fact_df.to_parquet(
dataset_parquet_path / "dataset=conservation-area" / "fact.parquet", index=False
dataset_parquet_path / "fact" / "dataset=conservation-area" / "fact.parquet",
index=False,
)
fact_resource_df.to_parquet(
dataset_parquet_path / "dataset=conservation-area" / "fact_resource.parquet",
dataset_parquet_path
/ "fact-resource"
/ "dataset=conservation-area"
/ "fact-resource.parquet",
index=False,
)
entity_df.to_parquet(
dataset_parquet_path / "dataset=conservation-area" / "entity.parquet",
dataset_parquet_path
/ "entity"
/ "dataset=conservation-area"
/ "entity.parquet",
index=False,
)
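Because each table now sits under a dataset=<dataset> directory, the output is hive-partitioned and can be scanned back with the partition exposed as a column. A small illustrative sketch, assuming DuckDB as used elsewhere in the package; the paths are placeholders:

    import duckdb

    conn = duckdb.connect()
    # the dataset=<...> directory name becomes a "dataset" column in the result
    rows = conn.execute(
        "SELECT * FROM read_parquet('conservation-area/fact/*/*.parquet', hive_partitioning=true)"
    ).fetchall()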

