Chombo + EB  3.0
sort.H
1 /************************************************************
2  * This file is part of DEMSort, a high-performance        *
3  * distributed-memory external-memory sorting program      *
4  *                                                         *
5  * Copyright (C) 2010 Mirko Rahn and Johannes Singler      *
6  * Karlsruhe Institute of Technology (KIT), Germany        *
7  *                                                         *
8  * All rights reserved. Free for academic use.             *
9  ***********************************************************/
10 
11 #ifndef _SORT_H_
12 #define _SORT_H_
13 
14 #ifdef CH_MPI
15 #include <mpi.h>
16 #endif
17 #include <cstring>
18 #include <iomanip>
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 
23 #include <tr1/memory>
24 #ifdef _GLIBCXX_PARALLEL
25 #include <parallel/algorithm>
26 #else
27 #include <algorithm>
28 #endif
29 
30 #include <Vector.H>
31 #include "CH_Timer.H"
32 #include "CH_assert.H"
33 
34 #include "sort_utils.H"
35 #include "parstream.H"
36 #include <queue>
37 
38 #include "UsingNamespace.H"
39 
40 #ifdef CH_MPI
41 
42 #ifdef VTRACE
43 #include "vt_user.h"
44 #endif
45 
46 /*********************************************************************** */
47 
48 #define RECV MPI_Irecv(buf, buf_size, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG \
49  , env->m_comm, req \
50  )
51 
52 #define WAIT MPI_Wait(req, stat); PE_NUM sender = stat->MPI_SOURCE
53 
54 #define ISEND(buf, count, id, tag) \
55  { MPI_Request dummy; \
56  MPI_Isend(buf, count, MPI_BYTE, id, tag, env->m_comm, &dummy); \
57  MPI_Request_free(&dummy); \
58  }
59 
60 #define SAY(id, tag) ISEND(NULL, 0, id, tag)
61 
62 #define ON_TAG_NEED \
63  { \
64  index_type idx; \
65  memcpy(&idx, buf, sizeof(index_type)); \
66  outbuf[sender] = a_data[idx]; \
67  } \
68  ISEND(outbuf + sender, sizeof(value_type), sender, TAG_GIVE)
69 
70 #define UPDATE(i) \
71  if (split_position[i] < 0) \
72  { \
73  split_value[i] = less.min_value(); pq.push(i); \
74  } \
75  else if (split_position[i] >= a_num_elements_on_pes[i]) \
76  { \
77  split_value[i] = less.max_value(); pq.push(i); \
78  } \
79  else \
80  { \
81  ISEND(split_position + i, sizeof(index_type), i, TAG_NEED); \
82  ++open_query; \
83  }
84 
85 
86 /* ************************************************************************* */
87 
88 #define FOR_ALL_PE(i) for (PE_NUM i = 0; i < env->m_numpes; ++i)
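
/* A minimal sketch of how the macros above compose into the point-to-point
 * query/answer game used by the selection routines below (assumes TAG_NEED,
 * TAG_GIVE, PE_NUM, index_type and SortComm from sort_utils.H; 'peer' and
 * 'pos' are hypothetical). A rank asks a peer for the element at a split
 * position and keeps serving symmetric queries while it waits:
 *
 *   RECV;                                            // post wildcard receive
 *   ISEND(&pos, sizeof(index_type), peer, TAG_NEED); // ask peer for a_data[pos]
 *   ++open_query;
 *   while (open_query > 0)
 *   {
 *     WAIT;                                          // blocks; defines 'sender'
 *     switch (stat->MPI_TAG)
 *     {
 *       case TAG_GIVE:                               // peer answered
 *         --open_query;
 *         memcpy(split_value + sender, buf, sizeof(value_type));
 *         break;
 *       case TAG_NEED: ON_TAG_NEED;                  // serve a symmetric query
 *         break;
 *     }
 *     RECV;                                          // re-post for the next message
 *   }
 */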
89 
90 template<class value_type, class Comparator>
91 index_type* mws_v0( value_type *a_data,
92  int* a_num_elements_on_pes,
93  const index_type a_my_cut,
94  Comparator less,
95  SortComm* env
96  )
97 {
98 #ifdef VTRACE
99  VT_USER_START("MW-Sel-setup");
100 #endif
101  CH_assert(env->m_numpes > 1); CH_assert(a_my_cut >= 0);
102 
103  index_type total_length = 0;
104  FOR_ALL_PE(i)
105  {
106  total_length += a_num_elements_on_pes[i];
107  CH_assert(a_num_elements_on_pes[i] >= 0);
108  }
109  CH_assert(a_my_cut <= total_length);
110 
111  /* *********************************************************************** */
112 
113  PE_NUM parent = (env->m_mype - 1) / 2;
114  PE_NUM lChild = 2 * env->m_mype + 1;
115  PE_NUM rChild = 2 * env->m_mype + 2;
116  PE_NUM nChild = 0;
117 
118  if (lChild < env->m_numpes)
119  {
120  ++nChild;
121  }
122  if (rChild < env->m_numpes)
123  {
124  ++nChild;
125  }
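
 // The termination protocol embeds the PEs in an implicit binary tree using
 // heap numbering: parent(p) = (p-1)/2, children 2p+1 and 2p+2. For example,
 // with 8 PEs, rank 3 has parent 1 and only child 7 (2*3+2 = 8 is out of
 // range). TAG_DONE messages flow up this tree, TAG_STOP flows back down.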
126 
127  index_type* split_position = new index_type[env->m_numpes]; // the splitter
128  index_type* need = new index_type[env->m_numpes];
129  value_type* split_value = new value_type[env->m_numpes]; // split_value[i]==e@i[split_position[i]]
130  value_type* outbuf = new value_type[env->m_numpes];
131 
132  std::fill(split_position, split_position + env->m_numpes, 0);
133 
134  index_type have = 0;
135  index_type stepsize = 2;
136 
137  while (stepsize < a_my_cut)
138  {
139  stepsize <<= 1;
140  }
141 
142  stepsize >>= 1;
143 
144  if (a_my_cut >= total_length)
145  {
146  stepsize = 0;
147  }
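
 // stepsize is now the largest power of two below a_my_cut (e.g. a_my_cut
 // = 10 leaves 16 >> 1 = 8), so the loop below behaves like a distributed
 // binary search: advance the cheapest sequence by stepsize, then halve
 // stepsize and back off, until exactly a_my_cut elements are covered.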
148 
149  /* ********************************************************************** */
150 
151  MPI_Datatype mpi_value_type; //value_type specification for MPI
152  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
153  MPI_Type_commit(&mpi_value_type);
154 
155  // initial distribution of a_data[0]
156 
157  value_type v0 = (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0] : less.max_value();
158 
159  MPI_Allgather(&v0, 1, mpi_value_type,
160  split_value, 1, mpi_value_type,
161  env->m_comm
162  );
163 
164  MPI_Type_free(const_cast<MPI_Datatype*>(&mpi_value_type));
165 
166  /* ********************************************************************** */
167 
169  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
170 
171  pqueue pq(iiLess(split_value, less));
172 
173  FOR_ALL_PE(i)
174  {
175  pq.push(i);
176  }
177 
178  unsigned buf_size = std::max(sizeof(value_type), sizeof(index_type));
179  char* buf = new char[buf_size];
180  MPI_Request* req = new MPI_Request;
181  MPI_Status* stat = new MPI_Status;
182 
183  /* ********************************************************************** */
184 
185  RECV;
186 
187  unsigned open_query = 0;
188  PE_NUM wChild = nChild;
189 #ifdef VTRACE
190  VT_USER_END("MW-Sel-setup");
191  VT_USER_START("MW-Sel-loop-1");
192 #endif
193  while (stepsize > 0)
194  {
195  while (have < a_my_cut)
196  {
197  while (open_query > 0)
198  {
199  WAIT;
200 
201  switch (stat->MPI_TAG)
202  {
203  case TAG_GIVE:
204  --open_query;
205  memcpy(split_value + sender, buf, sizeof(value_type));
206  pq.push(sender);
207  break;
208  case TAG_NEED: ON_TAG_NEED;
209  break;
210  case TAG_DONE: --wChild;
211  break;
212  default: CH_assert(false);
213  break;
214  }
215 
216  RECV;
217  } // open_query > 0
218 
219  // go to right in one sequence
220  PE_NUM min_index = pq.top();
221  pq.pop();
222 
223  split_position[min_index] += stepsize;
224  have += stepsize;
225  if (have < a_my_cut)
226  {
227  UPDATE(min_index);
228  }
229  } // have < want
230 
231  stepsize /= 2;
232 
233  if (stepsize)
234  {
235  while (!pq.empty()) // clear the queue
236  {
237  pq.pop();
238  }
239 
240  FOR_ALL_PE(i)
241  {
242  if (split_position[i] > 0)
243  {
244  split_position[i] -= stepsize;
245  have -= stepsize;
246  UPDATE(i);
247  }
248  else
249  {
250  pq.push(i);
251  }
252  }
253  }
254  }
255 #ifdef VTRACE
256  VT_USER_END("MW-Sel-loop-1");
257 #endif
258  /* *********************************************************************** */
259 
260  if (env->m_mype > 0 && wChild == 0)
261  {
262  SAY(parent, TAG_DONE);
263  }
264 
265  bool stop = false;
266 #ifdef VTRACE
267  VT_USER_START("MW-Sel-loop-2");
268 #endif
269  while (!stop)
270  {
271  WAIT;
272 
273  switch (stat->MPI_TAG)
274  {
275  case TAG_NEED: ON_TAG_NEED;
276  break;
277  case TAG_DONE:
278  if (--wChild == 0)
279  {
280  if (env->m_mype > 0)
281  {
282  SAY(parent, TAG_DONE);
283  }
284  else
285  {
286  if (lChild < env->m_numpes)
287  {
288  SAY(lChild, TAG_STOP);
289  }
290  if (rChild < env->m_numpes)
291  {
292  SAY(rChild, TAG_STOP);
293  }
294  stop = true;
295  }
296  }
297  break;
298  case TAG_STOP:
299  if (lChild < env->m_numpes)
300  {
301  SAY(lChild, TAG_STOP);
302  }
303  if (rChild < env->m_numpes)
304  {
305  SAY(rChild, TAG_STOP);
306  }
307  stop = true;
308  break;
309  case TAG_GIVE:
310  --open_query; // ????
311  break;
312  default: CH_assert(false);
313  break;
314  }
315 
316  RECV;
317  }
318 #ifdef VTRACE
319  VT_USER_END("MW-Sel-loop-2");
320 #endif
321  MPI_Cancel(req);
322  WAIT;
323 
324  /* *********************************************************************** */
325 
326  delete[] buf;
327  delete req;
328  delete stat;
329  delete[] need;
330  delete[] split_value;
331  delete[] outbuf;
332 
333  /* *********************************************************************** */
334 
335  return split_position;
336 }
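
/* Worked example with hypothetical data: for P = 2, PE 0 holding sorted
 * {1,3,5,7} and PE 1 holding {2,4,6,8}, PE 1 calls mws_v0 with a_my_cut =
 * (1*8)/2 = 4. The four globally smallest elements are {1,2,3,4}, two from
 * each PE, so the returned split_position is {2,2}; the entries of the
 * result always sum to a_my_cut. PE 0 calls with a_my_cut = 0 and gets {0,0}.
 */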
337 
338 ////////////////////////////////////////////////////////////////////////
339 // mws_v1 -- v0 with optimization for existing order
340 ////////////////////////////////////////////////////////////////////////
341 template<class value_type, class Comparator>
342 index_type* mws_v1( value_type *a_data,
343  int* a_num_elements_on_pes,
344  const index_type a_my_cut,
345  Comparator less,
346  SortComm* env
347  )
348 {
349 #ifdef VTRACE
350  VT_USER_START("MW-Sel-setup");
351 #endif
352  CH_assert(env->m_numpes > 1); CH_assert(a_my_cut >= 0);
353 
354  index_type total_length = 0;
355  FOR_ALL_PE(i)
356  {
357  total_length += a_num_elements_on_pes[i];
358  CH_assert(a_num_elements_on_pes[i] >= 0);
359  }
360  CH_assert(a_my_cut <= total_length);
361 
362  /* *********************************************************************** */
363 
364  PE_NUM parent = (env->m_mype - 1) / 2;
365  PE_NUM lChild = 2 * env->m_mype + 1;
366  PE_NUM rChild = 2 * env->m_mype + 2;
367  PE_NUM nChild = 0;
368 
369  if (lChild < env->m_numpes)
370  {
371  ++nChild;
372  }
373  if (rChild < env->m_numpes)
374  {
375  ++nChild;
376  }
377 
378  index_type* split_position = new index_type[env->m_numpes]; // the splitter
379  index_type* need = new index_type[env->m_numpes];
380  value_type* outbuf = new value_type[env->m_numpes];
381 
382  std::fill(split_position, split_position + env->m_numpes, 0);
383 
384  index_type stepsize = 2;
385 
386  while (stepsize < a_my_cut)
387  {
388  stepsize <<= 1;
389  }
390 
391  stepsize >>= 1;
392 
393  if (a_my_cut >= total_length)
394  {
395  stepsize = 0;
396  }
397 
398  /* ********************************************************************** */
399 
400 
401  // initial distribution of a_data[0] and get a_data[n(p)]
402  value_type* split_value = new value_type[env->m_numpes]; // split_value[i]==e@i[split_position[i]]
403  value_type* elem_top = new value_type[env->m_numpes];
404  {
405  CH_TIME("MW-Select:AllGather");
406  MPI_Datatype mpi_value_type; //value_type specification for MPI
407  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
408  MPI_Type_commit(&mpi_value_type);
409 
410  value_type* elem_tmp = new value_type[2*env->m_numpes];
411 
412  value_type v0[2] =
413  {
414  (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0] : less.max_value(),
415  (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[a_num_elements_on_pes[env->m_mype]-1] : less.max_value()
416  };
417 
418  MPI_Allgather( &v0, 2, mpi_value_type,
419  elem_tmp, 2, mpi_value_type,
420  env->m_comm
421  );
422  FOR_ALL_PE(i)
423  {
424  split_value[i] = elem_tmp[2*i];
425  elem_top[i] = elem_tmp[2*i + 1];
426  }
427 
428  delete[] elem_tmp;
429 
430  MPI_Type_free(const_cast<MPI_Datatype*>(&mpi_value_type));
431  }
432 
433  /* ********************************************************************** */
434 
436  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
437 
438  pqueue pq(iiLess(split_value, less));
439 
440  FOR_ALL_PE(i)
441  {
442  pq.push(i);
443  }
444 
445  // find low procs that can be removed from the selection process
446  index_type have = 0;
447  std::vector<bool> mask(env->m_numpes);
448  {
449  CH_TIME("MW-Select:make-mask");
450  FOR_ALL_PE(i)
451  {
452  mask[i] = false;
453  }
454  PE_NUM root = pq.top();
455  while (true)
456  {
457  index_type extra = 0; CH_assert(!mask[root]);
458  FOR_ALL_PE(i)
459  {
460  // if !mask[i] && bot[i] <= top[root] then we have an interferer
461  if ( !mask[i] && split_value[i].get_key() <= elem_top[root].get_key() )
462  {
463  extra += a_num_elements_on_pes[i];
464  }
465  }
466  // see if we can add this
467  if ( have + extra <= a_my_cut )
468  {
469  have += a_num_elements_on_pes[root];
470  mask[root] = true;
471  split_position[root] = a_num_elements_on_pes[root];
472  pq.pop();
473  root = pq.top();
474  if ( have == a_my_cut ) break;
475  }
476  else
477  {
478  break;
479  }
480  }
481  }
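
 // Example of the mask logic above (hypothetical keys): if the PE with the
 // smallest first element has top key 10 and every other unmasked PE has
 // bottom key > 10, nothing else interferes, so its whole range joins the
 // prefix, it is masked out, and the stepping loop never touches it again.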
482  delete[] elem_top; elem_top = 0;
483  CH_assert(have <= a_my_cut);
484 
485  unsigned buf_size = std::max(sizeof(value_type), sizeof(index_type));
486  char* buf = new char[buf_size];
487  MPI_Request* req = new MPI_Request;
488  MPI_Status* stat = new MPI_Status;
489 
490  RECV;
491 
492  unsigned open_query = 0;
493  PE_NUM wChild = nChild;
494 #ifdef VTRACE
495  VT_USER_END("MW-Sel-setup");
496  VT_USER_START("MW-Sel-loop-1");
497 #endif
498  while (stepsize > 0)
499  {
500  while (have < a_my_cut)
501  {
502  while (open_query > 0)
503  {
504  WAIT;
505 
506  switch (stat->MPI_TAG)
507  {
508  case TAG_GIVE:
509  --open_query;
510  memcpy(split_value + sender, buf, sizeof(value_type));
511  pq.push(sender);
512  break;
513  case TAG_NEED: ON_TAG_NEED;
514  break;
515  case TAG_DONE: --wChild;
516  break;
517  default: CH_assert(false);
518  break;
519  }
520 
521  RECV;
522  } // open_query > 0
523 
524  // go to right in one sequence
525  PE_NUM min_index = pq.top();
526  pq.pop();
527 
528  split_position[min_index] += stepsize;
529  have += stepsize;
530 
531  if (have < a_my_cut)
532  {
533  UPDATE(min_index);
534  }
535  } // have < want
536 
537  stepsize /= 2;
538 
539  if (stepsize)
540  {
541  while (!pq.empty()) // clear the queue
542  {
543  pq.pop();
544  }
545 
546  FOR_ALL_PE(i)
547  {
548  if ( !mask[i] )
549  {
550  if (split_position[i] > 0)
551  {
552  split_position[i] -= stepsize;
553  have -= stepsize;
554 
555  UPDATE(i);
556  }
557  else
558  {
559  pq.push(i);
560  }
561  }
562  }
563  }
564  }
565 #ifdef VTRACE
566  VT_USER_END("MW-Sel-loop-1");
567 #endif
568  /* *********************************************************************** */
569 
570  if (env->m_mype > 0 && wChild == 0)
571  {
572  SAY(parent, TAG_DONE);
573  }
574 
575  bool stop = false;
576 #ifdef VTRACE
577  VT_USER_START("MW-Sel-loop-2");
578 #endif
579  while (!stop)
580  {
581  WAIT;
582 
583  switch (stat->MPI_TAG)
584  {
585  case TAG_NEED: ON_TAG_NEED;
586  break;
587  case TAG_DONE:
588  if (--wChild == 0)
589  {
590  if (env->m_mype > 0)
591  {
592  SAY(parent, TAG_DONE);
593  }
594  else
595  {
596  if (lChild < env->m_numpes)
597  {
598  SAY(lChild, TAG_STOP);
599  }
600  if (rChild < env->m_numpes)
601  {
602  SAY(rChild, TAG_STOP);
603  }
604  stop = true;
605  }
606  }
607  break;
608  case TAG_STOP:
609  if (lChild < env->m_numpes)
610  {
611  SAY(lChild, TAG_STOP);
612  }
613  if (rChild < env->m_numpes)
614  {
615  SAY(rChild, TAG_STOP);
616  }
617  stop = true;
618  break;
619  case TAG_GIVE:
620  --open_query; // ????
621  break;
622  default: CH_assert(false);
623  break;
624  }
625 
626  RECV;
627  }
628  if (have != a_my_cut)
629  {
630  std::cout << "\t\t\tERROR [" << env->m_mype << "] have=" << have << " a_my_cut=" << a_my_cut << std::endl;
631  }
632  CH_assert(have == a_my_cut);
633 
634 #ifdef VTRACE
635  VT_USER_END("MW-Sel-loop-2");
636 #endif
637  MPI_Cancel(req);
638  WAIT;
639 
640  /* *********************************************************************** */
641 
642  delete[] buf;
643  delete req;
644  delete stat;
645  delete[] need;
646  delete[] split_value;
647  delete[] outbuf;
648 
649  /* *********************************************************************** */
650 
651  return split_position;
652 }
653 
654 /* ************************************************************************* */
655 ////////////////////////////////////////////////////////////////////////
656 // mws_v2
657 ////////////////////////////////////////////////////////////////////////
658 /** Alternative implementation for the interface, using collective operations only.
659  With existing order optimization */
660 template<class value_type, class Comparator>
661 index_type* mws_v2(value_type *a_data,
662  int* a_num_elements_on_pes,
663  const index_type a_my_cut,
664  Comparator less,
665  SortComm* env
666  )
667 {
668  if ( env->m_numpes == 1)
669  {
670  index_type* split_position = new index_type[env->m_numpes]; // the splitter
671  split_position[0] = 0;
672  return split_position;
673  }
674  CH_assert(a_my_cut >= 0);
675 
676  const index_type P = env->m_numpes;
677 
678  index_type total_length = 0;
679  for (PE_NUM i = 0; i < P; ++i)
680  {
681  total_length += a_num_elements_on_pes[i];
682  CH_assert(a_num_elements_on_pes[i] >= 0);
683  }
684  CH_assert(a_my_cut <= total_length);
685 
686  /* *********************************************************************** */
687 
688  //for termination protocol
689  PE_NUM parent = (env->m_mype - 1) / 2;
690  PE_NUM left_child = 2 * env->m_mype + 1;
691  PE_NUM right_child = 2 * env->m_mype + 2;
692  PE_NUM num_children = 0;
693  if (left_child < P)
694  {
695  ++num_children;
696  }
697  if (right_child < P)
698  {
699  ++num_children;
700  }
701 
702  index_type* split_positions = new index_type[P];
703  index_type* queries = new index_type[P];
704  value_type* send_buffer = new value_type[P];
705 
706  std::fill(split_positions, split_positions + P, 0); //start at the very left
707 
708  index_type stepsize = 2;
709 
710  while (stepsize < a_my_cut)
711  {
712  stepsize <<= 1;
713  }
714  stepsize >>= 1;
715  //stepsize can differ across the PEs, because each selects a different rank
716 
717  if (a_my_cut == total_length)
718  {
719  for (PE_NUM p = 0; p < P; ++p)
720  split_positions[p] = a_num_elements_on_pes[p];
721  stepsize = 0; //do not select myself, but participate in collective data exchange
722  }
723 
724  /* ********************************************************************** */
725  // initial distribution of first elements
726  MPI_Datatype mpi_value_type; //value_type specification for MPI
727  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
728  MPI_Type_commit(&mpi_value_type);
729  value_type* split_values = new value_type[P]; // split_values[i]==e@i[split[i]]
730  value_type* elem_top = new value_type[P];
731  {
732  CH_TIME("MW-Select:AllGather");
733 
734  value_type* elem_tmp = new value_type[2*P]; // split_value[i]==e@i[split[i]]
735 
736  value_type v0[2] =
737  {
738  (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0] : less.max_value(),
739  (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[a_num_elements_on_pes[env->m_mype]-1] : less.max_value()
740  };
741 
742  MPI_Allgather(&v0, 2, mpi_value_type,
743  elem_tmp, 2, mpi_value_type,
744  env->m_comm
745  );
746  FOR_ALL_PE(i)
747  {
748  split_values[i] = elem_tmp[2*i];
749  elem_top[i] = elem_tmp[2*i + 1];
750  }
751 
752  delete[] elem_tmp;
753  }
754 
755  /* ********************************************************************** */
756  //initial state
758  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiless> pqueue;
759 
760  //contains numbers p of the PEs, key is corresponding split_values[p]
761  pqueue pq(iiless(split_values, less));
762  for (PE_NUM p = 0; p < P; ++p)
763  {
764  pq.push(p);
765  }
766  // find low procs that can be removed from the selection process
767  index_type have = 0;
768  std::vector<bool> mask(P);
769  {
770  CH_TIME("MW-Select:make-mask");
771  FOR_ALL_PE(i)
772  {
773  mask[i] = false;
774  }
775  PE_NUM root = pq.top();
776  while (true)
777  {
778  index_type extra = 0; CH_assert(!mask[root]);
779  FOR_ALL_PE(i)
780  {
781  // if !mask[i] && bot[i] <= top[root] then we have an interferer
782  if ( !mask[i] && split_values[i].get_key() <= elem_top[root].get_key() )
783  {
784  extra += a_num_elements_on_pes[i];
785  }
786  }
787  // see if we can add this
788  if ( have + extra <= a_my_cut )
789  {
790  have += a_num_elements_on_pes[root];
791  mask[root] = true;
792  split_positions[root] = a_num_elements_on_pes[root];
793  pq.pop();
794  root = pq.top();
795  if ( have == a_my_cut ) break;
796  }
797  else
798  {
799  break;
800  }
801  }
802  }
803  CH_assert(have <= a_my_cut);
804  delete[] elem_top; elem_top = 0;
805 
806  //receive either a query or an answer
807  unsigned receive_buffer_size = std::max(sizeof(value_type), sizeof(index_type));
808  char* receive_buffer = new char[receive_buffer_size];
809 
810  MPI_Request req;
811  MPI_Status status;
812 
813  /* ********************************************************************** */
814 
815 #define RECV2 MPI_Irecv(receive_buffer, receive_buffer_size, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, env->m_comm, &req)
816 
817 #define TELL(id, tag) ISEND(NULL, 0, id, tag)
818 
819 #define ANSWER_QUERY \
820  { \
821  index_type idx; \
822  memcpy(&idx, receive_buffer, sizeof(index_type)); \
823  send_buffer[sender] = a_data[idx]; \
824  } \
825  ISEND(send_buffer + sender, sizeof(value_type), sender, TAG_ANSWER)
826 
827 #define CHECK_TERMINATION(condition, done_tag, stop_tag, done, done_before) \
828  { \
829  if (condition) \
830  { \
831  if (env->m_mype > 0) \
832  { \
833  TELL(parent, done_tag); \
834  } \
835  else \
836  { \
837  if (left_child < P) \
838  { \
839  TELL(left_child, stop_tag); \
840  } \
841  if (right_child < P) \
842  { \
843  TELL(right_child, stop_tag); \
844  } \
845  done = true; \
846  } \
847  } \
848  done_before = condition; \
849 }
850 
851 #define CHECK_FINAL_TERMINATION \
852 CHECK_TERMINATION(num_waiting_children == 0 && stepsize == 0 && !done_before, TAG_DONE, TAG_STOP, done, done_before)
853 
854 #define CHECK_SHIFT_RIGHT_TERMINATION \
855 bool dummy; CHECK_TERMINATION(num_children_done_shift_right == num_children, TAG_SHIFT_RIGHT_DONE, TAG_START_SHIFT_LEFT, shift_left, dummy);
856 
857 
858  /* ********************************************************************** */
859  // find own split positions and serve queries from others
860  //in each round
861  // -all PEs shift right until they have excess, exchanging queries and answers*
862  // -barrier (using tree-shaped termination protocol)
863  // -all PEs shift left once, exchanging updated elements in a collective communication*
864  // *PEs only participate as long as they have not found the final split position; however,
865  // they still serve answers to queries and participate in the collective communications
866  //barrier (using tree-shaped termination protocol)
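
 // One round of the loop below, in pseudocode (control flow only; the real
 // code also answers TAG_QUERY messages at every wait):
 //
 //   while (have < a_my_cut) advance the smallest sequence by stepsize,
 //                           fetching remote elements point-to-point
 //   tree barrier: TAG_SHIFT_RIGHT_DONE up, TAG_START_SHIFT_LEFT down
 //   stepsize /= 2; shift every unfinished (unmasked) sequence left
 //   MPI_Alltoall the new split positions, then MPI_Alltoall the answers
 //   refill the priority queue from the refreshed split_values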
867 
868  PE_NUM num_waiting_children = num_children;
869  bool done = false, done_before = false;
870 
871  CHECK_FINAL_TERMINATION;
872 
873  RECV2; //post receive
874 
875  while (!done)
876  {
877  PE_NUM num_children_done_shift_right = 0;
878 
879  //do query-answer game until the split position is shifted enough to the right
880  while (have < a_my_cut)
881  {
882  bool query_open = false;
883 
884  if (have < a_my_cut && stepsize > 0)
885  {
886  //find sequence with least split value
887  PE_NUM min_seq = pq.top();
888  pq.pop();
889 
890  //move to the right in this sequence
891  split_positions[min_seq] += stepsize;
892  have += stepsize;
893 
894  if (split_positions[min_seq] < 0)
895  {
896  split_values[min_seq] = less.min_value();
897  pq.push(min_seq);
898  }
899  else if (split_positions[min_seq] >= a_num_elements_on_pes[min_seq])
900  {
901  split_values[min_seq] = less.max_value();
902  pq.push(min_seq);
903  }
904  else
905  {
906  //send query for element
907  ISEND(split_positions + min_seq, sizeof(index_type),
908  min_seq, TAG_QUERY
909  );
910  query_open = true;
911  }
912  }
913 
914  //wait for answer, answer other queries in the meantime
915  while (query_open)
916  {
917  MPI_Wait(&req, &status);
918  PE_NUM sender = status.MPI_SOURCE;
919 
920  switch (status.MPI_TAG)
921  {
922  case TAG_ANSWER:
923  query_open = false;
924  memcpy(split_values + sender, receive_buffer, sizeof(value_type));
925  pq.push(sender);
926  break;
927  case TAG_QUERY:
928  ANSWER_QUERY;
929  break;
930  case TAG_SHIFT_RIGHT_DONE:
931  ++num_children_done_shift_right;
932  break;
933  case TAG_DONE: //child is done
934  --num_waiting_children;
935  CHECK_FINAL_TERMINATION;
936  break;
937  case TAG_STOP:
938  CH_assert(false);
939  if (left_child < P)
940  {
941  TELL(left_child, TAG_STOP);
942  }
943  if (right_child < P)
944  {
945  TELL(right_child, TAG_STOP);
946  }
947  done = true;
948  break;
949  default:
950  CH_assert(false);
951  break;
952  }
953 
954  RECV2;
955  } //num_open_queries > 0
956  } //have < rank
957 
958  bool shift_left = false;
959 
960  CHECK_SHIFT_RIGHT_TERMINATION;
961 
962  //wait for the command to shift left (kind of a barrier), answer queries meanwhile
963  while (!shift_left)
964  {
965  MPI_Wait(&req, &status);
966  PE_NUM sender = status.MPI_SOURCE;
967 
968  switch (status.MPI_TAG)
969  {
970  case TAG_QUERY:
971  ANSWER_QUERY;
972  break;
973  case TAG_SHIFT_RIGHT_DONE:
974  ++num_children_done_shift_right;
975  CHECK_SHIFT_RIGHT_TERMINATION;
976  break;
977  case TAG_START_SHIFT_LEFT:
978  if (left_child < P)
979  {
980  TELL(left_child, TAG_START_SHIFT_LEFT);
981  }
982  if (right_child < P)
983  {
984  TELL(right_child, TAG_START_SHIFT_LEFT);
985  }
986  shift_left = true;
987  break;
988  case TAG_DONE:
989  --num_waiting_children;
990  CHECK_FINAL_TERMINATION;
991  break;
992  case TAG_STOP:
993  if (left_child < P)
994  {
995  TELL(left_child, TAG_STOP);
996  }
997  if (right_child < P)
998  {
999  TELL(right_child, TAG_STOP);
1000  }
1001  done = true;
1002  break;
1003  default:
1004  CH_assert(false);
1005  break;
1006  }
1007 
1008  RECV2;
1009  }
1010 
1011  if (done)
1012  {
1013  break;
1014  }
1015  //now everybody is ready to shift left
1016 
1017  stepsize /= 2;
1018 
1019  if (stepsize > 0)
1020  {
1021  while (!pq.empty()) // clear the queue
1022  pq.pop();
1023 
1024  //shift left in each sequence
1025  for (PE_NUM p = 0; p < P; ++p)
1026  if (split_positions[p] > 0 && !mask[p] )
1027  {
1028  split_positions[p] -= stepsize;
1029  have -= stepsize;
1030  }
1031  }
1032 
1033  //query the elements on all split positions in a collective communication operation
1034  MPI_Alltoall(split_positions, 1, MPI_INDEX_TYPE,
1035  queries, 1, MPI_INDEX_TYPE,
1036  env->m_comm);
1037 
1038  //prepare answers
1039  for (PE_NUM p = 0; p < P; ++p)
1040  {
1041  if (queries[p] < 0)
1042  send_buffer[p] = less.min_value();
1043  else if (queries[p] >= a_num_elements_on_pes[env->m_mype])
1044  send_buffer[p] = less.max_value();
1045  else
1046  send_buffer[p] = a_data[queries[p]];
1047  }
1048 
1049  //exchange answers
1050  MPI_Alltoall(send_buffer, 1, mpi_value_type,
1051  split_values, 1, mpi_value_type,
1052  env->m_comm);
1053 
1054  //refill priority queue
1055  if (stepsize > 0)
1056  {
1057  for (PE_NUM p = 0; p < P; ++p)
1058  {
1059  if ( !mask[p] )
1060  {
1061  pq.push(p);
1062  }
1063  }
1064  }
1065 
1066  CHECK_FINAL_TERMINATION;
1067  } // !done
1068  if (have != a_my_cut)
1069  {
1070  std::cout << "\t\t\tERROR [" << env->m_mype << "] have=" << have << " a_my_cut=" << a_my_cut << std::endl;
1071  }
1072  CH_assert(have == a_my_cut);
1073 
1074  MPI_Cancel( &req );
1075  MPI_Wait( &req, &status );
1076 
1077  /* *********************************************************************** */
1078 
1079  MPI_Type_free(const_cast<MPI_Datatype*>(&mpi_value_type));
1080 
1081  delete[] receive_buffer;
1082  delete[] queries;
1083  delete[] split_values;
1084  delete[] send_buffer;
1085 
1086  /* *********************************************************************** */
1087 
1088  return split_positions;
1089 }
1090 
1091 ////////////////////////////////////////////////////////////////////////
1092 // mws_v4 -- v0 with optimization for inexact load balance and initial guess
1093 ////////////////////////////////////////////////////////////////////////
1094 template<class value_type, class Comparator>
1095 index_type* mws_v4(value_type *a_data,
1096  int* a_num_elements_on_pes,
1097  const index_type a_want,
1098  float a_margin,
1099  Comparator less,
1100  SortComm* env
1101  )
1102 {
1103 #ifdef VTRACE
1104  VT_USER_START("MW-Sel-setup");
1105 #endif
1106  const int P = env->m_numpes, mype = env->m_mype;
1107  CH_assert(env->m_numpes > 1); CH_assert(a_want >= 0);
1108 
1109  index_type total_length = 0;
1110  FOR_ALL_PE(i)
1111  {
1112  total_length += a_num_elements_on_pes[i];
1113  CH_assert(a_num_elements_on_pes[i] >= 0);
1114  }
1115  CH_assert(a_want <= total_length);
1116 
1117  int goal = (int)(total_length/P); CH_assert(goal>0);
1118  CH_assert(a_margin>0.0 && a_margin < 1.0);
1119  const int margin = (int)( (float)goal*a_margin ) + 1;
1120  if (mype==P-1) std::cout << "\t[" << mype << "]mws_v4 margin = " << margin << ", a_margin = " << a_margin << ", num elems = " << total_length << std::endl;
1121 
1122  /* *********************************************************************** */
1123 
1124  PE_NUM parent = (env->m_mype - 1) / 2;
1125  PE_NUM lChild = 2 * env->m_mype + 1;
1126  PE_NUM rChild = 2 * env->m_mype + 2;
1127  PE_NUM nChild = 0;
1128 
1129  if (lChild < env->m_numpes)
1130  {
1131  ++nChild;
1132  }
1133  if (rChild < env->m_numpes)
1134  {
1135  ++nChild;
1136  }
1137 
1138  index_type* split_position = new index_type[env->m_numpes]; // the splitter
1139  index_type* need = new index_type[env->m_numpes];
1140  value_type* split_value = new value_type[env->m_numpes]; // split_value[i]==e@i[split_position[i]]
1141  value_type* outbuf = new value_type[env->m_numpes];
1142 
1143  std::fill(split_position, split_position + env->m_numpes, 0);
1144 
1145  index_type have = 0;
1146  index_type stepsize = 2;
1147 
1148  while (stepsize < a_want)
1149  {
1150  stepsize <<= 1;
1151  }
1152 
1153  stepsize >>= 1;
1154 
1155  if (a_want >= total_length)
1156  {
1157  stepsize = 0;
1158  }
1159 
1160  /* ********************************************************************** */
1161 
1162  MPI_Datatype mpi_value_type; //value_type specification for MPI
1163  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
1164  MPI_Type_commit(&mpi_value_type);
1165 
1166  // initial distribution of a_data[0]
1167 
1168  value_type v0 = (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0] : less.max_value();
1169 
1170  MPI_Allgather(&v0, 1, mpi_value_type,
1171  split_value, 1, mpi_value_type,
1172  env->m_comm
1173  );
1174 
1175  MPI_Type_free(const_cast<MPI_Datatype*>(&mpi_value_type));
1176 
1177  /* ********************************************************************** */
1178 
1180  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
1181 
1182  pqueue pq(iiLess(split_value, less));
1183 
1184  FOR_ALL_PE(i)
1185  {
1186  pq.push(i);
1187  }
1188 
1189  unsigned buf_size = std::max(sizeof(value_type), sizeof(index_type));
1190  char* buf = new char[buf_size];
1191  MPI_Request* req = new MPI_Request;
1192  MPI_Status* stat = new MPI_Status;
1193 
1194  /* ********************************************************************** */
1195 
1196  RECV;
1197 
1198  unsigned open_query = 0;
1199  PE_NUM wChild = nChild;
1200 #ifdef VTRACE
1201  VT_USER_END("MW-Sel-setup");
1202  VT_USER_START("MW-Sel-loop-1");
1203 #endif
1204  int ii = 0;
1205  while (stepsize > 0)
1206  {
1207  while ( have < a_want - margin )
1208  {
1209  while (open_query > 0)
1210  {
1211 
1212  WAIT;
1213 
1214  switch (stat->MPI_TAG)
1215  {
1216  case TAG_GIVE:
1217  --open_query;
1218  memcpy(split_value + sender, buf, sizeof(value_type));
1219  pq.push(sender);
1220  break;
1221  case TAG_NEED: ON_TAG_NEED;
1222  break;
1223  case TAG_DONE: --wChild;
1224  break;
1225  default: CH_assert(false);
1226  break;
1227  }
1228 
1229  RECV;
1230  } // open_query > 0
1231 
1232  // go to right in one sequence
1233  PE_NUM min_index = pq.top();
1234  pq.pop();
1235 
1236  split_position[min_index] += stepsize;
1237  have += stepsize;
1238 
1239  if (have < a_want - margin)
1240  {
1241  UPDATE(min_index);
1242  }
1243  } // have < want
1244 
1245  stepsize /= 2;
1246 
1247  if (stepsize)
1248  {
1249  while (!pq.empty()) // clear the queue
1250  {
1251  pq.pop();
1252  }
1253 
1254  FOR_ALL_PE(i)
1255  {
1256  if (split_position[i] > 0)
1257  {
1258  split_position[i] -= stepsize;
1259  have -= stepsize;
1260 
1261  UPDATE(i);
1262  }
1263  else
1264  {
1265  pq.push(i);
1266  }
1267  }
1268  }
1269  }
1270 #ifdef VTRACE
1271  VT_USER_END("MW-Sel-loop-1");
1272 #endif
1273  /* *********************************************************************** */
1274 
1275  if (env->m_mype > 0 && wChild == 0)
1276  {
1277  SAY(parent, TAG_DONE);
1278  }
1279 
1280  bool stop = false;
1281 #ifdef VTRACE
1282  VT_USER_START("MW-Sel-loop-2");
1283 #endif
1284  while (!stop)
1285  {
1286  WAIT;
1287 
1288  switch (stat->MPI_TAG)
1289  {
1290  case TAG_NEED: ON_TAG_NEED;
1291  break;
1292  case TAG_DONE:
1293  if (--wChild == 0)
1294  {
1295  if (env->m_mype > 0)
1296  {
1297  SAY(parent, TAG_DONE);
1298  }
1299  else
1300  {
1301  if (lChild < env->m_numpes)
1302  {
1303  SAY(lChild, TAG_STOP);
1304  }
1305  if (rChild < env->m_numpes)
1306  {
1307  SAY(rChild, TAG_STOP);
1308  }
1309  stop = true;
1310  }
1311  }
1312  break;
1313  case TAG_STOP:
1314  if (lChild < env->m_numpes)
1315  {
1316  SAY(lChild, TAG_STOP);
1317  }
1318  if (rChild < env->m_numpes)
1319  {
1320  SAY(rChild, TAG_STOP);
1321  }
1322  stop = true;
1323  break;
1324  default: CH_assert(false);
1325  break;
1326  }
1327 
1328  RECV;
1329  }
1330 #ifdef VTRACE
1331  VT_USER_END("MW-Sel-loop-2");
1332 #endif
1333  MPI_Cancel(req);
1334  WAIT;
1335 
1336  /* *********************************************************************** */
1337 
1338  delete[] buf;
1339  delete req;
1340  delete stat;
1341  delete[] need;
1342  delete[] split_value;
1343  delete[] outbuf;
1344 
1345  /* *********************************************************************** */
1346 
1347  return split_position;
1348 }
1349 
1350 #endif // CH_MPI
1351 
1352 ////////////////////////////////////////////////////////////////////////
1353 // class ParticleVector - public Vector<T>
1354 ////////////////////////////////////////////////////////////////////////
1355 template <class T, class Comparator>
1356 class ParticleVector : public Vector<T>
1357 {
1358 public:
1359  ParticleVector(unsigned int size) : Vector<T>(size)
1360 #ifdef CH_MPI
1361 , m_histo(0)
1362 #endif
1363  {}
1364 #ifdef CH_MPI
1365  ~ParticleVector()
1366  {
1367  if (m_histo!=0) delete m_histo;
1368  }
1369  //
1370  void global_sort( MPI_Comm a_comm, Comparator a_less, const int a_version = 2 );
1371 #else
1372  void global_sort( Comparator a_less, const int a_version = 2 );
1373 #endif
1374 private:
1375  T* memsort( T* a_data,
1376  int *a_local_len,
1377  const int a_capacity,
1378  Comparator a_less,
1379 #ifdef CH_MPI
1380  MPI_Comm a_comm,
1381 #endif
1382  const int a_version = 2);
1383 #ifdef CH_MPI
1384  index_type* mws_lborder( T *a_data,
1385  int a_nloc,
1386  float a_margin,
1387  int a_version,
1388  Comparator a_less,
1389  SortComm* a_env);
1390 
1391  int* mws_sdispl( T *a_data,
1392  int a_nloc,
1393  float a_margin,
1394  Comparator a_less,
1395  SortComm* a_env);
1396  // data
1397  Histogram *m_histo;
1398 #endif
1399 };
1400 
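/* Example use (a sketch; 'Particle' and 'ParticleLess' are hypothetical
 * stand-ins for a value type with get_key() and a comparator providing
 * min_value()/max_value() as expected by sort_utils.H):
 *
 *   ParticleVector<Particle, ParticleLess> pv(nLocal);
 *   // ... fill pv with this rank's particles ...
 * #ifdef CH_MPI
 *   pv.global_sort(Chombo_MPI::comm, ParticleLess(), 2); // version 2: collectives
 * #else
 *   pv.global_sort(ParticleLess(), 2);
 * #endif
 *   // pv is now locally sorted and globally partitioned across the ranks
 */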
1401 ////////////////////////////////////////////////////////////////////////
1402 // ParticleVector<T,Comparator>::global_sort
1403 ////////////////////////////////////////////////////////////////////////
1404 template<class T, class Comparator>
1405 void ParticleVector<T, Comparator>::global_sort(
1406 #ifdef CH_MPI
1407  MPI_Comm a_comm,
1408 #endif
1409  Comparator a_less,
1410  const int a_version )
1411 {
1412  int orig_size = this->size(), new_size = this->size();
1413  T *data = &(*this)[0];
1414 
1415  // sort
1416  T* ret = memsort( data, &new_size, this->capacity(), a_less,
1417 #ifdef CH_MPI
1418  a_comm,
1419 #endif
1420  a_version );
1421 
1422  if (ret != 0 )
1423  {
1424  CH_assert(new_size <= this->capacity());
1425  // hack to deal with array that changed size -- can we manually move the stack ptr?
1426  for (int i=new_size ; i < orig_size ; i++) this->pop_back(); // this calls the destructor!
1427  for (int i=orig_size ; i < new_size ; i++ ) this->push_back( data[i] ); // pushing from its own buffer (direct access to stack pointer?)
1428  CH_assert(new_size == this->size());
1429  }
1430 }
1431 
1432 #ifdef CH_MPI
1433 
1434 ////////////////////////////////////////////////////////////////////////
1435 // ParticleVector<T,Comparator>::mws_lborder
1436 ////////////////////////////////////////////////////////////////////////
1437 /** This function searches for the smallest elements in a sorted container
1438  * \param a_data is an object with operator[]
1439  * \param a_nloc is the size of the container on this PE
1440  * \param a_less is the compare function
1441  * \param a_env the SortComm communication environment
1442  * \return array of P indices; entry i states how many elements on PE i belong to the sum(i=0..rank-1)num_elements_on_pe@i globally smallest elements
1443  * This function is collective (all PEs in the communicator have to call it)
1444  * The PEs do not have to use the same container_type, but must use the same value_type and Comparator
1445  */
1446 template<class T, class Comparator>
1447 index_type* ParticleVector<T, Comparator>::mws_lborder( T *a_data,
1448  int a_nloc, // sort can change this
1449  float a_margin,
1450  int a_version,
1451  Comparator a_less,
1452  SortComm* a_env
1453  )
1454 {
1455  // Johannes Singler's methods
1456  //the lengths of all containers
1457  int* num_el_pe = new int[a_env->m_numpes];
1458  MPI_Allgather( &a_nloc, 1, MPI_INT, num_el_pe, 1, MPI_INT, a_env->m_comm);
1459 
1460  // total number of elements, from which this PE's start rank is computed
1461  index_type num_elems = 0;
1462  for (int id = 0; id < a_env->m_numpes; ++id)
1463  {
1464  num_elems += num_el_pe[id];
1465  }
1466  index_type start_rank = (a_env->m_mype*num_elems)/a_env->m_numpes;
1467 
1468  // MW-Sel
1469  index_type* lborder;
1470  switch(a_version)
1471  {
1472  case 0:
1473  lborder = mws_v0<T, Comparator>
1474  (a_data, num_el_pe, start_rank, a_less, a_env );
1475  break;
1476  case 1:
1477  lborder = mws_v1<T, Comparator>
1478  (a_data, num_el_pe, start_rank, a_less, a_env );
1479  break;
1480  case 2:
1481  lborder = mws_v2<T, Comparator>
1482  (a_data, num_el_pe, start_rank, a_less, a_env );
1483  break;
1484  case 4:
1485  lborder = mws_v4<T, Comparator>
1486  (a_data, num_el_pe, start_rank, a_margin, a_less, a_env );
1487  break;
1488  default: CH_assert(0);
1489  }
1490 
1491  delete[] num_el_pe;
1492 
1493  return lborder;
1494 }
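
 // Example of the contract (hypothetical sizes): with 4 PEs holding 100
 // elements each, rank 2 computes start_rank = (2*400)/4 = 200, and the
 // returned lborder gives, per PE, how many of its elements are among the
 // 200 globally smallest; the four entries sum to 200.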
1495 //
1496 // ParticleVector<T,Comparator>::mws_sdispl
1497 //
1498 template<class T, class Comparator>
1499 int* ParticleVector<T, Comparator>::mws_sdispl( T *a_data,
1500  int a_nloc,
1501  float a_margin,
1502  Comparator a_less,
1503  SortComm* a_env
1504  )
1505 {
1506  key_type max_v = a_less.max_value().get_key();
1507  key_type mink = a_less.max_value().get_key(), maxk = a_less.min_value().get_key();
1508  key_type *keys = new key_type[a_nloc];
1509  for (int i=0;i<a_nloc;i++)
1510  {
1511  key_type k = a_data[i].get_key();
1512  if (k == max_v) k = max_v - 1; // need to have a sentinel value ???
1513  keys[i] = k;
1514  if ( k<mink ) mink = k;
1515  if ( k>maxk ) maxk = k;
1516  }
1517  key_type mink2, maxk2;
1518  MPI_Allreduce( &mink, &mink2, 1, MPI_INDEX_TYPE, MPI_MIN, a_env->m_comm );
1519  MPI_Allreduce( &maxk, &maxk2, 1, MPI_INDEX_TYPE, MPI_MAX, a_env->m_comm );
1520  if (m_histo==0)
1521  {
1522  m_histo = new Histogram( a_env, mink2, maxk2 );
1523  }
1524  else
1525  {
1526  m_histo->set_limits(mink2, maxk2);
1527  }
1528 
1529  int* sdispl = m_histo->mws_sdispl( keys, a_nloc, a_env->m_comm, a_margin );
1530 
1531  delete [] keys;
1532 
1533  return sdispl;
1534 }
1535 
1536 #endif // CH_MPI
1537 
1538 // ************************************************************************* //
1539 
1540 inline index_type sort_messages(index_type count, int max_elems_per_message)
1541 {
1542  index_type quot = count / max_elems_per_message;
1543  index_type rem = count % max_elems_per_message;
1544 
1545  return (quot + ((rem > 0) ? 1 : 0));
1546 }
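
 // Worked example: count = 2,500,000 with max_elems_per_message = 1<<20
 // (1,048,576) gives quot = 2 and rem = 402,848 > 0, hence 3 messages.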
1547 
1548 // ************************************************************************* //
1549 
1550 // This is the algorithm for sorting in memory.
1551 ////////////////////////////////////////////////////////////////////////
1552 // ParticleVector<T,Comparator>::memsort
1553 ////////////////////////////////////////////////////////////////////////
1554 template<class T, class Comparator>
1555 T* ParticleVector<T, Comparator>::memsort( T* a_data,
1556  int *a_local_len,
1557  const int a_capacity,
1558  Comparator a_less,
1559 #ifdef CH_MPI
1560  MPI_Comm a_comm,
1561 #endif
1562  const int a_version)
1563 {
1564  const int loc_s_len = *a_local_len;
1565 #ifdef VTRACE
1566  VT_USER_START("Sort-local-sort");
1567 #endif
1568  CH_TIMERS("memsort");
1569 
1570  CH_TIMER("LocalSort", t1);
1571  CH_START(t1);
1572  std::sort(a_data, a_data + loc_s_len, a_less);
1573  CH_STOP(t1);
1574 #ifdef VTRACE
1575  VT_USER_END("Sort-local-sort");
1576 #endif
1577 
1578 #ifdef CH_MPI
1579 
1580  SortComm tcomm(a_comm), *env = &tcomm; // work with old code
1581 
1582  if (env->m_numpes == 1) return a_data; // all done
1583 
1584  MultiwaySelectResult *msr;
1585  float b_marg = 0.05; // load balance criteria
1586  if ( a_version != 3 )
1587  {
1588  CH_TIMER("Sort:merge-multiway-select", t9);
1589  CH_START(t9);
1590  index_type* lborder = mws_lborder(a_data, loc_s_len, b_marg, a_version,
1591  a_less, env );
1592  MPI_Barrier( a_comm ); //for correct sort wall time
1593  CH_STOP(t9);
1594  //std::cout << "\t[" << env->m_mype << "] lborder=" << lborder[0] << " " << lborder[1] << " " << lborder[2] << " " << lborder[3] << std::endl;
1595  CH_TIMER("Sort:Select-result", t2);
1596  CH_START(t2);
1597 #ifdef VTRACE
1598  VT_USER_START("Sort-WM-result");
1599 #endif
1600  msr = new MultiwaySelectResult( lborder, loc_s_len, env );
1601  delete [] lborder;
1602 #ifdef VTRACE
1603  VT_USER_END("Sort-WM-result");
1604  VT_USER_START("Sort-Send-Recv");
1605 #endif
1606  CH_STOP(t2);
1607  }
1608  else
1609  {
1610  CH_TIMER("Sort:bin-multiway-select", t9);
1611  CH_START(t9);
1612  int* sdispl = mws_sdispl(a_data, loc_s_len, b_marg, a_less, env );
1613  MPI_Barrier( a_comm ); //for correct sort wall time
1614  CH_STOP(t9);
1615 
1616  CH_TIMER("Sort:Select-result", t2);
1617  CH_START(t2);
1618 #ifdef VTRACE
1619  VT_USER_START("Sort-WM-result");
1620 #endif
1621  msr = new MultiwaySelectResult( sdispl, env );
1622 
1623 #ifdef VTRACE
1624  VT_USER_END("Sort-WM-result");
1625  VT_USER_START("Sort-Send-Recv");
1626 #endif
1627  CH_STOP(t2);
1628  }
1629 
1630  // debug
1631  for (int ii=0;ii<env->m_numpes;ii++)
1632  {
1633  if (msr->m_rcount[ii]<0)
1634  {
1635  std::cout << "\t\t\tERROR [" << env->m_mype << "] rcount=" << msr->m_rcount[0] << " " << msr->m_rcount[1] << " " << msr->m_rcount[2] << " " << msr->m_rcount[3] << std::endl;
1636  std::cout << "\t\t\tERROR [" << env->m_mype << "] sdispl=" << msr->m_sdispl[0] << " " << msr->m_sdispl[1] << " " << msr->m_sdispl[2] << " " << msr->m_sdispl[3] << " " << msr->m_sdispl[4] << std::endl;
1637  std::cout << "\t\t\tERROR [" << env->m_mype << "] scount=" << msr->m_scount[0] << " " << msr->m_scount[1] << " " << msr->m_scount[2] << " " << msr->m_scount[3] << std::endl;
1638  std::cout << "\t\t\tERROR [" << env->m_mype << "] rdispl=" << msr->m_rdispl[0] << " " << msr->m_rdispl[1] << " " << msr->m_rdispl[2] << " " << msr->m_rdispl[3] << std::endl;
1639  exit(122);
1640  }
1641  }
1642 
1643  // compute new local length
1644  int local_recv_len = 0;
1645  for (int ii=0;ii<env->m_numpes;ii++) local_recv_len += msr->m_rcount[ii];
1646  *a_local_len = local_recv_len; // output
1647  if ( local_recv_len > a_capacity || local_recv_len < 0 )
1648  {
1649  std::cout << "\t\t[" << env->m_mype << "]memsort ERROR: new local size = " << local_recv_len << ", capacity = " << a_capacity << std::endl;
1650  exit(12);
1651  }
1652 
1653  MPI_Barrier(a_comm); //for correct sort wall time
1654 
1655  T* merge_data = new T[local_recv_len];
1656 
1657  std::vector<T*> slab( env->m_numpes + 1 );
1658  slab[0] = merge_data;
1659  for (PE_NUM id = 1; id <= env->m_numpes; ++id)
1660  {
1661  slab[id] = slab[id - 1] + msr->m_rcount[id - 1];
1662  }
1663 
1664  CH_TIMER("Sort-send-recv.", t3);
1665  CH_START(t3);
1666 
1667  MPI_Datatype mpi_value_type; //value_type specification for MPI
1668  MPI_Type_contiguous(sizeof(T), MPI_BYTE, &mpi_value_type);
1669  MPI_Type_commit( &mpi_value_type );
1670 
1671  if (true) // collective send/recv good for unsorted data that requires all-to-all
1672  {
1673  MPI_Alltoallv( a_data, msr->m_scount, msr->m_sdispl, mpi_value_type,
1674  merge_data, msr->m_rcount, msr->m_rdispl, mpi_value_type,
1675  env->m_comm);
1676  }
1677  else
1678  {
1679  int max_elems_per_message = (1 << 20);
1680  index_type send = 0;
1681  index_type recv = 0;
1682  for (PE_NUM id = 0; id < env->m_numpes; ++id)
1683  {
1684  send += sort_messages(msr->m_scount[id], max_elems_per_message);
1685  recv += sort_messages(msr->m_rcount[id], max_elems_per_message);
1686  }
1687 
1688  // BSEND stuff
1689  unsigned bsend_buf_size = loc_s_len*sizeof(T) + send*MPI_BSEND_OVERHEAD;
1690  char* bsend_buf = new char[bsend_buf_size];
1691  MPI_Buffer_attach( bsend_buf, bsend_buf_size );
1692 
1693  MPI_Request* req = new MPI_Request[send + recv];
1694  index_type req_pos = 0;
1695  for (PE_NUM k = 0,
1696  id_recv = pe_right(env->m_mype, env),
1697  id_send = pe_left(env->m_mype, env)
1698 
1699  ; k < env->m_numpes
1700 
1701  ; ++k,
1702  id_recv = pe_right(id_recv, env),
1703  id_send = pe_left(id_send, env)
1704  )
1705  {
1706  T* buf_send = a_data + msr->m_sdispl[id_send];
1707  T* buf_recv = slab[id_recv];
1708 
1709  while (msr->m_scount[id_send] > 0 && msr->m_rcount[id_recv] > 0)
1710  {
1711  index_type elems_send = std::min(msr->m_scount[id_send],
1712  max_elems_per_message
1713  );
1714 
1715  //sum += elems_send;
1716 
1717  MPI_Ibsend(buf_send, // BSEND stuff
1718  elems_send,
1719  mpi_value_type,
1720  id_send,
1721  TAG_ALLTOALLV_MANUAL,
1722  env->m_comm,
1723  req + req_pos
1724  );
1725 
1726  ++req_pos;
1727  buf_send += elems_send;
1728  msr->m_scount[id_send] -= elems_send;
1729 
1730  index_type elems_recv = std::min(msr->m_rcount[id_recv],
1731  max_elems_per_message
1732  );
1733  MPI_Irecv(buf_recv,
1734  elems_recv,
1735  mpi_value_type,
1736  id_recv,
1737  TAG_ALLTOALLV_MANUAL,
1738  env->m_comm,
1739  req + req_pos
1740  );
1741 
1742  ++req_pos;
1743  buf_recv += elems_recv;
1744  msr->m_rcount[id_recv] -= elems_recv;
1745  }
1746 
1747  while (msr->m_scount[id_send] > 0)
1748  {
1749  index_type elems_send = std::min(msr->m_scount[id_send],
1750  max_elems_per_message
1751  );
1752 
1753  MPI_Ibsend(buf_send, // BSEND stuff
1754  elems_send,
1755  mpi_value_type,
1756  id_send,
1757  TAG_ALLTOALLV_MANUAL,
1758  env->m_comm,
1759  req + req_pos
1760  );
1761 
1762  ++req_pos;
1763  buf_send += elems_send;
1764  msr->m_scount[id_send] -= elems_send;
1765  }
1766 
1767  while (msr->m_rcount[id_recv] > 0)
1768  {
1769 
1770  index_type elems_recv = std::min(msr->m_rcount[id_recv],
1771  max_elems_per_message
1772  );
1773 
1774  MPI_Irecv(buf_recv,
1775  elems_recv,
1776  mpi_value_type,
1777  id_recv,
1778  TAG_ALLTOALLV_MANUAL,
1779  env->m_comm,
1780  req + req_pos
1781  );
1782 
1783  ++req_pos;
1784  buf_recv += elems_recv;
1785  msr->m_rcount[id_recv] -= elems_recv;
1786  }
1787  }
1788 
1789  MPI_Waitall(req_pos, req, MPI_STATUSES_IGNORE);
1790 
1791  int tt = loc_s_len;
1792  MPI_Buffer_detach( &bsend_buf, &tt ); // BSEND stuff
1793  delete[] bsend_buf;
1794 
1795  delete [] req;
1796  }
1797 
1798  MPI_Type_free(const_cast<MPI_Datatype*>(&mpi_value_type));
1799 
1800  delete msr;
1801 
1802  CH_STOP(t3);
1803 
1804 #ifdef VTRACE
1805  VT_USER_END("Sort-Send-Recv");
1806  VT_USER_START("Sort-final-merge");
1807 #endif
1808 
1809  //merging
1810  CH_TIMER("Final-merge", t5);
1811  CH_START(t5);
1812  if (true)
1813  {
1814  std::vector<std::pair<T*, T*> > sub_run_iterators;
1815  index_type length = 0;
1816  for (unsigned s = 0; s + 1 < slab.size(); ++s)
1817  {
1818  sub_run_iterators.push_back(std::make_pair(slab[s],
1819  slab[s + 1]
1820  )
1821  );
1822  length += slab[s + 1] - slab[s];
1823  }
1824  CH_assert(local_recv_len==length);
1825 
1826  CH_assert(false); // special gnu4.4 code commented out for now
1827  /*
1828  __gnu_parallel::multiway_merge(sub_run_iterators.begin(),
1829  sub_run_iterators.end(),
1830  a_data,
1831  length,
1832  a_less
1833  );
1834  */
1835  }
1836  else // no merge
1837  {
1838  for ( int ii = 0 ; ii < local_recv_len ; ii++ )
1839  {
1840  a_data[ii] = merge_data[ii];
1841  }
1842  }
1843  delete [] merge_data;
1844 
1845  CH_STOP(t5);
1846 
1847 #endif // CH_MPI
1848 
1849 #ifdef VTRACE
1850  VT_USER_END("Sort-final-merge");
1851 #endif
1852 
1853  return a_data;
1854 }
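
/* memsort pipeline in brief: (1) std::sort the local run; (2) choose split
 * positions via mws_lborder (versions 0/1/2/4) or the histogram-based
 * mws_sdispl (version 3); (3) redistribute slabs with MPI_Alltoallv or the
 * chunked Ibsend/Irecv path; (4) multiway-merge the P received sorted runs
 * (the __gnu_parallel::multiway_merge call is commented out above, so that
 * branch currently stops at CH_assert(false)).
 */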
1855 
1856 #endif //*SORT_H_*//