Chombo + EB + MF  3.2
MPI_util.H
Go to the documentation of this file.
1 #ifdef CH_LANG_CC
2 /*
3  * _______ __
4  * / ___/ / ___ __ _ / / ___
5  * / /__/ _ \/ _ \/ V \/ _ \/ _ \
6  * \___/_//_/\___/_/_/_/_.__/\___/
7  * Please refer to Copyright.txt, in Chombo's root directory.
8  */
9 #endif
10 
11 // This defines a function used to scatter particles to the proper process
12 
13 #ifndef _MPI_UTIL_H_
14 #define _MPI_UTIL_H_
15 
16 #ifdef CH_MPI
17 #include "mpi.h"
18 #include <vector>
19 #include <typeinfo>
20 #include <iomanip>
21 #include <map>
22 using std::map;
23 
24 #include "SPMD.H"
25 #include "REAL.H"
26 #include "List.H"
27 #include "RealVect.H"
28 #include "parstream.H"
29 #include "MayDay.H"
30 
31 #include "NamespaceHeader.H"
32 
33 // a_p is a local container in which to collect the P-objects passed by other
34 // processes. a_pp is a vector numProc long, whose elements are the containers
35 // of P-objects to be sent to each other process.
36 template <class P> void mpi_scatter_part(map<unsigned,List<P> >& a_p,
37  vector<map<unsigned,List<P> > >& a_pp)
38 {
39  CH_assert(a_pp.size()==numProc());
40 
41  const size_t psize = P().size();
42  vector<unsigned int> snd_sizes(numProc(),0), rcv_sizes(numProc(),0);
43 
44  typename map<unsigned int,List<P> >::iterator vi;
45  for (int p=0; p<numProc(); p++)
46  {
47  // total number of ps
48  for (vi=a_pp[p].begin(); vi!=a_pp[p].end(); ++vi)
49  snd_sizes[p] += vi->second.length();
50  // times p-size
51  snd_sizes[p] *= psize;
52  // +map keys + list lengths
53  snd_sizes[p] += a_pp[p].size() * (sizeof(unsigned int)+sizeof(size_t));
54  }
55 
56  int err = MPI_Alltoall(&snd_sizes[0],1,MPI_UNSIGNED,
57  &rcv_sizes[0],1,MPI_UNSIGNED,
58  Chombo_MPI::comm);
59  if (err != MPI_SUCCESS)
60  {
61  MayDay::Error("MPI communcation error in mpi_assign_part");
62  }
63  // sanity check
64  CH_assert(rcv_sizes[procID()]==0);
65 
66  vector<MPI_Request> rcv_req(numProc()-1);
67  vector<char*> rcv_buf(numProc(),NULL);
68  int rcv_req_cnt=0;
69 
70  for (int p=0; p<numProc(); p++)
71  {
72  if (rcv_sizes[p] > 0)
73  {
74  rcv_buf[p] = new char[rcv_sizes[p]];
75  MPI_Irecv(rcv_buf[p], rcv_sizes[p], MPI_CHAR, p, 1,
76  MPI_COMM_WORLD, &rcv_req[rcv_req_cnt] );
77  rcv_req_cnt++;
78  }
79  }
80 
81  vector<MPI_Request> snd_req(numProc()-1);
82  vector<char*> snd_buf(numProc(),NULL);
83  int snd_req_cnt=0;
84 
85  // pack up data
86  for (int p=0; p<numProc(); p++)
87  {
88  if (snd_sizes[p] > 0)
89  {
90  snd_buf[p] = new char[snd_sizes[p]];
91 
92  char* data = snd_buf[p];
93  for (vi=a_pp[p].begin(); vi!=a_pp[p].end(); ++vi)
94  {
95  *((unsigned int*)data)=vi->first;
96  data += sizeof(unsigned int);
97  *((size_t*)data)=vi->second.length();
98  data += sizeof(size_t);
99  for (ListIterator<P> li(vi->second); li.ok(); ++li)
100  {
101  li().linearOut((void*)data);
102  data += psize;
103  }
104  }
105 
106  MPI_Isend(snd_buf[p], snd_sizes[p], MPI_CHAR, p, 1,
107  MPI_COMM_WORLD, &snd_req[snd_req_cnt] );
108  snd_req_cnt++;
109  }
110  }
111 
112  vector<MPI_Status> rcv_stat(numProc()-1),snd_stat(numProc()-1);
113  if (snd_req_cnt>0)
114  {
115  int err = MPI_Waitall(snd_req_cnt, &snd_req[0], &snd_stat[0]);
116  if (err != MPI_SUCCESS)
117  {
118  MayDay::Error("mpi_scatter_part: send communication failed");
119  }
120  }
121 
122  if (rcv_req_cnt>0)
123  {
124  int err = MPI_Waitall(rcv_req_cnt, &rcv_req[0], &rcv_stat[0]);
125  if (err != MPI_SUCCESS)
126  {
127  MayDay::Error("mpi_scatter_part: receive communication failed");
128  }
129  }
130 
131  // unpack buffers
132  for (int p=0; p<numProc(); ++p)
133  {
134  P q;
135  if (rcv_sizes[p] > 0)
136  {
137  char* data = rcv_buf[p];
138  unsigned int in=0;
139  while (in < rcv_sizes[p])
140  {
141  unsigned int idx=*((unsigned int*)data);
142  data += sizeof(unsigned int);
143  size_t nps = *((size_t*)data);
144  data += sizeof(size_t);
145  unsigned int np=0;
146  while (np++<nps)
147  {
148  q.linearIn((void*)data);
149  a_p[idx].add(q);
150  data += psize;
151  }
152  in += sizeof(unsigned int)+sizeof(size_t)+psize*nps;
153  }
154  }
155  }
156 
157  // delete buffers
158  for (int p=0; p<numProc(); ++p)
159  {
160  a_pp[p].clear();
161  if (snd_sizes[p] > 0) delete snd_buf[p];
162  if (rcv_sizes[p] > 0) delete rcv_buf[p];
163  }
164 }
165 
166 #include "NamespaceFooter.H"
167 
168 #endif // if CH_MPI
169 
170 #endif
#define CH_assert(cond)
Definition: CHArray.H:37
unsigned int numProc()
number of parallel processes
bool ok() const
Return true if the iterator is not past the end of the list.
Definition: List.H:465
Iterator over a List.
Definition: List.H:20
static void Error(const char *const a_msg=m_nullString, int m_exitCode=CH_DEFAULT_ERROR_CODE)
Print out message to cerr and exit with the specified exit code.
int procID()
local process ID