PFASST++
mpi_communicator_impl.hpp

#include "pfasst/logging.hpp"


namespace pfasst
{
  namespace mpi
  {
    // the "mpi error: " prefix is added here so that what() can hand out a
    // pointer into the stored exception message rather than into a temporary
    MPIError::MPIError(const string& msg)
      : runtime_error(string("mpi error: ") + msg)
    {}

    const char* MPIError::what() const throw()
    {
      // note: building the prefixed string here and returning its c_str()
      // would return a dangling pointer
      return runtime_error::what();
    }

    MPIError MPIError::from_code(const int err_code)
    {
      char err_str[MPI_MAX_ERROR_STRING];
      int err_len = 0;
      int err = MPI_Error_string(err_code, err_str, &err_len);
      check_mpi_error(err);
      return MPIError("MPI Error: " + string(err_str, err_len) + " (code=" + to_string(err_code) + ")");
    }


    MPICommunicator::MPICommunicator()
    {}

    MPICommunicator::MPICommunicator(MPI_Comm comm)
    {
      set_comm(comm);
    }

    void MPICommunicator::set_comm(MPI_Comm comm)
    {
      this->comm = comm;
      MPI_Comm_size(this->comm, &(this->_size));
      MPI_Comm_rank(this->comm, &(this->_rank));

      // use the communicator's name if it has one, otherwise fall back to "world"
      int len = 0;
      char buff[MPI_MAX_OBJECT_NAME];
      int err = MPI_Comm_get_name(this->comm, buff, &len);
      check_mpi_error(err);
      if (len == 0) {
        this->_name = string("world");
      } else {
        this->_name = string(buff, len);
      }

      // attach a fresh status object to this communicator
      shared_ptr<MPIStatus> status = make_shared<MPIStatus>();
      this->status = status;
      this->status->set_comm(this);
    }

    int MPICommunicator::size()
    {
      return this->_size;
    }

    int MPICommunicator::rank()
    {
      return this->_rank;
    }

    string MPICommunicator::name()
    {
      return this->_name;
    }


    void MPIStatus::set_comm(ICommunicator* comm)
    {
      this->comm = comm;
      this->converged.resize(comm->size());

      // the MPI-specific calls below need the concrete communicator type
      this->mpi = dynamic_cast<MPICommunicator*>(comm);
      assert(this->mpi);
    }

    void MPIStatus::clear()
    {
      std::fill(converged.begin(), converged.end(), false);
    }

    void MPIStatus::set_converged(bool converged)
    {
      ML_CLOG(DEBUG, "Controller", "set converged for rank " << this->comm->rank() << " to "
                                   << "'" << boolalpha << converged << "'");
      this->converged.at(this->comm->rank()) = converged;
      assert(this->converged.at(this->comm->rank()) == converged);
    }

    bool MPIStatus::get_converged(int rank)
    {
      return this->converged.at(rank);
    }

    void MPIStatus::post(int tag)
    {
      // noop: send/recv for status info is blocking
      UNUSED(tag);
    }

    void MPIStatus::send(int tag)
    {
      // don't send forward if: single processor run, or we're the last processor
      if (mpi->size() == 1) { return; }
      if (mpi->rank() == mpi->size() - 1) { return; }

      int iconverged = converged.at(mpi->rank()) ? IStatus::CONVERGED : IStatus::NOT_CONVERGED;
      int dest_rank = (mpi->rank() + 1) % mpi->size();

      ML_CLOG(DEBUG, "Controller", "sending converged status to rank " << dest_rank
                                   << " with tag " << tag << ": "
                                   << boolalpha << (iconverged == IStatus::CONVERGED));
      int err = MPI_Send(&iconverged, 1, MPI_INT, dest_rank, tag, mpi->comm);
      check_mpi_error(err);
      ML_CLOG(DEBUG, "Controller", "sent converged status");
    }

    void MPIStatus::recv(int tag)
    {
      // don't recv if: single processor run, or we're the first processor
      if (mpi->size() == 1) { return; }
      if (mpi->rank() == 0) { return; }

      if (get_converged(mpi->rank() - 1)) {
        ML_CLOG(DEBUG, "Controller", "skipping status receive as previous is stored as converged");
        return;
      }

      MPI_Status stat = MPI_Status_factory();
      int iconverged = IStatus::NOT_CONVERGED;
      int src_rank = (mpi->rank() - 1) % mpi->size();
      ML_CLOG(DEBUG, "Controller", "receiving converged status from rank " << src_rank
                                   << " with tag " << tag);
      int err = MPI_Recv(&iconverged, 1, MPI_INT, src_rank, tag, mpi->comm, &stat);
      check_mpi_error(err);
      ML_CLOG(DEBUG, "Controller", "received converged status from rank " << src_rank
                                   << " with tag " << tag << ": "
                                   << boolalpha << (iconverged == IStatus::CONVERGED));

      converged.at(mpi->rank() - 1) = (iconverged == IStatus::CONVERGED);
    }
  }  // ::pfasst::mpi
}  // ::pfasst


MAKE_LOGGABLE(MPI_Status, mpi_status, os)
{
  if (   mpi_status.MPI_TAG == MPI_ANY_TAG
      && mpi_status.MPI_SOURCE == MPI_ANY_SOURCE
      && mpi_status.MPI_ERROR == MPI_SUCCESS) {
    os << "MPI_Status(empty)";
  } else {
    char err_str[MPI_MAX_ERROR_STRING];
    int err_len = 0;
    int err = MPI_Error_string(mpi_status.MPI_ERROR, err_str, &err_len);
    pfasst::mpi::check_mpi_error(err);
    os << "MPI_Status(source=" << to_string(mpi_status.MPI_SOURCE) << ", "
       << "tag=" << to_string(mpi_status.MPI_TAG) << ", "
       << "error=" << string(err_str, err_len) << ")";
  }
  return os;
}
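
For orientation only, here is a minimal usage sketch (not part of the documented file) of how this communicator might be set up from application code; the include path "pfasst/mpi_communicator.hpp" is assumed to be the header declaring MPICommunicator and MPIStatus:

#include <iostream>
#include <mpi.h>
#include "pfasst/mpi_communicator.hpp"  // assumed header for MPICommunicator

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  {
    // constructing from MPI_COMM_WORLD calls set_comm(), which queries size,
    // rank and name and attaches a fresh MPIStatus to the communicator
    pfasst::mpi::MPICommunicator comm(MPI_COMM_WORLD);
    std::cout << "rank " << comm.rank() << " of " << comm.size() << std::endl;
  }
  MPI_Finalize();
  return 0;
}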