diff --git a/README.md b/README.md
index 71e2130..5c17445 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@ for notes on deploying the project on a live system.
 
 ### Prerequisites
 
-- Need to have acces to Levante.
+- Need to have access to Levante.
 - Need your own [conda environment](https://docs.dkrz.de/doc/levante/code-development/python.html#set-up-conda-for-individual-environments).
 
 ### Installing
@@ -35,6 +35,8 @@ of conduct, and the process for submitting pull requests to us.
 
 - **Cosmin M. Marina** - *Provided Initial Scripts* -
   [cosminmarina](https://github.com/cosminmarina)
+- **Eugenio Lorente-Ramos** - *Enhanced data acquisition scripts* -
+  [eugenioLR](https://github.com/eugenioLR)
 
 See also the list of
 [contributors](https://github.com/cosminmarina/dkrz_utils/contributors)
diff --git a/src/cmip6_data_acq/SLURM_data_acquisition.sh b/src/climate_data_acq/SLURM_data_acquisition.sh
similarity index 78%
rename from src/cmip6_data_acq/SLURM_data_acquisition.sh
rename to src/climate_data_acq/SLURM_data_acquisition.sh
index b16c2af..48a22dd 100755
--- a/src/cmip6_data_acq/SLURM_data_acquisition.sh
+++ b/src/climate_data_acq/SLURM_data_acquisition.sh
@@ -19,6 +19,6 @@ module load python3/2022.01-gcc-11.2.0
 module load clint
 module load xces
 
-python 0_data_acq_main_ECROPS.py
+python data_acquisition_main.py -p reanalysis --era5_vars_hour "10u,10v,msl,tp,q,2t" -f hour --exp_reanalysis ERA5 --dir ./data_acq
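Note on the new invocation: each quoted flag value is a comma-separated list that data_acquisition_main.py (added later in this diff) splits before looping, so the line above runs one hourly ERA5 search per variable. A minimal sketch of that parsing, assuming the same flag semantics:

    # Mirrors args.era5_vars_hour.split(",") in data_acquisition_main.py's main().
    era5_vars_hour = "10u,10v,msl,tp,q,2t"
    for var in era5_vars_hour.split(","):
        print(var.lower())  # one freva reanalysis search is launched per variable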
""" # Open the CSV file and read the file paths - with open(csv_file_path, mode='r') as csv_file: + with open(csv_file_path, mode="r") as csv_file: csv_reader = csv.reader(csv_file) - + for row in csv_reader: original_file_path = row[0].strip() - + # Extract ensemble name from the file path - path_components = original_file_path.split('/') + path_components = original_file_path.split("/") try: # Find the position of the experiment in the path exp_index = path_components.index(experiment) @@ -30,20 +31,24 @@ def copy_files_from_csv(csv_file_path, destination_folder, variable, experiment) except (ValueError, IndexError): print(f"Could not extract ensemble from: {original_file_path}") continue - + # Build destination path based on experiment type - if experiment.startswith('ssp'): + if experiment.startswith("ssp"): # Projections: destination_folder/projections//// - dest_dir = os.path.join(destination_folder, variable, 'projections', experiment, ensemble) + dest_dir = os.path.join( + destination_folder, variable, "projections", experiment, ensemble + ) else: # Historical/Past2K: destination_folder//// - dest_dir = os.path.join(destination_folder, variable, experiment, ensemble) + dest_dir = os.path.join( + destination_folder, variable, experiment, ensemble + ) os.makedirs(dest_dir, exist_ok=True) - + # Copy file to destination file_name = os.path.basename(original_file_path) dest_file_path = os.path.join(dest_dir, file_name) - + try: shutil.copy2(original_file_path, dest_file_path) print(f"Copied: {original_file_path} -> {dest_file_path}") @@ -55,23 +60,33 @@ def copy_files_from_csv(csv_file_path, destination_folder, variable, experiment) print(f"Error copying {original_file_path}: {e}") sys.stdout.flush() + def main(): # Set up command line arguments parser = argparse.ArgumentParser( - description='Copy CMIP6 files to structured directories based on CSV lists.' + description="Copy CMIP6 files to structured directories based on CSV lists." 
 
+
 def main():
     # Set up command line arguments
     parser = argparse.ArgumentParser(
-        description='Copy CMIP6 files to structured directories based on CSV lists.'
+        description="Copy CMIP6 and reanalysis files to structured directories based on CSV lists."
+    )
+    parser.add_argument(
+        "-s",
+        "--source",
+        default="./data_acq/",
+        help="Folder containing CSV files (default: ./data_acq/)",
+    )
+    parser.add_argument(
+        "-d",
+        "--dest",
+        default="./data_raw/",
+        help="Destination base folder (default: ./data_raw/)",
+    )
+    parser.add_argument(
+        "-p",
+        "--pattern",
+        default="*.csv",
+        help="Glob pattern to select specific CSV files (default: *.csv)",
     )
-    parser.add_argument('-s', '--source',
-                        default='./data_acq/',
-                        help='Folder containing CSV files (default: ./data_acq/)')
-    parser.add_argument('-d', '--dest',
-                        default='./data_raw/',
-                        help='Destination base folder (default: ./data_raw/)')
-    parser.add_argument('-p', '--pattern',
-                        default='*.csv',
-                        help='Glob pattern to select specific CSV files (default: *.csv)')
-
+
     args = parser.parse_args()
-
+
     # Use the paths from arguments (or defaults if not provided)
     data_acq_folder = args.source
     destination_folder = args.dest
@@ -82,11 +97,11 @@ def main():
         data_acq_folder += os.path.sep
     if not destination_folder.endswith(os.path.sep):
         destination_folder += os.path.sep
-
+
     # Find matching CSV files using pattern
     search_pattern = os.path.join(data_acq_folder, file_pattern)
     csv_files = sorted(glob.glob(search_pattern))
-
+
     print(f"Source folder: {data_acq_folder}")
     print(f"Destination folder: {destination_folder}")
     print(f"Search pattern: {file_pattern}")
@@ -94,7 +109,9 @@ def main():
     sys.stdout.flush()
 
     if not csv_files:
-        print(f"No CSV files found matching pattern: '{file_pattern}' in {data_acq_folder}")
+        print(
+            f"No CSV files found matching pattern: '{file_pattern}' in {data_acq_folder}"
+        )
         sys.stdout.flush()
         return
 
@@ -102,24 +119,34 @@ def main():
     for csv_file_path in csv_files:
         print(f"Processing CSV: {csv_file_path}")
         sys.stdout.flush()
-
+
         # Extract variable and experiment from filename
         filename = os.path.basename(csv_file_path)
-        parts = filename.split('__cmip6_')[-1].split('_[')[0].split('_')
-
-        # Determine experiment and variable
-        if parts[0] == 'past2k':
-            experiment = 'past2k'
-            variable = parts[1]
-        elif parts[0].startswith('ssp'):
+        if "cmip6" in filename:
+            parts = filename.split("__cmip6_")[-1].split("_[")[0].split("_")
+
+            # Determine experiment and variable
+            match parts[0]:
+                case "past2k":
+                    experiment = "past2k"
+                    variable = parts[1]
+                case ssp_exp if ssp_exp.startswith("ssp"):
+                    experiment = ssp_exp
+                    variable = parts[1]
+                case _:
+                    experiment = "historical"
+                    variable = parts[0]
+        elif "reanalysis" in filename:
+            parts = (
+                os.path.splitext(filename)[0]
+                .split("__reanalysis_")[-1]
+                .split("_[")[0]
+                .split("_")
+            )
             experiment = parts[0]
             variable = parts[1]
         else:
-            experiment = 'historical'
-            variable = parts[0]
-
+            print(f"File {csv_file_path} could not be processed.")
+            continue
+
         # Copy files with structured paths
         copy_files_from_csv(csv_file_path, destination_folder, variable, experiment)
 
+
 if __name__ == "__main__":
     main()
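One behavior of the match dispatch above is worth spelling out: a sequence pattern such as case ["ssp", *_] never matches a str (PEP 634 excludes str and bytes from sequence patterns), so ssp experiment names must be caught with a capture pattern plus a startswith guard, as done here. A self-contained illustration, using a hypothetical classify() helper:

    def classify(token: str) -> str:
        """Toy version of the dispatch above; token stands in for parts[0]."""
        match token:
            case ["ssp", *_]:  # never fires for a str: strings are not sequence-pattern targets
                return "sequence pattern"
            case s if s.startswith("ssp"):  # capture pattern with guard: this is what fires
                return "ssp"
            case "past2k":
                return "past2k"
            case _:
                return "historical"

    print(classify("ssp585"))  # ssp
    print(classify("past2k"))  # past2k
    print(classify("tasmax"))  # historical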
"b", "b381971", 'ECROPS', 'ERA_CSVS') -# homevardir = os.path.join(os.sep, "home", "b", "b392996", 'ECROPS', 'ERA_CSVS') -homevardir = "/work/bb1478/b382610/wildfires/data/find_vars_cmip6/data_acq/" - -def freva_search_ssp(project, model, var, freq, experiment): +def freva_search_ssp(project, model, var, freq, experiment, homevardir): """ Get all the ssp files from FREVA for the inputs and write them to a csv, e.g. "mpi-esm1-2-hr__cmip6_ssp585_rsds_day.csv". @@ -44,7 +40,7 @@ def freva_search_ssp(project, model, var, freq, experiment): ssp_files_list = list( ssp_files ) # make the freva generator object ssp_files a list for list functions e.g. len() - ssp_files_array = np.sort(np.array(ssp_files_list)) + ssp_files_array = np.sort(ssp_files_list) ## 2. Get all the unique ensemble ids to be used in matching with all other ssp files all_ensembles = [] @@ -53,16 +49,12 @@ def freva_search_ssp(project, model, var, freq, experiment): all_ensembles.append( res.get("ensemble")[0] ) # get the first (only) value of the dictionary + unique_ensembles = np.unique( - np.array(all_ensembles) + all_ensembles ) # then filter out only the unique ensemble values - logging.info( - str(experiment) - + " for " - + str(var) - + " unique ensemble ids = " - + str(unique_ensembles) - ) + + logging.info(f"{experiment} for {var} unique ensemble ids = {unique_ensembles}") # Get the number of ssp files per unique ensemble id: Function is called only for logging the number of files get_files_from_unique_ensembles( @@ -74,42 +66,22 @@ def freva_search_ssp(project, model, var, freq, experiment): project, model, var, freq, "historical", unique_ensembles ) - np_historical_files_array = np.sort(np.array(historical_files_array)) + np_historical_files_array = np.sort(historical_files_array) ### logging.info(str(var) + " total HISTORICAL num of files = " + str(np_historical_files_array.size)) ## Write everything to csv files - ssp_csv_filename = ( - str(model) - + "__" - + project - + "_" - + str(experiment) - + "_" - + str(var) - + "_" - + str(freq) - + ".csv" - ) - ssp_files_array.tofile(os.path.join(os.sep, homevardir, ssp_csv_filename), sep="\n") + ssp_csv_filename = f"{model}__{project}_{experiment}_{var}_{freq}.csv" + ssp_files_array.tofile(os.path.join(homevardir, ssp_csv_filename), sep="\n") historical_csv_filename = ( - str(model) - + "__" - + project - + "_" - + str(experiment) - + "_" - + str(var) - + "_" - + str(freq) - + "_historical" - + ".csv" + f"{model}__{project}_{experiment}_{var}_{freq}_historical.csv" ) + np_historical_files_array.tofile( - os.path.join(os.sep, homevardir, historical_csv_filename), sep="\n" + os.path.join(homevardir, historical_csv_filename), sep="\n" ) -def freva_search_historical(project, model, var, freq): +def freva_search_historical(project, model, var, freq, homevardir): """ Retreives all the historical files from FREVA and writes them to csv, e.g. 
"mpi-esm1-2-hr__cmip6_rsds_day_allhistorical.csv" @@ -131,7 +103,7 @@ def freva_search_historical(project, model, var, freq): ## iteratable freva generator object ssp_files can either be tranformed to a list or parsed, ## not both, it lives through one iteration it seems historical_files_list = list(historical_files) - historical_files_array = np.sort(np.array(historical_files_list)) + historical_files_array = np.sort(historical_files_list) ### logging.info(str(experiment) + " for " + str(var) + " total num of files = " + str(ssp_files_array.size)) @@ -142,12 +114,11 @@ def freva_search_historical(project, model, var, freq): all_ensembles.append( res.get("ensemble")[0] ) # get the first and only value of the dictionary + unique_ensembles = np.unique( - np.array(all_ensembles) + all_ensembles ) # then filter out only the unique ensemble values - logging.info( - "Historical for " + str(var) + " unique ensemble ids = " + str(unique_ensembles) - ) + logging.info(f"Historical for {var} unique ensemble ids = {unique_ensembles}") # Get the number of historical files per unique ensemble id: Function is calles only for logging the number of files get_files_from_unique_ensembles( @@ -155,23 +126,16 @@ def freva_search_historical(project, model, var, freq): ) ## Write everything to csv files - all_historical_csv = ( - str(model) - + "__" - + project - + "_" - + str(var) - + "_" - + str(freq) - + "_allhistorical" - + ".csv" - ) + all_historical_csv = f"{model}__{project}_{var}_{freq}_allhistorical.csv" + historical_files_array.tofile( - os.path.join(os.sep, homevardir, all_historical_csv), sep="\n" + os.path.join(homevardir, all_historical_csv), sep="\n" ) -def freva_search_reanalysis(project, experiment, var, freq): # , geopoten_value): +def freva_search_reanalysis( + project, experiment, var, freq, homevardir +): # , geopoten_value): """ Retreive from FREVA all reanalysis files such as ERA5 and write the list to csv, e.g. "era5__reanalysis_day_tas.csv" @@ -188,14 +152,7 @@ def freva_search_reanalysis(project, experiment, var, freq): # , geopoten_value ) reanalysis_files_list = list(reanalysis_files) - #### FOR SOME REASON THE BELOW DOES NOT WORK, TO BE DELETED, HAS BEEN SUBSTITUTED IN data_prepr_timerange_targetvar_zg - # ## 2. Get the geopotential height files we need, in case the var has this attribute (not 999999) - # if geopoten_value != 999999: - # for f in reanalysis_files_list: - # if str(geopoten_value) not in f: - # reanalysis_files_list.remove(f) - - reanalysis_files_array = np.sort(np.array(reanalysis_files_list)) + reanalysis_files_array = np.sort(reanalysis_files_list) ## 3. 
-def freva_search_reanalysis(project, experiment, var, freq):  # , geopoten_value):
+def freva_search_reanalysis(
+    project, experiment, var, freq, homevardir
+):  # , geopoten_value):
     """
     Retrieve from FREVA all reanalysis files such as ERA5 and write the list to csv,
     e.g. "era5__reanalysis_day_tas.csv"
@@ -188,14 +152,7 @@ def freva_search_reanalysis(project, experiment, var, freq):  # , geopoten_value
     )
     reanalysis_files_list = list(reanalysis_files)
 
-    #### FOR SOME REASON THE BELOW DOES NOT WORK, TO BE DELETED, HAS BEEN SUBSTITUTED IN data_prepr_timerange_targetvar_zg
-    # ## 2. Get the geopotential height files we need, in case the var has this attribute (not 999999)
-    # if geopoten_value != 999999:
-    #     for f in reanalysis_files_list:
-    #         if str(geopoten_value) not in f:
-    #             reanalysis_files_list.remove(f)
-
-    reanalysis_files_array = np.sort(np.array(reanalysis_files_list))
+    reanalysis_files_array = np.sort(reanalysis_files_list)
 
     ## 3. Get all the unique ensemble ids for each var
     all_ensembles = []
@@ -204,23 +161,19 @@ def freva_search_reanalysis(project, experiment, var, freq):  # , geopoten_value
         all_ensembles.append(
             res.get("ensemble")[0]
         )  # get the first (and only) value of the dictionary
+
     unique_ensembles = np.unique(
-        np.array(all_ensembles)
+        all_ensembles
     )  # then filter out only the unique ensemble values
 
     logging.info(
-        str(experiment)
-        + " reanalysis for "
-        + str(var)
-        + " unique ensemble ids = "
-        + str(unique_ensembles)
+        f"{experiment} reanalysis for {var} unique ensemble ids = {unique_ensembles}"
     )
 
     ## Write everything to csv files
-    all_reanalysis_csv_filename = (
-        str(experiment) + "__" + project + "_" + str(freq) + "_" + str(var) + ".csv"
-    )
+    all_reanalysis_csv_filename = f"{experiment}__{project}_{freq}_{var}.csv"
+
     reanalysis_files_array.tofile(
-        os.path.join(os.sep, homevardir, all_reanalysis_csv_filename), sep="\n"
+        os.path.join(homevardir, all_reanalysis_csv_filename), sep="\n"
     )
 
@@ -249,18 +202,11 @@ def get_files_from_unique_ensembles(
         time_frequency=freq,
         experiment=experiment,
     )
-    n = 0
-    for file in files:
-        n = n + 1
-        files_array.append(file)
+    files_list = list(files)  # materialize the one-shot freva generator once
+    files_array.extend(files_list)
     logging.info(
-        str(experiment)
-        + " "
-        + str(var)
-        + " files for ensemble "
-        + str(unique_ens)
-        + " = "
-        + str(n)
+        f"{experiment} {var} files for ensemble {unique_ens} = {len(files_list)}"
     )
 
     return files_array
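The list materialization in get_files_from_unique_ensembles above matters because, per the comments in this module, freva.databrowser(...) yields a one-shot generator: it supports neither len() nor a second pass, so the file count must come from a list built once. A minimal sketch, with a plain generator standing in for the freva object:

    files = (f"file_{i}.nc" for i in range(3))  # stand-in for freva.databrowser(...)

    files_list = list(files)  # materialize once, reuse everywhere
    print(len(files_list))    # 3
    print(list(files))        # [] -- the generator is already exhausted
    # len(files) on the raw generator would raise TypeError instead of counting.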
diff --git a/src/climate_data_acq/data_acquisition_main.py b/src/climate_data_acq/data_acquisition_main.py
new file mode 100644
index 0000000..e980e49
--- /dev/null
+++ b/src/climate_data_acq/data_acquisition_main.py
@@ -0,0 +1,132 @@
+#################################################################################
+# Title: Main class routine for searching and logging available FREVA datasets
+# module load order: python3, clint, xces, then run script
+# Author: Odysseas Vlachopoulos, Cosmin M. Marina, Eugenio Lorente-Ramos
+# Project: testing
+##################################################################################
+
+import argparse
+import logging
+import sys
+
+import data_acq_freva_search_ECROPS
+
+
+def copy_data(
+    projects,
+    models,
+    variables_cmip,
+    variables_era5_daily_monthly,
+    variables_era5_hourly,
+    frequency,
+    exp_cmip6,
+    exp_reanalysis,
+    homevardir,
+):
+    # First initialize a logger instance
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s [%(levelname)s] %(message)s",
+        force=True,
+        handlers=[
+            logging.FileHandler("LOG_Data_Acquisition_FREVA_output.log"),
+            logging.StreamHandler(sys.stdout),
+        ],
+    )
+    logging.info("Started Freva files main programme \n")
+    freq_longname_map = {"mon": "monthly", "day": "daily", "hour": "hourly"}
+
+    for project in projects:
+        project = project.lower()
+        match project:
+            case "cmip6":
+                for model in models:
+                    model = model.lower()
+                    for freq in frequency:
+                        freq = freq.lower()
+                        for exp in exp_cmip6:
+                            exp = exp.lower()
+                            for var in variables_cmip:
+                                var = var.lower()
+                                logging.info(
+                                    f"\n \nMODEL: {model}, EXPERIMENT: {exp}, VARIABLE: {var}, FREQUENCY: {freq}\n"
+                                )
+
+                                if exp == "historical":
+                                    data_acq_freva_search_ECROPS.freva_search_historical(
+                                        project, model, var, freq, homevardir
+                                    )
+                                    logging.info(
+                                        "\n\n **** Finished with Historical files **** \n\n"
+                                    )
+                                else:
+                                    data_acq_freva_search_ECROPS.freva_search_ssp(
+                                        project, model, var, freq, exp, homevardir
+                                    )
+                                    logging.info(
+                                        "\n\n **** Finished with SSP files **** \n \n"
+                                    )
+
+            case "reanalysis":
+                for freq in frequency:
+                    for exp_reanalysis_i in exp_reanalysis:
+                        freq = freq.lower()
+                        exp_reanalysis_i = exp_reanalysis_i.lower()
+
+                        var_set = None
+                        match freq:
+                            case "mon" | "day":
+                                var_set = variables_era5_daily_monthly
+                            case "hour":
+                                var_set = variables_era5_hourly
+                            case _:
+                                raise ValueError(
+                                    "Incorrect frequency, try 'mon', 'day' or 'hour'."
+                                )
+
+                        freq_longname = freq_longname_map[freq]
+
+                        for var in var_set:
+                            var = var.lower()
+                            logging.info(
+                                f"\n \nPROJECT: {project}, EXPERIMENT: {exp_reanalysis_i}, VARIABLE: {var}, FREQUENCY: {freq}\n"
+                            )
+                            data_acq_freva_search_ECROPS.freva_search_reanalysis(
+                                project, exp_reanalysis_i, var, freq, homevardir
+                            )
+                            logging.info(
+                                f"\n\n **** Finished with ERA5 {freq_longname} data files **** \n \n"
+                            )
+
+            case _:
+                raise ValueError(
+                    f"Project {project} not recognized, try 'cmip6' or 'reanalysis'"
+                )
+
+
+def main():
+    parser = argparse.ArgumentParser(prog="DKRZ Data path downloader.")
+    parser.add_argument("-p", "--projects", default="reanalysis")
+    parser.add_argument("-m", "--models", default="")
+    parser.add_argument("--cmip6_vars", default="")
+    parser.add_argument("--era5_vars_month", default="")
+    parser.add_argument("--era5_vars_hour", default="")
+    parser.add_argument("--exp_cmip", default="")
+    parser.add_argument("--exp_reanalysis", default="era5")
+    parser.add_argument("-f", "--frequency", default="")
+    parser.add_argument("-d", "--dir", default="./data_acq")
+    args = parser.parse_args()
+
+    copy_data(
+        args.projects.split(","),
+        args.models.split(","),
+        args.cmip6_vars.split(","),
+        args.era5_vars_month.split(","),
+        args.era5_vars_hour.split(","),
+        args.frequency.split(","),
+        args.exp_cmip.split(","),
+        args.exp_reanalysis.split(","),
+        args.dir,
+    )
+
+
+if __name__ == "__main__":
+    main()
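For orientation: this CLI replaces the constants hard-coded in the deleted 0_data_acq_main_ECROPS.py that follows. A sketch of how the old default CMIP6 run maps onto the new interface, assuming the flag names from main() above and the values from the deleted script (illustrative only; requires data_acq_freva_search_ECROPS on the path, i.e. a Levante session with the modules loaded):

    # Equivalent command line:
    #   python data_acquisition_main.py -p cmip6 -m mpi-esm1-2-lr \
    #       --cmip6_vars "tdps,ua,va,tasmax,lai" -f day \
    #       --exp_cmip "historical,past2k" --dir ./data_acq
    # Or programmatically:
    copy_data(
        projects=["cmip6"],
        models=["mpi-esm1-2-lr"],
        variables_cmip=["tdps", "ua", "va", "tasmax", "lai"],
        variables_era5_daily_monthly=[],
        variables_era5_hourly=[],
        frequency=["day"],
        exp_cmip6=["historical", "past2k"],
        exp_reanalysis=["era5"],
        homevardir="./data_acq",
    )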
diff --git a/src/cmip6_data_acq/0_data_acq_main_ECROPS.py b/src/cmip6_data_acq/0_data_acq_main_ECROPS.py
deleted file mode 100755
index f191cc1..0000000
--- a/src/cmip6_data_acq/0_data_acq_main_ECROPS.py
+++ /dev/null
@@ -1,148 +0,0 @@
-#################################################################################
-# Title: Main class routine for searching and logging available FREVA datasets
-# module load order: python3, clint, xces, then run script
-# Author: Odysseas Vlachopoulos
-# Project: testing
-##################################################################################
-
-import logging
-import sys
-
-# from FREVA import freva_search
-import data_acq_freva_search_ECROPS
-import os
-
-# projects = ['cmip6', 'reanalysis']
-projects = ["cmip6"]
-# models = ['cesm2',
-#           'cnrm-cm6-1-HR',
-#           'gfdl-esm4',
-#           'ec-earth3',
-#           'mpi-esm1-2-hr',
-#           'noresm2-mm',
-#           'hadgem3-gc31-mm']
-models = ["mpi-esm1-2-lr"]
-
-# models = [] # DO NOT DO ANYTHING FOR CMIP6
-variables_cmip = ["tdps", "ua", "va", "tasmax", "lai"]
-
-# variables_era5_daily_monthly = ['tasmax', 'tasmin', 'tas', 'pr', 'rsds', 'tdps', 'sfcwind', 'hurs']
-variables_era5_daily_monthly = ["tdps", "ua", "va", "tasmax", "lai"]
-# variables_era5_hourly = ['uas', 'vas']
-variables_era5_hourly: list[str] = []
-
-# variables_era5_hourly = ['uas', 'vas', 'rsds', 'tdps']
-# 10m wind speed vas and uas are calculated with ECROPS function in wofost_util/util.py wind10to2(wind10) function
-
-
-geopotential_height = 50000  # 500hPa
-vorticity_height = 20000  # 200hPa
-
-# frequency = ['hour', 'day', 'mon']
-frequency = ["day"]
-# frequency = ['mon']
-# exp_cmip6 = ['ssp370', 'ssp585', 'historical']
-exp_cmip6 = ["historical", "past2k"]
-exp_reanalysis = "era5"
-
-
-def main():
-    # First initialize a logger instance
-    logging.basicConfig(
-        level=logging.INFO,
-        format="%(asctime)s [%(levelname)s] %(message)s",
-        force=True,
-        handlers=[
-            logging.FileHandler("LOG_Data_Acquisition_FREVA_output.log"),
-            logging.StreamHandler(sys.stdout),
-        ],
-    )
-    logging.info("Started Freva files main programme \n")
-
-    for project in projects:
-        if project == "cmip6":
-            for i in range(len(models)):
-                for exp in exp_cmip6:
-                    for var in variables_cmip:
-                        logging.info(
-                            "\n \n"
-                            + "MODEL: "
-                            + str(models[i])
-                            + ", EXPERIMENT: "
-                            + str(exp)
-                            + ", VARIABLE: "
-                            + str(var)
-                            + ", FREQUENCY: "
-                            + str(frequency)
-                            + "\n"
-                        )
-                        if not exp == "historical":
-                            data_acq_freva_search_ECROPS.freva_search_ssp(
-                                project, models[i], var, frequency, exp
-                            )
-                            logging.info(
-                                "\n\n **** Finished with SSP files **** \n \n"
-                            )
-                        if exp == "historical":
-                            data_acq_freva_search_ECROPS.freva_search_historical(
-                                project, models[i], var, frequency
-                            )
-                            logging.info(
-                                "\n\n **** Finished with Historical files **** \n\n"
-                            )
-
-        if project == "reanalysis":
-            for var in variables_era5_daily_monthly:
-                logging.info(
-                    "\n \n"
-                    + "PROJECT: "
-                    + str(project)
-                    + ", EXPERIMENT: "
-                    + str(exp_reanalysis)
-                    + ", VARIABLE: "
-                    + str(var)
-                    + ", FREQUENCY: "
-                    + str(frequency[2])
-                    + "\n"
-                )
-                data_acq_freva_search_ECROPS.freva_search_reanalysis(
-                    project, exp_reanalysis, var, frequency[2]
-                )
-
-            for var in variables_era5_daily_monthly:
-                logging.info(
-                    "\n \n"
-                    + "PROJECT: "
-                    + str(project)
-                    + ", EXPERIMENT: "
-                    + str(exp_reanalysis)
-                    + ", VARIABLE: "
-                    + str(var)
-                    + ", FREQUENCY: "
-                    + str(frequency[1])
-                    + "\n"
-                )
-                data_acq_freva_search_ECROPS.freva_search_reanalysis(
-                    project, exp_reanalysis, var, frequency[1]
-                )
-
-            for var in variables_era5_hourly:
-                logging.info(
-                    "\n \n"
-                    + "PROJECT: "
-                    + str(project)
-                    + ", EXPERIMENT: "
-                    + str(exp_reanalysis)
-                    + ", VARIABLE: "
-                    + str(var)
-                    + ", FREQUENCY: "
-                    + str(frequency[0])
-                    + "\n"
-                )
-                data_acq_freva_search_ECROPS.freva_search_reanalysis(
-                    project, exp_reanalysis, var, frequency[0]
-                )
-
-
-if __name__ == "__main__":
-    main()