Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ data/pr_hourly_era5land_2019.nc
data/t2m_daily_avg_era5land_2019.nc
src/dense_204_a.csv
src/final_204_a.csv

##############################################################################
# ---- core dumps -------------------------------------------------------------
*[0-9]+
80 changes: 76 additions & 4 deletions src/I_O/forcing_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,43 @@ std::unique_ptr<float[]> NetCDFLoader::loadTimeChunk(size_t startTime, size_t nu

return data;
}

// Load data by any chunk: time/lat/lon into memory
std::unique_ptr<float[]> NetCDFLoader::loadChunk(size_t startTime, size_t numTime,
size_t startLat, size_t numLat,
size_t startLon, size_t numLon) {
// Check if chunk is out of bounds
if (numTime == 0 || numLat == 0 || numLon == 0) {
throw std::invalid_argument("Size of chunk dimensions must be greater than zero");
}
if (startTime >= timeSize || startLat >= latSize || startLon >= lonSize) {
throw std::out_of_range("Start indices out of range");
}
if (startTime + numTime > timeSize || startLat + numLat > latSize || startLon + numLon > lonSize) {
throw std::out_of_range("Requested chunk exceeds available data");
}

// Calculate total elements in the chunk
size_t totalElements = numTime * numLat * numLon;

// Allocate memory for the chunk
std::unique_ptr<float[]> data = std::make_unique<float[]>(totalElements);

// Define start and count arrays for subsetting
size_t start[3] = {startTime, startLat, startLon};
size_t count[3] = {numTime, numLat, numLon};

// Read data from NetCDF file using C API
int status = nc_get_vara_float(ncid, varid, start, count, data.get());
checkError(status, "Reading variable data");

std::cout << "Loaded chunk for " << varName << ": time steps " << startTime << " to "
<< (startTime + numTime - 1) << ", lat " << startLat << " to "
<< (startLat + numLat - 1) << ", lon " << startLon << " to "
<< (startLon + numLon - 1) << std::endl;

return data;
}

// Get a single value from pre-loaded chunk data
float NetCDFLoader::getValueFromChunk(const std::unique_ptr<float[]>& chunkData,
Expand All @@ -211,8 +248,43 @@ float NetCDFLoader::getValueFromChunk(const std::unique_ptr<float[]>& chunkData,
size_t index = relativeTimeIndex * (latSize * lonSize) + latIndex * lonSize + lonIndex;
return chunkData[index];
}

// Check if data is loaded correctly
bool NetCDFLoader::isDataLoaded() const {
return ncid >= 0 && timeSize > 0 && latSize > 0 && lonSize > 0;


// Calculate spatial chunks based on latitude and longitude sizes
std::vector<SpatialChunk> NetCDFLoader::calculateSpatialChunks(size_t latSize, size_t lonSize, int numChunks) {
if (numChunks <= 0) {
throw std::invalid_argument("Number of chunks must be positive");
}
if (latSize == 0 || lonSize == 0) {
throw std::invalid_argument("Spatial dimensions must be positive");
}

std::vector<SpatialChunk> chunks(numChunks);

// Divide by the smaller dimension for better load balancing
if (latSize <= lonSize) {
// Divide latitude dimension
size_t latChunkSize = latSize / numChunks;
size_t remainder = latSize % numChunks;

for (int r = 0; r < numChunks; ++r) {
chunks[r].startLat = r * latChunkSize + std::min(static_cast<size_t>(r), remainder);
chunks[r].numLat = latChunkSize + (r < remainder ? 1 : 0);
chunks[r].startLon = 0;
chunks[r].numLon = lonSize; // Full longitude range
}
} else {
// Divide longitude dimension
size_t lonChunkSize = lonSize / numChunks;
size_t remainder = lonSize % numChunks;

for (int r = 0; r < numChunks; ++r) {
chunks[r].startLat = 0;
chunks[r].numLat = latSize; // Full latitude range
chunks[r].startLon = r * lonChunkSize + std::min(static_cast<size_t>(r), remainder);
chunks[r].numLon = lonChunkSize + (r < remainder ? 1 : 0);
}
}

return chunks;
}
24 changes: 20 additions & 4 deletions src/I_O/forcing_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@
#include <utility> // for std::pair
#include <vector>

// Structure to hold spatial chunk information
// Current logic is simple: divide the lat/lon grid into rectangular chunks based on # of processes (ranks)
struct SpatialChunk {
size_t startLat, numLat; // Start index and number of latitudes in the chunk
size_t startLon, numLon;

// Constructor for convenience
SpatialChunk(size_t sLat = 0, size_t nLat = 0, size_t sLon = 0, size_t nLon = 0)
: startLat(sLat), numLat(nLat), startLon(sLon), numLon(nLon) {}
};


class LookupMapper {
public:
Expand Down Expand Up @@ -59,9 +70,11 @@ class NetCDFLoader {

// Load data by time chunk into memory
std::unique_ptr<float[]> loadTimeChunk(size_t startTime, size_t numTimeSteps);

// Check if data is loaded correctly
bool isDataLoaded() const;

// Load data by any chunk: time/lat/lon into memory
std::unique_ptr<float[]> loadChunk(size_t startTime, size_t numTime, // start index of time dimension, number of time steps in loading chunk
size_t startLat, size_t numLat,
size_t startLon, size_t numLon);

// Getters
size_t getTimeSize() const { return timeSize; }
Expand All @@ -83,7 +96,10 @@ class NetCDFLoader {

// Return NetCDF file and variable IDs for advanced operations: maybe needed in the future
int getFileId() const { return ncid; }
int getVariableId() const { return varid; }
int getVariableId() const { return varid; }

// Calculate chunks for this NetCDF file
std::vector<SpatialChunk> calculateSpatialChunks(size_t latSize, size_t lonSize, int numChunks);
};


116 changes: 110 additions & 6 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,58 @@ int main(int argc, char** argv) {
int baseChunk = totalRows / totalRanks;
int remainder = totalRows % totalRanks;

/* for splitting links here, considering the whole forcing domain is split into #GPU rectangles,
we need to add logic that make sure the links in each rank has corresponding forcing data */

int start = 0;

std::cout << "Total rows: " << totalRows << std::endl;
std::cout << "Total ranks: " << totalRanks << std::endl;


/* LOGIC to pass information of spatial chunking
current logic is very simple: divide the lat/lon grid into rectangular chunks based on # of processes (ranks)
divide the smaller dimension

This code is redundant, but kept for clarity and future reference
will combine the loading for pr and t2m into one function

totalRanks here is a bit confusing. it's not the total number of ranks, but the number of ranks that will be used other than rank 0.
*/
// initialize netcdf loader for pr
NetCDFLoader prLoader("../data/pr_hourly_era5land_2019.nc", "pr");
size_t lat_size = prLoader.getLatSize();
size_t lon_size = prLoader.getLonSize();
size_t time_size = prLoader.getTimeSize();
std::cout << "Dimensions for pr: lat=" << lat_size << ", lon=" << lon_size << ", time=" << time_size << std::endl;
std::cout << totalRanks << " ranks will be used for spatial chunking." << std::endl;
// calculate spatial chunks
std::vector<SpatialChunk> spatial_chunks_pr = prLoader.calculateSpatialChunks(lat_size, lon_size, totalRanks);
std::cout << "Spatial chunks for pr:" << std::endl;
for (int i = 0; i < spatial_chunks_pr.size(); ++i) {
std::cout << "Chunk " << i << ": lat[" << spatial_chunks_pr[i].startLat
<< ":" << spatial_chunks_pr[i].startLat + spatial_chunks_pr[i].numLat - 1 << "], "
<< "lon[" << spatial_chunks_pr[i].startLon
<< ":" << spatial_chunks_pr[i].startLon + spatial_chunks_pr[i].numLon - 1 << "], "
<< "size: " << spatial_chunks_pr[i].numLat << "x" << spatial_chunks_pr[i].numLon << std::endl;
}

// similarly, initialize netcdf loader for t2m & calculate spatial chunks
NetCDFLoader t2mLoader("../data/t2m_daily_avg_era5land_2019.nc", "t2m");
size_t lat_size_t2m = t2mLoader.getLatSize();
size_t lon_size_t2m = t2mLoader.getLonSize();
size_t time_size_t2m = t2mLoader.getTimeSize();
std::cout << "Dimensions for t2m: lat=" << lat_size_t2m << ", lon=" << lon_size_t2m << ", time=" << time_size_t2m << std::endl;
std::vector<SpatialChunk> spatial_chunks_t2m = t2mLoader.calculateSpatialChunks(lat_size_t2m, lon_size_t2m, totalRanks);
std::cout << "Spatial chunks for t2m:" << std::endl;
for (int i = 0; i < spatial_chunks_t2m.size(); ++i) {
std::cout << "Chunk " << i << ": lat[" << spatial_chunks_t2m[i].startLat
<< ":" << spatial_chunks_t2m[i].startLat + spatial_chunks_t2m[i].numLat - 1 << "], "
<< "lon[" << spatial_chunks_t2m[i].startLon
<< ":" << spatial_chunks_t2m[i].startLon + spatial_chunks_t2m[i].numLon - 1 << "], "
<< "size: " << spatial_chunks_t2m[i].numLat << "x" << spatial_chunks_t2m[i].numLon << std::endl;
}

for (int r = 1; r <= totalRanks; ++r) {
// Calculate actual chunk size for this rank
int chunkSize = baseChunk + (r <= remainder ? 1 : 0);
Expand All @@ -304,6 +351,26 @@ int main(int argc, char** argv) {

// Move to next chunk
start = end;


/* Send spatial chunk information to each worker */
// Send spatial chunk for pr - spatial boundaries
SpatialChunk chunk_pr = spatial_chunks_pr[r - 1]; // r-1 because rank 0 is master
std::cout << "Sending spatial chunk of pr to rank " << r << std::endl;
MPI_Send(&chunk_pr.startLat, 1, MPI_UNSIGNED_LONG, r, 0, MPI_COMM_WORLD);
MPI_Send(&chunk_pr.numLat, 1, MPI_UNSIGNED_LONG, r, 1, MPI_COMM_WORLD);
MPI_Send(&chunk_pr.startLon, 1, MPI_UNSIGNED_LONG, r, 2, MPI_COMM_WORLD);
MPI_Send(&chunk_pr.numLon, 1, MPI_UNSIGNED_LONG, r, 3, MPI_COMM_WORLD);

// Send spatial chunk for t2m - spatial boundaries
SpatialChunk chunk_t2m = spatial_chunks_t2m[r - 1]; // r-1 because rank 0 is master
std::cout << "Sending spatial chunk of t2m to rank " << r << std::endl;
MPI_Send(&chunk_t2m.startLat, 1, MPI_UNSIGNED_LONG, r, 4, MPI_COMM_WORLD);
MPI_Send(&chunk_t2m.numLat, 1, MPI_UNSIGNED_LONG, r, 5, MPI_COMM_WORLD);
MPI_Send(&chunk_t2m.startLon, 1, MPI_UNSIGNED_LONG, r, 6, MPI_COMM_WORLD);
MPI_Send(&chunk_t2m.numLon, 1, MPI_UNSIGNED_LONG, r, 7, MPI_COMM_WORLD);

std::cout << "Rank 0 finished sending chunks" << std::endl;
}

}
Expand Down Expand Up @@ -504,14 +571,43 @@ int main(int argc, char** argv) {
streamPoint[s] = lat * lon_size + lon;
}

// Before adding forcings, we receieve the spatial chunk information
size_t start_lat_pr, num_lat_pr, start_lon_pr, num_lon_pr;
size_t start_lat_t2m, num_lat_t2m, start_lon_t2m, num_lon_t2m;
// Receive spatial chunk for pr
MPI_Recv(&start_lat_pr, 1, MPI_UNSIGNED_LONG, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&num_lat_pr, 1, MPI_UNSIGNED_LONG, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&start_lon_pr, 1, MPI_UNSIGNED_LONG, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&num_lon_pr, 1, MPI_UNSIGNED_LONG, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Receive spatial chunk for t2m
MPI_Recv(&start_lat_t2m, 1, MPI_UNSIGNED_LONG, 0, 4, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&num_lat_t2m, 1, MPI_UNSIGNED_LONG, 0, 5, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&start_lon_t2m, 1, MPI_UNSIGNED_LONG, 0, 6, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&num_lon_t2m, 1, MPI_UNSIGNED_LONG, 0, 7, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Print the received spatial chunk information
std::cout << "Received spatial chunk for pr: "
<< "start_lat=" << start_lat_pr
<< ", num_lat=" << num_lat_pr
<< ", start_lon=" << start_lon_pr
<< ", num_lon=" << num_lon_pr << std::endl;
std::cout << "Received spatial chunk for t2m: "
<< "start_lat=" << start_lat_t2m
<< ", num_lat=" << num_lat_t2m
<< ", start_lon=" << start_lon_t2m
<< ", num_lon=" << num_lon_t2m << std::endl;
// ────────── End receiving spatial chunks ──────────

// ─── Adding forcings (first 2 days only) ─────────────────────────────────
struct NCForcing {
std::string path, var;
double dt; // hours per time step

// add spatial chunking info
size_t start_lat, num_lat, start_lon, num_lon;
};
std::vector<NCForcing> ncForcings = {
{"../data/pr_hourly_era5land_2019.nc", "pr", 1.0},
{"../data/t2m_daily_avg_era5land_2019.nc","t2m", 24.0}
{"../data/pr_hourly_era5land_2019.nc", "pr", 1.0, start_lat_pr, num_lat_pr, start_lon_pr, num_lon_pr},
{"../data/t2m_daily_avg_era5land_2019.nc","t2m", 24.0, start_lat_t2m, num_lat_t2m, start_lon_t2m, num_lon_t2m}
};
int nForc = int(ncForcings.size());

Expand All @@ -532,12 +628,20 @@ int main(int argc, char** argv) {
size_t steps2d = size_t(std::round(daysToLoad * 24.0 / fm.dt));
steps2d = std::min(fullTime, steps2d);

auto raw = loader.loadTimeChunk(0, steps2d);
// auto raw = loader.loadTimeChunk(0, steps2d);

// instead, load the chunk for the spatial rectangle defined by start_lat, num_lat, start_lon, num_lon
auto raw = loader.loadChunk(
0, steps2d,
fm.start_lat, fm.num_lat,
fm.start_lon, fm.num_lon
);

h_forc_dt.push_back(fm.dt);
h_forc_nT.push_back(steps2d);

size_t gridPts = loader.getLatSize() * loader.getLonSize();
// size_t gridPts = loader.getLatSize() * loader.getLonSize();
size_t gridPts = fm.num_lat * fm.num_lon; // HERE should be actual spatial chunk size instead of full domain size
float *basePtr = raw.get();

for (size_t t = 0; t < steps2d; ++t) {
Expand Down Expand Up @@ -793,8 +897,8 @@ int main(int argc, char** argv) {
for (int v = 0; v < N_EQ; ++v) state_vals[v] = v;

//Netcdf file attributes (will be defined in yaml)
std::string dense_filename = "/scratch/gpfs/am2192/dense_example_rank_"+ std::to_string(rank)+".nc";
std::string final_filename = "/scratch/gpfs/am2192/final_example_rank_"+ std::to_string(rank)+".nc";
std::string dense_filename = "/scratch/gpfs/dl0307/dense_example_rank_"+ std::to_string(rank)+".nc";
std::string final_filename = "/scratch/gpfs/dl0307/final_example_rank_"+ std::to_string(rank)+".nc";
int compression_level = 0;

// Write only the final time step (2D output)
Expand Down