#ifdef _GLIBCXX_PARALLEL
#include <parallel/algorithm>
#endif

// Post a wildcard non-blocking receive into the shared message buffer.
#define RECV MPI_Irecv(buf, buf_size, MPI_BYTE, MPI_ANY_SOURCE, MPI_ANY_TAG, \
                       env->m_comm, req)

// Complete the posted receive and remember who sent the message.
#define WAIT MPI_Wait(req, stat); PE_NUM sender = stat->MPI_SOURCE

// Fire-and-forget non-blocking send: the request is freed immediately,
// so completion is never waited on explicitly.
#define ISEND(buf, count, id, tag) \
{ MPI_Request dummy; \
  MPI_Isend(buf, count, MPI_BYTE, id, tag, env->m_comm, &dummy); \
  MPI_Request_free(&dummy); \
}

// A zero-byte message used only for its tag.
#define SAY(id, tag) ISEND(NULL, 0, id, tag)

// Answer a TAG_NEED query: read the requested index out of the receive
// buffer and send the addressed element back to the asker.
#define ON_TAG_NEED \
{ index_type idx; \
  memcpy(&idx, buf, sizeof(index_type)); \
  outbuf[sender] = a_data[idx]; \
  ISEND(outbuf + sender, sizeof(value_type), sender, TAG_GIVE); \
}

// The #define line of the following macro is elided in this listing; the
// name UPDATE_SPLIT_VALUE is a placeholder, not the original. It refreshes
// split_value[i] after split_position[i] has moved: positions off either end
// of PE i's range clamp to the comparator's sentinels, anything else sends a
// TAG_NEED query to PE i.
#define UPDATE_SPLIT_VALUE(i) \
  if (split_position[i] < 0) \
  { split_value[i] = less.min_value(); pq.push(i); } \
  else if (split_position[i] >= a_num_elements_on_pes[i]) \
  { split_value[i] = less.max_value(); pq.push(i); } \
  else \
  { ISEND(split_position + i, sizeof(index_type), i, TAG_NEED); }

#define FOR_ALL_PE(i) for (PE_NUM i = 0; i < env->m_numpes; ++i)

// Signature reconstructed from the dispatch in mws_lborder below; the
// original lines are elided in this listing.
template<class value_type, class Comparator>
index_type* mws_v0(value_type* a_data,
                   int* a_num_elements_on_pes,
                   index_type a_my_cut,
                   Comparator less,
                   SortComm* env)
{
  VT_USER_START("MW-Sel-setup");
  // ...
  index_type total_length = 0;
  FOR_ALL_PE(i)
  {
    total_length += a_num_elements_on_pes[i];
    CH_assert(a_num_elements_on_pes[i] >= 0);
  }
  // ...
  // Ranks form an implicit binary tree used by the termination protocol.
  PE_NUM parent = (env->m_mype - 1) / 2;
  PE_NUM lChild = 2 * env->m_mype + 1;
  PE_NUM rChild = 2 * env->m_mype + 2;
  PE_NUM nChild = 0;
  if (lChild < env->m_numpes)
  {
    ++nChild;
  }
  if (rChild < env->m_numpes)
  {
    ++nChild;
  }
  // ...
  index_type* split_position = new index_type[env->m_numpes];
  value_type* split_value = new value_type[env->m_numpes];
  value_type* outbuf = new value_type[env->m_numpes];
  std::fill(split_position, split_position + env->m_numpes, 0);
  // Start the search stride at the smallest power of two >= a_my_cut.
  index_type stepsize = 1;
  while (stepsize < a_my_cut)
  {
    stepsize *= 2;
  }
  // ...
  if (a_my_cut >= total_length)
  {
    // ... (trivial cut: every PE is taken whole; body elided)
  }
  // ...
  // Seed split_value with each PE's smallest element, or the +infinity
  // sentinel for PEs that hold no data.
  MPI_Datatype mpi_value_type;
  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
  MPI_Type_commit(&mpi_value_type);
  // ...
  value_type v0 = (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0]
                                                           : less.max_value();
  MPI_Allgather(&v0, 1, mpi_value_type,
                split_value, 1, mpi_value_type,
                env->m_comm);
  // ...
  MPI_Type_free(&mpi_value_type);
  // ...
  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
  // ...
  pqueue pq(iiLess(split_value, less));
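  // NB: std::priority_queue keeps the element its comparator ranks largest
  // on top, so iiLess (from sort_utils.H) must order PE indices such that
  // pq.top() is the PE whose current split_value is globally smallest; the
  // search loop below pops exactly that PE and advances it.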
  // Scratch for one incoming message (an index query or a value answer),
  // plus persistent request/status objects for the wildcard receive.
  char* buf = new char[buf_size];   // buf_size: declaration elided in listing
  MPI_Request* req = new MPI_Request;
  MPI_Status* stat = new MPI_Status;
  // ...
  index_type have = 0;        // elements currently below the split
  unsigned open_query = 0;    // queries sent but not yet answered
  PE_NUM wChild = nChild;     // children that have not yet reported TAG_DONE

  VT_USER_END("MW-Sel-setup");
  VT_USER_START("MW-Sel-loop-1");
  while (have < a_my_cut)
  {
    // Drain all pending answers and queries before advancing the search.
    while (open_query > 0)
    {
      RECV; WAIT;
      switch (stat->MPI_TAG)
      {
      case TAG_GIVE:
        memcpy(split_value + sender, buf, sizeof(value_type));
        // ... (re-queue the sender, decrement open_query)
        break;
      case TAG_NEED: ON_TAG_NEED; break;
      // ...
      case TAG_DONE: --wChild; break;
      }
    }
    // ...
    // Advance the PE whose split value is currently smallest.
    PE_NUM min_index = pq.top();
    // ...
    split_position[min_index] += stepsize;
    // ...
    // When the step overshoots the cut, the advanced splits are pulled back
    // and the step is halved (control flow partly elided in this listing):
    FOR_ALL_PE(i)
    {
      if (split_position[i] > 0)
      {
        split_position[i] -= stepsize;
        // ...
      }
    }
    // ...
  }
  // ...
  VT_USER_END("MW-Sel-loop-1");
  // Termination: a rank whose children have all reported TAG_DONE reports
  // up the tree; the root then floods TAG_STOP back down.
  if (env->m_mype > 0 && wChild == 0)
  {
    SAY(parent, TAG_DONE);
  }
  // ...
  VT_USER_START("MW-Sel-loop-2");
  // Keep serving queries until the stop wave arrives (loop header elided):
  {
    RECV; WAIT;
    switch (stat->MPI_TAG)
    {
    case TAG_NEED: ON_TAG_NEED; break;
    // ...
      SAY(parent, TAG_DONE);
    // ...
      if (lChild < env->m_numpes)
      {
        SAY(lChild, TAG_STOP);
      }
      if (rChild < env->m_numpes)
      {
        SAY(rChild, TAG_STOP);
      }
    // ...
    }
  }
  // ...
  if (lChild < env->m_numpes)
  {
    SAY(lChild, TAG_STOP);
  }
  if (rChild < env->m_numpes)
  {
    SAY(rChild, TAG_STOP);
  }
  // ...
  VT_USER_END("MW-Sel-loop-2");
  // ...
  delete[] split_value;
  // ... (remaining buffers freed; lines elided)
  return split_position;
}
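// Aside (illustrative, not part of the original header; kept inert under
// #if 0): a serial analogue of the selection that mws_v0 distributes. It
// advances, one element at a time, the run whose next element is smallest
// until exactly 'cut' elements sit below the split; the MPI kernels do the
// same walk with a geometrically shrinking stride and remote
// TAG_NEED/TAG_GIVE queries instead of direct array reads. All names below
// are invented for the sketch.
#if 0
static void select_cut_serial(const std::vector<std::vector<int> >& runs,
                              long long cut, std::vector<long long>& pos)
{
  pos.assign(runs.size(), 0);
  for (long long have = 0; have < cut; ++have)
  {
    int min_run = -1;   // run holding the smallest not-yet-taken element
    for (unsigned i = 0; i < runs.size(); ++i)
    {
      if (pos[i] < (long long)runs[i].size() &&
          (min_run < 0 || runs[i][pos[i]] < runs[min_run][pos[min_run]]))
      {
        min_run = (int)i;
      }
    }
    ++pos[min_run];   // take the globally smallest remaining element
  }
}
#endif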
// Signature reconstructed as for mws_v0; the original lines are elided.
template<class value_type, class Comparator>
index_type* mws_v1(value_type* a_data,
                   int* a_num_elements_on_pes,
                   index_type a_my_cut,
                   Comparator less,
                   SortComm* env)
{
  VT_USER_START("MW-Sel-setup");
  // ...
  index_type total_length = 0;
  FOR_ALL_PE(i)
  {
    total_length += a_num_elements_on_pes[i];
    CH_assert(a_num_elements_on_pes[i] >= 0);
  }
  // ...
  PE_NUM parent = (env->m_mype - 1) / 2;
  PE_NUM lChild = 2 * env->m_mype + 1;
  PE_NUM rChild = 2 * env->m_mype + 2;
  PE_NUM nChild = 0;
  if (lChild < env->m_numpes)
  {
    ++nChild;
  }
  if (rChild < env->m_numpes)
  {
    ++nChild;
  }
  // ...
  index_type* split_position = new index_type[env->m_numpes];
  value_type* outbuf = new value_type[env->m_numpes];
  std::fill(split_position, split_position + env->m_numpes, 0);
  // ...
  index_type stepsize = 1;
  while (stepsize < a_my_cut)
  {
    stepsize *= 2;
  }
  // ...
  if (a_my_cut >= total_length)
  {
    // ... (trivial cut; body elided)
  }
  value_type* split_value = new value_type[env->m_numpes];
  value_type* elem_top = new value_type[env->m_numpes];
  {
    CH_TIME("MW-Select:AllGather");
    MPI_Datatype mpi_value_type;
    MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
    MPI_Type_commit(&mpi_value_type);
    // ...
    value_type* elem_tmp = new value_type[2*env->m_numpes];
    // Gather every PE's smallest and largest element; a PE with no data
    // contributes the comparator's sentinels for both.
    const bool has_data = a_num_elements_on_pes[env->m_mype] > 0;
    value_type v0[2] =
    {
      has_data ? a_data[0] : less.max_value(),
      has_data ? a_data[a_num_elements_on_pes[env->m_mype]-1] : less.min_value()
    };
    MPI_Allgather(v0, 2, mpi_value_type,
                  elem_tmp, 2, mpi_value_type,
                  env->m_comm);
    FOR_ALL_PE(i)
    {
      split_value[i] = elem_tmp[2*i];
      elem_top[i] = elem_tmp[2*i + 1];
    }
    // ...
    MPI_Type_free(&mpi_value_type);
  }
  // ...
  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
  // ...
  pqueue pq(iiLess(split_value, less));
  // Fast path: absorb whole PEs that provably lie below the cut before
  // starting the stepped search.
  index_type have = 0;
  std::vector<bool> mask(env->m_numpes);
  {
    CH_TIME("MW-Select:make-mask");
    // ... (loop until the cut is reached or no PE qualifies; header elided)
      PE_NUM root = pq.top();
      // ...
      // Count every unmasked PE whose entire value range is at or below
      // PE root's largest element.
      index_type extra = 0;
      FOR_ALL_PE(i)
      {
        if ( !mask[i] && split_value[i].get_key() <= elem_top[root].get_key() )
        {
          extra += a_num_elements_on_pes[i];
        }
      }
      // ...
      // If absorbing them all still fits under the cut, take PE root whole.
      if ( have + extra <= a_my_cut )
      {
        have += a_num_elements_on_pes[root];
        // ...
        split_position[root] = a_num_elements_on_pes[root];
        // ...
      }
      // ...
      if ( have == a_my_cut )
        break;
      // ...
  }
  // ...
  delete[] elem_top; elem_top = 0;
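  // Worked example of the mask shortcut (illustrative numbers): three PEs
  // holding {1,2}, {3,4}, {9} with a_my_cut = 4. Round 1: root is PE 0
  // (smallest bottom); only PE 0's values sit at or below its top (2), so
  // extra = 2 and PE 0 is absorbed whole (have = 2). Round 2: root is PE 1,
  // extra = 2, absorbed (have = 4 == a_my_cut), and the stepped search
  // below is skipped entirely.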
  char* buf = new char[buf_size];   // buf_size: declaration elided in listing
  MPI_Request* req = new MPI_Request;
  MPI_Status* stat = new MPI_Status;
  // ...
  unsigned open_query = 0;    // queries sent but not yet answered
  PE_NUM wChild = nChild;     // children that have not yet reported TAG_DONE

  VT_USER_END("MW-Sel-setup");
  VT_USER_START("MW-Sel-loop-1");
  while (have < a_my_cut)
  {
    while (open_query > 0)
    {
      RECV; WAIT;
      switch (stat->MPI_TAG)
      {
      case TAG_GIVE:
        memcpy(split_value + sender, buf, sizeof(value_type));
        // ... (re-queue the sender, decrement open_query)
        break;
      case TAG_NEED: ON_TAG_NEED; break;
      // ...
      case TAG_DONE: --wChild; break;
      }
    }
    // ...
    PE_NUM min_index = pq.top();
    // ...
    split_position[min_index] += stepsize;
    // ...
    // Pull back splits that overshot; the step is halved as the search
    // converges (control flow partly elided in this listing):
    FOR_ALL_PE(i)
    {
      if (split_position[i] > 0)
      {
        split_position[i] -= stepsize;
        // ...
      }
    }
    // ...
  }
  // ...
  VT_USER_END("MW-Sel-loop-1");
  if (env->m_mype > 0 && wChild == 0)
  {
    SAY(parent, TAG_DONE);
  }
  // ...
  VT_USER_START("MW-Sel-loop-2");
  // Serve queries until the stop wave arrives (loop header elided):
  {
    RECV; WAIT;
    switch (stat->MPI_TAG)
    {
    case TAG_NEED: ON_TAG_NEED; break;
    // ...
      SAY(parent, TAG_DONE);
    // ...
      if (lChild < env->m_numpes)
      {
        SAY(lChild, TAG_STOP);
      }
      if (rChild < env->m_numpes)
      {
        SAY(rChild, TAG_STOP);
      }
    // ...
    }
  }
  // ...
  if (lChild < env->m_numpes)
  {
    SAY(lChild, TAG_STOP);
  }
  if (rChild < env->m_numpes)
  {
    SAY(rChild, TAG_STOP);
  }
  // ...
  if (have != a_my_cut)
  {
    std::cout << "\t\t\tERROR [" << env->m_mype << "] have=" << have
              << " a_my_cut=" << a_my_cut << std::endl;
  }
  // ...
  VT_USER_END("MW-Sel-loop-2");
  // ...
  delete[] split_value;
  // ... (remaining buffers freed; lines elided)
  return split_position;
}
// Signature reconstructed as for mws_v0; the original lines are elided.
template<class value_type, class Comparator>
index_type* mws_v2(value_type* a_data,
                   int* a_num_elements_on_pes,
                   index_type a_my_cut,
                   Comparator less,
                   SortComm* env)
{
  // Single-rank case: nothing to select.
  if ( env->m_numpes == 1)
  {
    index_type* split_position = new index_type[1];
    split_position[0] = 0;
    return split_position;
  }
  const PE_NUM P = env->m_numpes;   // declaration elided in listing
  index_type total_length = 0;
  for (PE_NUM i = 0; i < P; ++i)
  {
    total_length += a_num_elements_on_pes[i];
    CH_assert(a_num_elements_on_pes[i] >= 0);
  }
  // ...
  PE_NUM parent = (env->m_mype - 1) / 2;
  PE_NUM left_child = 2 * env->m_mype + 1;
  PE_NUM right_child = 2 * env->m_mype + 2;
  PE_NUM num_children = 0;
  // ... (count the children, as in mws_v0)
  index_type* split_positions = new index_type[P];
  value_type* send_buffer = new value_type[P];
  std::fill(split_positions, split_positions + P, 0);
  // ...
  index_type stepsize = 1;
  while (stepsize < a_my_cut)
  {
    stepsize *= 2;
  }
  // ...
  if (a_my_cut == total_length)
  {
    for (PE_NUM p = 0; p < P; ++p)
      split_positions[p] = a_num_elements_on_pes[p];
    // ...
  }
  MPI_Datatype mpi_value_type;
  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
  MPI_Type_commit(&mpi_value_type);
  value_type* split_values = new value_type[P];
  value_type* elem_top = new value_type[P];
  {
    CH_TIME("MW-Select:AllGather");
    // ...
    value_type* elem_tmp = new value_type[2*P];
    // As in mws_v1: gather each PE's bottom and top element, with the
    // comparator's sentinels standing in for PEs that hold no data.
    const bool has_data = a_num_elements_on_pes[env->m_mype] > 0;
    value_type v0[2] =
    {
      has_data ? a_data[0] : less.max_value(),
      has_data ? a_data[a_num_elements_on_pes[env->m_mype]-1] : less.min_value()
    };
    MPI_Allgather(v0, 2, mpi_value_type,
                  elem_tmp, 2, mpi_value_type,
                  env->m_comm);
    // ...
    for (PE_NUM i = 0; i < P; ++i)
    {
      split_values[i] = elem_tmp[2*i];
      elem_top[i] = elem_tmp[2*i + 1];
    }
    // ...
  }
  // ...
  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiless> pqueue;
  // ...
  pqueue pq(iiless(split_values, less));
  for (PE_NUM p = 0; p < P; ++p)
  {
    // ... (seed the queue; body elided)
  }
  // Fast path, as in mws_v1: absorb whole PEs that provably lie below the cut.
  index_type have = 0;
  std::vector<bool> mask(P);
  {
    CH_TIME("MW-Select:make-mask");
    // ... (loop until the cut is reached; header elided)
      PE_NUM root = pq.top();
      // ...
      index_type extra = 0;
      for (PE_NUM i = 0; i < P; ++i)
      {
        if ( !mask[i] && split_values[i].get_key() <= elem_top[root].get_key() )
        {
          extra += a_num_elements_on_pes[i];
        }
      }
      // ...
      if ( have + extra <= a_my_cut )
      {
        have += a_num_elements_on_pes[root];
        // ...
        split_positions[root] = a_num_elements_on_pes[root];
        // ...
      }
      // ...
      if ( have == a_my_cut )
        break;
      // ...
  }
  // ...
  delete[] elem_top; elem_top = 0;
  char* receive_buffer = new char[receive_buffer_size];   // size: elided
  MPI_Request req;    // declarations elided in listing; RECV2 takes &req
  MPI_Status status;
  // ...

#define RECV2 MPI_Irecv(receive_buffer, receive_buffer_size, MPI_BYTE, \
                        MPI_ANY_SOURCE, MPI_ANY_TAG, env->m_comm, &req)

#define TELL(id, tag) ISEND(NULL, 0, id, tag)

// Answer an index query with the element it addresses.
#define ANSWER_QUERY \
{ index_type idx; \
  memcpy(&idx, receive_buffer, sizeof(index_type)); \
  send_buffer[sender] = a_data[idx]; \
  ISEND(send_buffer + sender, sizeof(value_type), sender, TAG_ANSWER); \
}

// Tree termination check: when 'condition' holds, a non-root rank reports
// 'done_tag' to its parent while the root starts the 'stop_tag' wave down to
// its children; 'done'/'done_before' latch the state. Parts of this macro's
// control flow are elided in the listing.
#define CHECK_TERMINATION(condition, done_tag, stop_tag, done, done_before) \
  if (condition) \
  { \
    if (env->m_mype > 0) \
    { \
      TELL(parent, done_tag); \
    } \
    else \
    { \
      done = true; \
      if (left_child < P) \
      { \
        TELL(left_child, stop_tag); \
      } \
      if (right_child < P) \
      { \
        TELL(right_child, stop_tag); \
      } \
    } \
  } \
  done_before = condition;

#define CHECK_FINAL_TERMINATION \
  CHECK_TERMINATION(num_waiting_children == 0 && stepsize == 0 && !done_before, \
                    TAG_DONE, TAG_STOP, done, done_before)

#define CHECK_SHIFT_RIGHT_TERMINATION \
  bool dummy; \
  CHECK_TERMINATION(num_children_done_shift_right == num_children, \
                    TAG_SHIFT_RIGHT_DONE, TAG_START_SHIFT_LEFT, \
                    shift_left, dummy);

  PE_NUM num_waiting_children = num_children;
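// Example trace of the termination protocol on four ranks (binary tree
// 0 -> {1,2}, 1 -> {3}): leaf ranks 2 and 3 TELL their parents TAG_DONE as
// soon as their own condition holds; rank 1 reports once rank 3 has; rank 0,
// with num_waiting_children == 0 and its own work finished, starts the
// TAG_STOP wave back down the same tree edges.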
  bool done = false, done_before = false;
  // ...
  CHECK_FINAL_TERMINATION;
  // ...
  PE_NUM num_children_done_shift_right = 0;
  while (have < a_my_cut)
  {
    bool query_open = false;
    // ...
    if (have < a_my_cut && stepsize > 0)
    {
      // Advance the PE whose split value is currently smallest.
      PE_NUM min_seq = pq.top();
      // ...
      split_positions[min_seq] += stepsize;
      // ...
      if (split_positions[min_seq] < 0)
      {
        split_values[min_seq] = less.min_value();
        // ...
      }
      else if (split_positions[min_seq] >= a_num_elements_on_pes[min_seq])
      {
        split_values[min_seq] = less.max_value();
        // ...
      }
      else
      {
        // Query PE min_seq for the element at the new position; the tag
        // argument is elided in this listing.
        ISEND(split_positions + min_seq, sizeof(index_type),
              min_seq, /* tag elided */);
        // ...
      }
    }
    // ...
    RECV2;
    MPI_Wait(&req, &status);
    PE_NUM sender = status.MPI_SOURCE;
    // ...
    switch (status.MPI_TAG)
    {
    case TAG_ANSWER:
      memcpy(split_values + sender, receive_buffer, sizeof(value_type));
      // ...
      break;
    // ...
    case TAG_SHIFT_RIGHT_DONE:
      ++num_children_done_shift_right;
      break;
    case TAG_DONE:
      --num_waiting_children;
      CHECK_FINAL_TERMINATION;
      break;
    case TAG_STOP:
      // ...
      TELL(left_child, TAG_STOP);
      // ...
      TELL(right_child, TAG_STOP);
      // ...
      break;
    }
  }
  // Shift-right is finished locally; keep serving queries until the whole
  // tree agrees and TAG_START_SHIFT_LEFT arrives.
  bool shift_left = false;
  // ...
  CHECK_SHIFT_RIGHT_TERMINATION;
  // ... (loop header elided)
  {
    RECV2;
    MPI_Wait(&req, &status);
    PE_NUM sender = status.MPI_SOURCE;
    // ...
    switch (status.MPI_TAG)
    {
    // ...
    case TAG_SHIFT_RIGHT_DONE:
      ++num_children_done_shift_right;
      CHECK_SHIFT_RIGHT_TERMINATION;
      break;
    case TAG_START_SHIFT_LEFT:
      // ...
      TELL(left_child, TAG_START_SHIFT_LEFT);
      // ...
      TELL(right_child, TAG_START_SHIFT_LEFT);
      // ...
      break;
    case TAG_DONE:
      --num_waiting_children;
      CHECK_FINAL_TERMINATION;
      break;
    case TAG_STOP:
      // ...
      TELL(left_child, TAG_STOP);
      // ...
      TELL(right_child, TAG_STOP);
      // ...
      break;
    }
  }
  // Shift-left: pull back every split that overshot; PEs absorbed whole by
  // the mask stay put.
  for (PE_NUM p = 0; p < P; ++p)
  {
    if (split_positions[p] > 0 && !mask[p] )
    {
      split_positions[p] -= stepsize;
      // ...
    }
  }
  // ...
  // Finish with two collectives: swap split positions, answer every query
  // locally, then swap the values back.
  index_type* queries = new index_type[P];   // declaration elided in listing
  MPI_Alltoall(split_positions, 1, MPI_INDEX_TYPE,
               queries, 1, MPI_INDEX_TYPE,
               env->m_comm);
  // ...
  for (PE_NUM p = 0; p < P; ++p)
  {
    if (queries[p] < 0)
      send_buffer[p] = less.min_value();
    else if (queries[p] >= a_num_elements_on_pes[env->m_mype])
      send_buffer[p] = less.max_value();
    else
      send_buffer[p] = a_data[queries[p]];
  }
  // ...
  MPI_Alltoall(send_buffer, 1, mpi_value_type,
               split_values, 1, mpi_value_type,
               env->m_comm);
  // ...
  for (PE_NUM p = 0; p < P; ++p)
  {
    // ... (recompute 'have' from the final positions; body elided)
  }
  // ...
  CHECK_FINAL_TERMINATION;
  if (have != a_my_cut)
  {
    std::cout << "\t\t\tERROR [" << env->m_mype << "] have=" << have
              << " a_my_cut=" << a_my_cut << std::endl;
  }
  // ...
  MPI_Wait( &req, &status );
  // ...
  MPI_Type_free(&mpi_value_type);
  // ...
  delete[] receive_buffer;
  // ...
  delete[] split_values;
  delete[] send_buffer;
  // ...
  return split_positions;
}
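// Design note: unlike mws_v0/mws_v1, which keep refining splits with
// pointwise TAG_NEED/TAG_GIVE messages, mws_v2 finishes with two MPI_Alltoall
// rounds (one delivering the final positions as queries, one returning the
// addressed values), trading many small messages for two collectives.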
// Signature reconstructed from the dispatch in mws_lborder below; the
// original lines are elided in this listing.
template<class value_type, class Comparator>
index_type* mws_v4(value_type* a_data,
                   int* a_num_elements_on_pes,
                   index_type a_want,
                   const float a_margin,
                   Comparator less,
                   SortComm* env)
{
  VT_USER_START("MW-Sel-setup");
  // ...
  const int P = env->m_numpes, mype = env->m_mype;
  // ...
  index_type total_length = 0;
  FOR_ALL_PE(i)
  {
    total_length += a_num_elements_on_pes[i];
    CH_assert(a_num_elements_on_pes[i] >= 0);
  }
  // ...
  // Allowed slack around the ideal cut: a_margin is a fraction of the
  // per-PE goal, converted to an element count of at least one.
  int goal = (int)(total_length/P);
  CH_assert(goal>0);
  CH_assert(a_margin>0.0 && a_margin < 1.0);
  const int margin = (int)( (float)goal*a_margin ) + 1;
  if (mype==P-1)
  {
    std::cout << "\t[" << mype << "]mws_v4 margin = " << margin
              << ", a_margin = " << a_margin
              << ", num elems = " << total_length << std::endl;
  }
  PE_NUM parent = (env->m_mype - 1) / 2;
  PE_NUM lChild = 2 * env->m_mype + 1;
  PE_NUM rChild = 2 * env->m_mype + 2;
  PE_NUM nChild = 0;
  if (lChild < env->m_numpes)
  {
    ++nChild;
  }
  if (rChild < env->m_numpes)
  {
    ++nChild;
  }
  // ...
  index_type* split_position = new index_type[env->m_numpes];
  value_type* split_value = new value_type[env->m_numpes];
  value_type* outbuf = new value_type[env->m_numpes];
  // ...
  std::fill(split_position, split_position + env->m_numpes, 0);
  // ...
  index_type stepsize = 1;
  while (stepsize < a_want)
  {
    stepsize *= 2;
  }
  // ...
  if (a_want >= total_length)
  {
    // ... (trivial cut; body elided)
  }
  // ...
  MPI_Datatype mpi_value_type;
  MPI_Type_contiguous(sizeof(value_type), MPI_BYTE, &mpi_value_type);
  MPI_Type_commit(&mpi_value_type);
  // ...
  value_type v0 = (a_num_elements_on_pes[env->m_mype] > 0) ? a_data[0]
                                                           : less.max_value();
  MPI_Allgather(&v0, 1, mpi_value_type,
                split_value, 1, mpi_value_type,
                env->m_comm);
  // ...
  MPI_Type_free(&mpi_value_type);
  // ...
  typedef std::priority_queue<PE_NUM, std::vector<PE_NUM>, iiLess> pqueue;
  // ...
  pqueue pq(iiLess(split_value, less));
  char* buf = new char[buf_size];   // buf_size: declaration elided in listing
  MPI_Request* req = new MPI_Request;
  MPI_Status* stat = new MPI_Status;
  // ...
  index_type have = 0;
  unsigned open_query = 0;
  PE_NUM wChild = nChild;

  VT_USER_END("MW-Sel-setup");
  VT_USER_START("MW-Sel-loop-1");
  // As mws_v0, except that the search may stop as soon as 'have' is within
  // 'margin' elements below a_want, and runs until the step reaches zero.
  while (stepsize > 0)
  {
    while ( have < a_want - margin )
    {
      while (open_query > 0)
      {
        RECV; WAIT;
        switch (stat->MPI_TAG)
        {
        case TAG_GIVE:
          memcpy(split_value + sender, buf, sizeof(value_type));
          // ...
          break;
        case TAG_NEED: ON_TAG_NEED; break;
        // ...
        case TAG_DONE: --wChild; break;
        }
      }
      // ...
      PE_NUM min_index = pq.top();
      // ...
      split_position[min_index] += stepsize;
      // ...
      if (have < a_want - margin)
      {
        // ... (body elided)
      }
      // ...
    }
    // ...
    // Pull back the splits that overshot and shrink the step.
    FOR_ALL_PE(i)
    {
      if (split_position[i] > 0)
      {
        split_position[i] -= stepsize;
        // ...
      }
    }
    // ...
  }
  VT_USER_END("MW-Sel-loop-1");
  if (env->m_mype > 0 && wChild == 0)
  {
    SAY(parent, TAG_DONE);
  }
  // ...
  VT_USER_START("MW-Sel-loop-2");
  // Serve queries until the stop wave arrives (loop header elided):
  {
    RECV; WAIT;
    switch (stat->MPI_TAG)
    {
    case TAG_NEED: ON_TAG_NEED; break;
    // ...
      if (env->m_mype > 0)
      {
        SAY(parent, TAG_DONE);
      }
      // ...
      if (lChild < env->m_numpes)
      {
        SAY(lChild, TAG_STOP);
      }
      if (rChild < env->m_numpes)
      {
        SAY(rChild, TAG_STOP);
      }
    // ...
    }
  }
  // ...
  if (lChild < env->m_numpes)
  {
    SAY(lChild, TAG_STOP);
  }
  if (rChild < env->m_numpes)
  {
    SAY(rChild, TAG_STOP);
  }
  // ...
  VT_USER_END("MW-Sel-loop-2");
  // ...
  delete[] split_value;
  // ... (remaining buffers freed; lines elided)
  return split_position;
}
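// Worked example of the margin (illustrative numbers): with
// total_length = 1000000, P = 64 and a_margin = 0.05, goal = 15625 and
// margin = (int)(15625 * 0.05) + 1 = 782, so loop 1 may stop as soon as
// 'have' is within 782 elements below a_want instead of hitting it exactly.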
template <class T, class Comparator>
class ParticleVector : public Vector<T>   // base inferred from the
                                          // this->size()/pop_back()/push_back()
                                          // calls in global_sort below
{
public:
  ParticleVector(unsigned int size)
  {
    // ...
  }
  // ...
  ~ParticleVector()
  {
    if (m_histo != 0)
      delete m_histo;
  }
  // ...
  void global_sort( MPI_Comm a_comm, Comparator a_less,
                    const int a_version = 2 );
  void global_sort( Comparator a_less,
                    const int a_version = 2 );
  // ...
  T* memsort( T* a_data, int* a_local_len,
              const int a_capacity,
              // ... (intervening parameters elided in this listing)
              const int a_version = 2);
  // ...
  int* mws_sdispl( T *a_data, /* ... */ );
  // ...
  Histogram* m_histo;   // created lazily in mws_sdispl
};
template<class T, class Comparator>
void ParticleVector<T, Comparator>::global_sort( Comparator a_less,
                                                 const int a_version )
{
  int orig_size = this->size(), new_size = this->size();
  T *data = &(*this)[0];
  // ... (elided: the sort itself, which updates new_size)
  // Resize the underlying Vector to the post-sort element count.
  for (int i=new_size ; i < orig_size ; i++) this->pop_back();
  for (int i=orig_size ; i < new_size ; i++) this->push_back( data[i] );
}
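// NB: at most one of the two loops above runs. pop_back trims the Vector
// when this rank ended with fewer elements; push_back grows it by re-reading
// data[] past the old size, which appears to rely on the sorted elements
// already sitting in the Vector's spare capacity.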
// Dispatch to one of the mws_v* selection kernels. The signature is inferred
// from the call site in memsort below; the original lines are elided.
template<class T, class Comparator>
index_type* mws_lborder( T* a_data, int a_nloc, float a_margin,
                         const int a_version, Comparator a_less,
                         SortComm* a_env )
{
  // How many elements does every PE hold?
  int* num_el_pe = new int[a_env->m_numpes];
  MPI_Allgather( &a_nloc, 1, MPI_INT, num_el_pe, 1, MPI_INT, a_env->m_comm);
  // ...
  index_type num_elems = 0;
  for (int id = 0; id < a_env->m_numpes; ++id)
  {
    num_elems += num_el_pe[id];
  }
  // Global rank of this PE's first element in the sorted order.
  index_type start_rank = (a_env->m_mype*num_elems)/a_env->m_numpes;
  // ...
  index_type* lborder = NULL;
  switch (a_version)   // the exact guards are elided in this listing
  {
  case 0:
    lborder = mws_v0<T, Comparator>
      (a_data, num_el_pe, start_rank, a_less, a_env );
    break;
  case 1:
    lborder = mws_v1<T, Comparator>
      (a_data, num_el_pe, start_rank, a_less, a_env );
    break;
  case 2:
    lborder = mws_v2<T, Comparator>
      (a_data, num_el_pe, start_rank, a_less, a_env );
    break;
  default:
    lborder = mws_v4<T, Comparator>
      (a_data, num_el_pe, start_rank, a_margin, a_less, a_env );
  }
  // ...
  return lborder;
}
// Histogram-based splitting. The signature is inferred from the call site in
// memsort below; the original lines are elided.
template<class T, class Comparator>
int* ParticleVector<T, Comparator>::mws_sdispl( T* a_data, int a_nloc,
                                                float a_margin,
                                                Comparator a_less,
                                                SortComm* a_env )
{
  // Local key extrema start at the opposite sentinels.
  key_type max_v = a_less.max_value().get_key();
  key_type mink = a_less.max_value().get_key(), maxk = a_less.min_value().get_key();
  // ...
  // Collect the keys, clamping the +infinity sentinel into range, and track
  // the local extrema. (The key extraction is elided in this listing.)
  key_type* keys = new key_type[a_nloc];
  for (int i=0;i<a_nloc;i++)
  {
    key_type k = a_data[i].get_key();   // inferred
    if (k == max_v) k = max_v - 1;
    keys[i] = k;
    if ( k<mink ) mink = k;
    if ( k>maxk ) maxk = k;
  }
  // ...
  // Global key range.
  key_type mink2, maxk2;
  MPI_Allreduce( &mink, &mink2, 1, MPI_INDEX_TYPE, MPI_MIN, a_env->m_comm );
  MPI_Allreduce( &maxk, &maxk2, 1, MPI_INDEX_TYPE, MPI_MAX, a_env->m_comm );
  // ...
  // Create the histogram on first use, otherwise just reset its limits.
  if (m_histo == 0)
  {
    m_histo = new Histogram( a_env, mink2, maxk2 );
  }
  else
  {
    m_histo->set_limits(mink2, maxk2);
  }
  // ...
  int* sdispl = m_histo->mws_sdispl( keys, a_nloc, a_env->m_comm, a_margin );
  // ...
  return sdispl;
}
// Number of messages of at most max_elems_per_message elements needed to
// move 'count' elements, i.e. ceil(count / max_elems_per_message).
index_type sort_messages(index_type count, int max_elems_per_message)
{
  index_type quot = count / max_elems_per_message;
  index_type rem = count % max_elems_per_message;
  return (quot + ((rem > 0) ? 1 : 0));
}
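// For example: sort_messages(3*(1<<20), 1<<20) == 3, while
// sort_messages(3*(1<<20) + 1, 1<<20) == 4: one extra message for the
// remainder.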
template<class T, class Comparator>
T* ParticleVector<T, Comparator>::memsort( T *a_data, int *a_local_len,
                                           const int a_capacity,
                                           MPI_Comm a_comm,   // inferred:
                                           // a_comm is used throughout the
                                           // body but elided in the listing
                                           Comparator a_less,
                                           const int a_version)
{
  const int loc_s_len = *a_local_len;
  // ...
  VT_USER_START("Sort-local-sort");
  // ...
  // Sort the local run first; every later stage assumes per-PE sorted data.
  std::sort(a_data, a_data + loc_s_len, a_less);
  // ...
  VT_USER_END("Sort-local-sort");
  // ...
  SortComm tcomm(a_comm), *env = &tcomm;
  // ...
  if (env->m_numpes == 1)
    return a_data;

  // Decide where to split every PE's run: multiway selection for versions
  // 0, 1, 2 and 4, histogram-based selection for version 3.
  MultiwaySelectResult *msr;
  float b_marg = 0.05;
  if ( a_version != 3 )
  {
    CH_TIMER("Sort:merge-multiway-select", t9);
    // ...
    index_type* lborder = mws_lborder(a_data, loc_s_len, b_marg, a_version,
                                      a_less, env);   // tail args elided;
                                                      // inferred
    MPI_Barrier( a_comm );
    // ...
    CH_TIMER("Sort:Select-result", t2);
    // ...
    VT_USER_START("Sort-WM-result");
    // ...
    msr = new MultiwaySelectResult( lborder, loc_s_len, env );
    // ...
    VT_USER_END("Sort-WM-result");
    VT_USER_START("Sort-Send-Recv");
    // ...
  }
  else
  {
    CH_TIMER("Sort:bin-multiway-select", t9);
    // ...
    int* sdispl = mws_sdispl(a_data, loc_s_len, b_marg, a_less, env );
    MPI_Barrier( a_comm );
    // ...
    CH_TIMER("Sort:Select-result", t2);
    // ...
    VT_USER_START("Sort-WM-result");
    // ...
    msr = new MultiwaySelectResult( sdispl, env );
    // ...
    VT_USER_END("Sort-WM-result");
    VT_USER_START("Sort-Send-Recv");
    // ...
  }
  // Sanity-check the receive counts; on failure dump the first entries of
  // each result array (NB: the dump assumes at least four ranks).
  for (int ii=0;ii<env->m_numpes;ii++)
  {
    if (msr->m_rcount[ii]<0)
    {
      std::cout << "\t\t\tERROR [" << env->m_mype
                << "] rcount=" << msr->m_rcount[0]
                << " " << msr->m_rcount[1]
                << " " << msr->m_rcount[2]
                << " " << msr->m_rcount[3] << std::endl;
      std::cout << "\t\t\tERROR [" << env->m_mype
                << "] sdispl=" << msr->m_sdispl[0]
                << " " << msr->m_sdispl[1]
                << " " << msr->m_sdispl[2]
                << " " << msr->m_sdispl[3]
                << " " << msr->m_sdispl[4] << std::endl;
      std::cout << "\t\t\tERROR [" << env->m_mype
                << "] scount=" << msr->m_scount[0]
                << " " << msr->m_scount[1]
                << " " << msr->m_scount[2]
                << " " << msr->m_scount[3] << std::endl;
      std::cout << "\t\t\tERROR [" << env->m_mype
                << "] rdispl=" << msr->m_rdispl[0]
                << " " << msr->m_rdispl[1]
                << " " << msr->m_rdispl[2]
                << " " << msr->m_rdispl[3] << std::endl;
    }
  }
  // New local length after the exchange; it must fit in the caller's buffer.
  int local_recv_len = 0;
  for (int ii=0;ii<env->m_numpes;ii++) local_recv_len += msr->m_rcount[ii];
  *a_local_len = local_recv_len;
  if ( local_recv_len > a_capacity || local_recv_len < 0 )
  {
    std::cout << "\t\t[" << env->m_mype
              << "]memsort ERROR: new local size = " << local_recv_len
              << ", capacity = " << a_capacity << std::endl;
    // ...
  }
  // ...
  MPI_Barrier(a_comm);
  T* merge_data = new T[local_recv_len];
  // ...
  // slab[id] marks where PE id's incoming run starts inside merge_data.
  std::vector<T*> slab( env->m_numpes + 1 );
  slab[0] = merge_data;
  for (PE_NUM id = 1; id <= env->m_numpes; ++id)
  {
    slab[id] = slab[id - 1] + msr->m_rcount[id - 1];
  }
  MPI_Datatype mpi_value_type;
  MPI_Type_contiguous(sizeof(T), MPI_BYTE, &mpi_value_type);
  MPI_Type_commit( &mpi_value_type );
  // ...
  // Exchange the runs, either in one collective ...
  MPI_Alltoallv( a_data, msr->m_scount, msr->m_sdispl, mpi_value_type,
                 merge_data, msr->m_rcount, msr->m_rdispl, mpi_value_type,
                 a_comm );   // comm argument elided in the listing; inferred
  // ... or with a hand-rolled exchange that caps the size of every message.
  int max_elems_per_message = (1 << 20);
  index_type send = 0, recv = 0;   // message counts; declarations elided
  for (PE_NUM id = 0; id < env->m_numpes; ++id)
  {
    send += sort_messages(msr->m_scount[id], max_elems_per_message);
    recv += sort_messages(msr->m_rcount[id], max_elems_per_message);
  }
  // ...
  // Buffered sends need the full payload plus per-message overhead.
  unsigned bsend_buf_size = loc_s_len*sizeof(T) + send*MPI_BSEND_OVERHEAD;
  char* bsend_buf = new char[bsend_buf_size];
  MPI_Buffer_attach( bsend_buf, bsend_buf_size );
  // ...
  MPI_Request* req = new MPI_Request[send + recv];
  // Walk the partner ranks ring-wise via pe_left/pe_right so traffic is
  // spread across the machine instead of everyone pairing with rank 0 first.
  int req_pos = 0;   // inferred; consumed by MPI_Waitall below
  for (PE_NUM id_recv = pe_right(env->m_mype, env),
              id_send = pe_left(env->m_mype, env);
       /* loop condition elided */;
       id_recv = pe_right(id_recv, env),
       id_send = pe_left(id_send, env))
  {
    // ...
    T* buf_send = a_data + msr->m_sdispl[id_send];
    T* buf_recv = slab[id_recv];
    // ...
    // While both directions still have data, alternate capped sends and
    // receives. (The elems_send/elems_recv computations and the full MPI
    // argument lists are partly elided; the min() with the cap is inferred.)
    while (msr->m_scount[id_send] > 0 && msr->m_rcount[id_recv] > 0)
    {
      int elems_send = (int)std::min((index_type)max_elems_per_message,
                                     (index_type)msr->m_scount[id_send]);
      MPI_Ibsend(buf_send, elems_send, mpi_value_type, id_send,
                 TAG_ALLTOALLV_MANUAL, a_comm, &req[req_pos++]);
      buf_send += elems_send;
      msr->m_scount[id_send] -= elems_send;
      // ...
      int elems_recv = (int)std::min((index_type)max_elems_per_message,
                                     (index_type)msr->m_rcount[id_recv]);
      MPI_Irecv(buf_recv, elems_recv, mpi_value_type, id_recv,
                TAG_ALLTOALLV_MANUAL, a_comm, &req[req_pos++]);
      buf_recv += elems_recv;
      msr->m_rcount[id_recv] -= elems_recv;
    }
    // Drain whichever direction still has elements left.
    while (msr->m_scount[id_send] > 0)
    {
      int elems_send = (int)std::min((index_type)max_elems_per_message,
                                     (index_type)msr->m_scount[id_send]);
      MPI_Ibsend(buf_send, elems_send, mpi_value_type, id_send,
                 TAG_ALLTOALLV_MANUAL, a_comm, &req[req_pos++]);
      buf_send += elems_send;
      msr->m_scount[id_send] -= elems_send;
    }
    while (msr->m_rcount[id_recv] > 0)
    {
      int elems_recv = (int)std::min((index_type)max_elems_per_message,
                                     (index_type)msr->m_rcount[id_recv]);
      MPI_Irecv(buf_recv, elems_recv, mpi_value_type, id_recv,
                TAG_ALLTOALLV_MANUAL, a_comm, &req[req_pos++]);
      buf_recv += elems_recv;
      msr->m_rcount[id_recv] -= elems_recv;
    }
  }
  // Complete all outstanding messages, then release the bsend buffer.
  MPI_Waitall(req_pos, req, MPI_STATUSES_IGNORE);
  // ...
  int tt;   // inferred: receives the detached buffer size
  MPI_Buffer_detach( &bsend_buf, &tt );
  // ...
  MPI_Type_free(&mpi_value_type);
  // ... (temporary buffers freed; lines elided)
  VT_USER_END("Sort-Send-Recv");
  VT_USER_START("Sort-final-merge");
  // Each received run [slab[s], slab[s+1]) is sorted; merge them all.
  std::vector<std::pair<T*, T*> > sub_run_iterators;
  index_type length = 0;   // inferred; accumulated below
  for (unsigned s = 0; s + 1 < slab.size(); ++s)
  {
    sub_run_iterators.push_back(std::make_pair(slab[s],
                                               slab[s + 1]));
    // ...
    length += slab[s + 1] - slab[s];
  }
  // ... (the multiway merge itself is elided in this listing)
  // Copy the merged result back into the caller's buffer.
  for (int ii = 0 ; ii < local_recv_len ; ii++ )
  {
    a_data[ii] = merge_data[ii];
  }
  // ...
  delete [] merge_data;
  // ...
  VT_USER_END("Sort-final-merge");
  // ...
  return a_data;
}
#endif //*SORT_H_*//