Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using already created communicator ... and more #655

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions utils/obsproc/NetCDFToIodaConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ namespace gdasapp {
int niMetadata; // number of int metadata fields

// Non optional metadata
Eigen::ArrayXf longitude; //
Eigen::ArrayXf latitude; // " error
Eigen::ArrayXf longitude; // geo-location
Eigen::ArrayXf latitude; // "
Eigen::Array<int64_t, Eigen::Dynamic, 1> datetime; // Epoch date in seconds
std::string referenceDate; // Reference date for epoch time
std::string referenceDate; // Reference date for epoch time

// Obs info
Eigen::ArrayXf obsVal; // Observation value
Expand Down Expand Up @@ -62,7 +62,8 @@ namespace gdasapp {
class NetCDFToIodaConverter {
public:
// Constructor: Stores the configuration as a data members
explicit NetCDFToIodaConverter(const eckit::Configuration & fullConfig) {
explicit NetCDFToIodaConverter(const eckit::Configuration & fullConfig,
const eckit::mpi::Comm & comm): comm_(comm) {
// time window info
std::string winbegin;
std::string winend;
Expand All @@ -85,37 +86,34 @@ namespace gdasapp {

// Method to write out a IODA file (writter called in destructor)
void writeToIoda() {
// Get communicator
const eckit::mpi::Comm & comm = oops::mpi::world();

// Extract ioda variables from the provider's files
int myrank = comm.rank();
int myrank = comm_.rank();
int nobs(0);

// Currently need 1 PE per file, abort if not the case
ASSERT(comm.size() == inputFilenames_.size());
ASSERT(comm_.size() == inputFilenames_.size());

// Read the provider's netcdf file
gdasapp::IodaVars iodaVars = providerToIodaVars(inputFilenames_[myrank]);
nobs = iodaVars.location;

// Get the total number of obs across pe's
comm.allReduce(nobs, nobs, eckit::mpi::sum());
gdasapp::IodaVars iodaVarsAll(nobs, iodaVars.floatMetadataName, iodaVars.intMetadataName);
int nobsAll(0);
comm_.allReduce(nobs, nobsAll, eckit::mpi::sum());
gdasapp::IodaVars iodaVarsAll(nobsAll, iodaVars.floatMetadataName, iodaVars.intMetadataName);

// Gather iodaVars arrays
gatherObs(comm, iodaVars.longitude, iodaVarsAll.longitude);
gatherObs(comm, iodaVars.latitude, iodaVarsAll.latitude);
gatherObs(comm, iodaVars.datetime, iodaVarsAll.datetime);
gatherObs(comm, iodaVars.obsVal, iodaVarsAll.obsVal);
gatherObs(comm, iodaVars.obsError, iodaVarsAll.obsError);
gatherObs(comm, iodaVars.preQc, iodaVarsAll.preQc);
gatherObs(iodaVars.longitude, iodaVarsAll.longitude);
gatherObs(iodaVars.latitude, iodaVarsAll.latitude);
gatherObs(iodaVars.datetime, iodaVarsAll.datetime);
gatherObs(iodaVars.obsVal, iodaVarsAll.obsVal);
gatherObs(iodaVars.obsError, iodaVarsAll.obsError);
gatherObs(iodaVars.preQc, iodaVarsAll.preQc);

// Create empty group backed by HDF file
if (oops::mpi::world().rank() == 0) {
ioda::Group group =
ioda::Engines::HH::createFile(
outputFilename_,
ioda::Engines::HH::createFile(outputFilename_,
ioda::Engines::BackendCreateModes::Truncate_If_Exists);

// Update the group with the location dimension
Expand Down Expand Up @@ -155,7 +153,7 @@ namespace gdasapp {
ioda::Variable tmpIntMeta;
int count = 0;
for (const std::string& strMeta : iodaVars.intMetadataName) {
std::cout << strMeta << std::endl;
oops::Log::info() << strMeta << std::endl;
tmpIntMeta = ogrp.vars.createWithScales<float>("MetaData/"+strMeta,
{ogrp.vars["Location"]}, int_params);
tmpIntMeta.writeWithEigenRegular(iodaVars.intMetadata.col(count));
Expand All @@ -166,7 +164,7 @@ namespace gdasapp {
ioda::Variable tmpFloatMeta;
count = 0;
for (const std::string& strMeta : iodaVars.floatMetadataName) {
std::cout << strMeta << std::endl;
oops::Log::info() << strMeta << std::endl;
tmpFloatMeta = ogrp.vars.createWithScales<float>("MetaData/"+strMeta,
{ogrp.vars["Location"]}, int_params);
tmpFloatMeta.writeWithEigenRegular(iodaVars.floatMetadata.col(count));
Expand All @@ -190,13 +188,40 @@ namespace gdasapp {

// Gather for eigen array
template <typename T>
void gatherObs(const eckit::mpi::Comm & comm,
const Eigen::Array<T, Eigen::Dynamic, 1> & obsPe,
void gatherObs(const Eigen::Array<T, Eigen::Dynamic, 1> & obsPe,
Eigen::Array<T, Eigen::Dynamic, 1> & obsAllPes) {
std::vector<T> tmpVec(obsPe.data(), obsPe.data() + obsPe.size());
oops::mpi::allGatherv(comm, tmpVec);
for (int i = 0; i < tmpVec.size(); ++i) {
obsAllPes(i) = tmpVec[i];
// define root pe
const size_t root = 0;

// send pointer to the PE's data
std::vector<T> send(obsPe.data(), obsPe.data() + obsPe.size());
size_t ntasks = comm_.size();

// gather the sizes of the send buffers
int localnobs = send.size();
std::vector<int> sizes(ntasks);
comm_.allGather(localnobs, sizes.begin(), sizes.end());

// displacement for the received data
std::vector<int> displs(ntasks);
size_t rcvsz = sizes[0];
displs[0] = 0;
for (size_t jj = 1; jj < ntasks; ++jj) {
displs[jj] = displs[jj - 1] + sizes[jj - 1];
rcvsz += sizes[jj];
}

// create receiving buffer
std::vector<T> recv(0);

// gather all send buffers
if (comm_.rank() == root) recv.resize(rcvsz);
comm_.barrier();
comm_.gatherv(send, recv, sizes, displs, root);

if (comm_.rank() == root) {
obsAllPes.segment(0, recv.size()) =
Eigen::Map<Eigen::Array<T, Eigen::Dynamic, 1>>(recv.data(), recv.size());
}
}

Expand All @@ -217,5 +242,6 @@ namespace gdasapp {
std::vector<std::string> inputFilenames_;
std::string outputFilename_;
std::string variable_;
const eckit::mpi::Comm & comm_;
};
} // namespace gdasapp
4 changes: 2 additions & 2 deletions utils/obsproc/Rads2Ioda.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace gdasapp {

class Rads2Ioda : public NetCDFToIodaConverter {
public:
explicit Rads2Ioda(const eckit::Configuration & fullConfig)
: NetCDFToIodaConverter(fullConfig) {
explicit Rads2Ioda(const eckit::Configuration & fullConfig, const eckit::mpi::Comm & comm)
: NetCDFToIodaConverter(fullConfig, comm) {
variable_ = "absoluteDynamicTopography";
}

Expand Down
4 changes: 2 additions & 2 deletions utils/obsproc/Smos2Ioda.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ namespace gdasapp {

class Smos2Ioda : public NetCDFToIodaConverter {
public:
explicit Smos2Ioda(const eckit::Configuration & fullConfig)
: NetCDFToIodaConverter(fullConfig) {
explicit Smos2Ioda(const eckit::Configuration & fullConfig, const eckit::mpi::Comm & comm)
: NetCDFToIodaConverter(fullConfig, comm) {
variable_ = "Salinity";
}

Expand Down
4 changes: 2 additions & 2 deletions utils/obsproc/applications/gdas_obsprovider2ioda.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ namespace gdasapp {
fullConfig.get("provider", provider);

if (provider == "RADS") {
Rads2Ioda conv2ioda(fullConfig);
Rads2Ioda conv2ioda(fullConfig, this->getComm());
conv2ioda.writeToIoda();
} else if (provider == "GHRSST") {
oops::Log::info() << "Comming soon!" << std::endl;
} else if (provider == "SMOS") {
Smos2Ioda conv2ioda(fullConfig);
Smos2Ioda conv2ioda(fullConfig, this->getComm());
conv2ioda.writeToIoda();
} else {
oops::Log::info() << "Provider not implemented" << std::endl;
Expand Down
Loading