From b78532c0ea3dd5bd61d6901df136e8867ab6c719 Mon Sep 17 00:00:00 2001
From: My Chiffon Nguyen
Date: Tue, 8 Oct 2024 22:33:01 -0700
Subject: [PATCH] Refactor data module

---
 .gitignore                               |   6 +-
 src/data_pipeline/__init__.py            |   0
 src/data_pipeline/data_processing.py     | 140 ++++++++++++++++++++++++++
 src/data_pipeline/instruct_datasets.py   |   1 +
 src/data_pipeline/mnemonic_processing.py |   1 +
 src/process_data/process_data.py         |  73 -------------
 6 files changed, 147 insertions(+), 74 deletions(-)
 create mode 100644 src/data_pipeline/__init__.py
 create mode 100644 src/data_pipeline/data_processing.py
 create mode 100644 src/data_pipeline/instruct_datasets.py
 create mode 100644 src/data_pipeline/mnemonic_processing.py
 delete mode 100644 src/process_data/process_data.py

diff --git a/.gitignore b/.gitignore
index b2496ac..97a8797 100644
--- a/.gitignore
+++ b/.gitignore
@@ -165,4 +165,8 @@ cython_debug/
 # Data
 /data
 .parquet
-.csv
\ No newline at end of file
+.csv
+
+# Write up
+/pdf
+.pdf
diff --git a/src/data_pipeline/__init__.py b/src/data_pipeline/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/data_pipeline/data_processing.py b/src/data_pipeline/data_processing.py
new file mode 100644
index 0000000..f9c423e
--- /dev/null
+++ b/src/data_pipeline/data_processing.py
@@ -0,0 +1,140 @@
+"""A module for processing data: combining it from various sources and loading it into usable format(s)."""
+
+import logging
+from pathlib import Path
+
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+def load_parquet_data(path: Path | str) -> pd.DataFrame:
+    """Load parquet data into a dataframe.
+
+    Args:
+        path (Path | str): The path to the parquet data.
+
+    Returns:
+        df (pd.DataFrame): The parquet data as a dataframe.
+    """
+    df = pd.DataFrame()
+    paths = Path(path).rglob("*.parquet")
+    for file in paths:
+        temp_data = pd.read_parquet(file, engine="pyarrow")
+        df = pd.concat([df, temp_data])
+
+    # Lowercase terms
+    df["term"] = df["term"].str.lower()
+
+    logger.info(f"Read {df.shape[0]} rows from parquet files.")
+
+    assert df.shape[1] == 2, "Data must have 2 columns."
+    assert df.columns[0] == "term", "First column must be 'term'."
+    assert df.columns[1] == "mnemonic", "Second column must be 'mnemonic'."
+
+    return df
+
+
+def load_clean_txt_csv_data(path: Path | str) -> pd.DataFrame:
+    """Load txt or csv data into a dataframe and clean it.
+
+    Args:
+        path (Path | str): The path to the txt or csv data.
+
+    Returns:
+        df (pd.DataFrame): The txt or csv data as a dataframe.
+
+    Raises:
+        FileNotFoundError: If no txt or csv files are found in the specified path.
+    """
+    df = pd.DataFrame()
+    paths = [p for p in Path(path).rglob("*") if p.suffix in [".txt", ".csv"]]
+    logger.info(f"Loading txt/csv files from {paths}.")
+
+    if not paths:
+        logger.error("No txt or csv files found in the specified path.")
+        raise FileNotFoundError("No txt or csv files found in the specified path.")
+
+    # Read only the first two columns
+    for file in paths:
+        if file.suffix == ".txt":
+            temp_data = pd.read_csv(
+                file,
+                sep="\t",
+                header=None,
+                skiprows=2,
+                usecols=[0, 1],
+                skip_blank_lines=True,
+                names=["term", "mnemonic"],
+            )
+        else:
+            temp_data = pd.read_csv(file, names=["term", "mnemonic"], usecols=[0, 1])
+        df = pd.concat([df, temp_data])
+
+    logger.info(f"Read {df.shape[0]} rows from txt/csv files.")
+
+    # Terms in txt/csv files are expected to be lowercase already
+    assert df["term"].str.islower().all(), "All terms should be lower case."
+
+    # Drop empty mnemonics
+    df.dropna(subset=["mnemonic"], inplace=True)
+    logger.info(f"From txt/csv files, kept {df.shape[0]} rows with mnemonics.")
+
+    # Remove leading and trailing double quotes from mnemonics
+    df["mnemonic"] = df["mnemonic"].str.strip('"')
+    return df
+
+
+def combine_datasets(
+    input_path: Path | str = "data/raw",
+    output_path: Path | str = "data/final",
+    output_format: str = "csv",
+) -> pd.DataFrame:
+    """Combines the external (parquet) dataset with the local (txt/csv) dataset, removes duplicate terms, and saves the result in the specified format.
+
+    Args:
+        input_path (Path | str):
+            The directory containing the raw parquet and txt/csv datasets. Defaults to "data/raw".
+        output_path (Path | str):
+            The output path (without extension) where the combined data will be saved. Defaults to "data/final".
+        output_format (str):
+            The format in which to save the combined dataset ('csv' or 'parquet'). Defaults to 'csv'.
+
+    Returns:
+        pd.DataFrame: The cleaned, combined dataset.
+
+    Raises:
+        ValueError: If the provided output format is not 'csv' or 'parquet'.
+    """
+    # TODO: Add error handling for invalid input paths
+
+    # Set up output directories and file
+    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+    output_file = f"{output_path}.{output_format}"
+
+    # Load and combine the datasets
+    external_df = load_parquet_data(input_path)
+    local_df = load_clean_txt_csv_data(input_path)
+    combined_df = pd.concat([local_df, external_df])
+
+    # Drop duplicate terms, keeping the local (txt/csv) entry when both sources contain a term
+    combined_df.drop_duplicates(subset=["term"], inplace=True, keep="first")
+
+    # Write to output file
+    if output_format == "csv":
+        combined_df.to_csv(output_file, index=False)
+    elif output_format == "parquet":
+        combined_df.to_parquet(output_file, index=False)
+    else:
+        raise ValueError("Invalid output format. Must be either 'csv' or 'parquet'.")
+
+    logger.info(
+        f"Saved combined data to '{output_file}' with {combined_df.shape[0]} unique terms."
+    )
+
+    return combined_df
+
+
+if __name__ == "__main__":
+    combine_datasets()
diff --git a/src/data_pipeline/instruct_datasets.py b/src/data_pipeline/instruct_datasets.py
new file mode 100644
index 0000000..8c4c785
--- /dev/null
+++ b/src/data_pipeline/instruct_datasets.py
@@ -0,0 +1 @@
+"""Create prompts, instructions, and datasets for instruction-tuning Llama 3."""
diff --git a/src/data_pipeline/mnemonic_processing.py b/src/data_pipeline/mnemonic_processing.py
new file mode 100644
index 0000000..8e310e4
--- /dev/null
+++ b/src/data_pipeline/mnemonic_processing.py
@@ -0,0 +1 @@
+"""Module for processing mnemonics, including code to categorize, standardize, or diversify them using OpenAI."""
diff --git a/src/process_data/process_data.py b/src/process_data/process_data.py
deleted file mode 100644
index e5bbe2d..0000000
--- a/src/process_data/process_data.py
+++ /dev/null
@@ -1,73 +0,0 @@
-"""A module for reading, cleaning, processing data, including code to standardize or diversify mnemonics."""
-
-from pathlib import Path
-
-import pandas as pd
-
-
-def read_parquet_data():
-    """Read parquet data into a dataframe."""
-    data = pd.DataFrame()
-    paths = Path("data").rglob("*.parquet")
-    for path in paths:
-        temp_data = pd.read_parquet(path, engine="pyarrow")
-        data = pd.concat([data, temp_data])
-
-    return data
-
-
-def combine_datasets(
-    output_path: Path | str = "data/final", output_format: str = "csv"
-) -> pd.DataFrame:
-    """Combines an external dataset with a local dataset, cleans the data by removing duplicates, and saves the result to a specified format.
-
-    Args:
-        output_path (Path | str): The output directory where the combined data will be saved. Defaults to "data/final".
-        output_format (str): The format in which to save the combined dataset ('csv' or 'parquet'). Defaults to 'csv'.
-
-    Returns:
-        pd.DataFrame: The cleaned, combined dataset.
-
-    Raises:
-        ValueError: If the provided output format is not 'csv' or 'parquet'.
-    """
-    # Set up output directories and
-    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
-    output_file = f"{output_path}.{output_format}"
-
-    # Load the external dataset
-    external_data_path = Path("data/processed/combined_data.csv")
-    if external_data_path.exists():
-        external_df = pd.read_csv(external_data_path)
-    else:
-        external_df = read_parquet_data()
-
-    # Load the local dataset
-    local_csv_files = [
-        f
-        for f in Path("data").rglob("*.csv")
-        if str(f.resolve()) != str(Path(output_file).resolve())
-    ]
-    if not local_csv_files:
-        raise FileNotFoundError("No CSV files found in the 'data' directory.")
-    local_df = pd.concat([pd.read_csv(f) for f in local_csv_files])
-
-    # Standardize column names
-    local_df.rename(columns={"Word": "term", "Mnemonic": "mnemonic"}, inplace=True)
-
-    # Combine the local and external datasets, keeping only the relevant columns (assumed to be the first 2 in local_df)
-    combined_df = pd.concat([local_df.iloc[:, :2], external_df])
-
-    # Clean the data
-    combined_df["term"] = combined_df["term"].str.lower()
-    combined_df.drop_duplicates(subset=["term"], inplace=True)
-
-    # Write to output file
-    if output_format == "csv":
-        combined_df.to_csv(output_file, index=False)
-    elif output_format == "parquet":
-        combined_df.to_parquet(output_file, index=False)
-    else:
-        raise ValueError("Invalid output format. Must be either 'csv' or 'parquet'.")
-
-    return combined_df
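A minimal usage sketch of the refactored pipeline, assuming the patch is applied and the src/ directory is on PYTHONPATH; the call simply reuses the defaults defined in data_processing.py, and the output path/format shown are illustrative only.

    from data_pipeline.data_processing import combine_datasets

    # Combine parquet and txt/csv sources under data/raw and write data/final.parquet
    df = combine_datasets(input_path="data/raw", output_path="data/final", output_format="parquet")
    print(df.shape)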