TrioCFD 1.9.8
TrioCFD documentation
Loading...
Searching...
No Matches
Comm_Group_MPI.cpp
1/****************************************************************************
2* Copyright (c) 2026, CEA
3* All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
6* 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
7* 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
8* 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
9*
10* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
11* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
12* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
13*
14*****************************************************************************/
15#include <Comm_Group_MPI.h>
16#include <petsc_for_kernel.h>
17#include <communications.h>
18#include <PE_Groups.h>
19#include <vector>
20#include <Perf_counters.h>
21#ifdef INT_is_64_
22#include <algorithm>
23#endif
24
25Implemente_instanciable_sans_constructeur_ni_destructeur(Comm_Group_MPI,"Comm_Group_MPI",Comm_Group);
26
27
28#ifdef MPI_
29
30MPI_Status * Comm_Group_MPI::mpi_status_ = 0;
31MPI_Request * Comm_Group_MPI::mpi_requests_ = 0;
32int Comm_Group_MPI::mpi_nrequests_ = -1;
33int Comm_Group_MPI::mpi_maxrequests_ = -1;
34int Comm_Group_MPI::current_msg_size_;
35MPI_Comm Comm_Group_MPI::trio_u_world_ = MPI_COMM_WORLD;
36// By default, we initialize mpi at statup (see set_must_mpi_initialize())
37bool Comm_Group_MPI::must_mpi_initialize_ = true;
38
39namespace
40{
41/*! @brief Partie non inline du traitement d'erreur mpi.
42 *
43 * Affichage d'un code d'erreur mpi avec MPI_Error_string.
44 *
45 */
46void mpi_print_error(int error_code)
47{
48 Cerr << "mpi_error in Comm_Group_MPI : error_code = " << error_code << finl;
49 Process::Journal() << "mpi_error in Comm_Group_MPI : error_code = " << error_code << finl;
50 int length = 0;
51 char message[MPI_MAX_ERROR_STRING];
52 MPI_Error_string(error_code, message, & length);
53 if (length > 0)
54 {
55 Cerr << message << finl;
56 Process::Journal() << message << finl;
57 }
58 // Normalement on aurait pris trio_u_world_, mais on n'y a pas acces ici.
59 // Pour faire abort, en l'occurence, c'est pas grave mais merci de ne pas
60 // prendre comme exemple...
61 assert(0);
62 MPI_Abort(MPI_COMM_WORLD,-1);
64}
65
66/*! @brief Partie inline du traitement d'erreur mpi (on inline le test: sauf exception, il n'y a pas d'appel de fonction
67 *
68 * supplementaire.
69 *
70 */
71inline void mpi_error(int error_code)
72{
73 if (error_code != MPI_SUCCESS)
74 mpi_print_error(error_code);
75}
76
77} // end anonymous NS
78#endif
79
80
82{
83 exit();
84 return os;
85}
86
88{
89 exit();
90 return is;
91}
92
93/*! @brief Constructeur par defaut.
94 *
95 * Il faut ensuite appeler init_group() ou init_group_trio() pour finir la construction du groupe.
96 *
97 */
99#ifdef MPI_
100 : mpi_group_(MPI_GROUP_NULL),
101 mpi_comm_(MPI_COMM_NULL),
102 must_finalize_(-1) // -1 indique que le groupe n'a pas ete initialise
103#endif
104{
105}
106
108{
109#ifdef MPI_
110 // Si groupe non initialise, ne rien faire:
111 // Modif par BM (20/08/2012): ne detruire ces membres statiques que si c'est le groupe principal (fin de l'execution)
112 if ((mpi_comm_!=MPI_COMM_NULL) && (mpi_comm_ == trio_u_world_))
113 {
114 delete [] mpi_status_;
115 mpi_status_=0;
116
117 for (int r=0; r<mpi_maxrequests_; r++)
118 {
119 if(mpi_requests_[r]!=MPI_REQUEST_NULL)
120 {
121 MPI_Request_free(&(mpi_requests_[r]));
122 }
123 }
124
125 delete [] mpi_requests_;
126 mpi_requests_=0;
127 }
128
129 else // on detruit les groupes non prinicpaux
130 {
131 if (mpi_comm_!=MPI_COMM_NULL)
132 {
133 // on detruit le group puis le mpi_comm
134 mpi_error(MPI_Comm_free(&mpi_comm_));
135 assert(mpi_comm_==MPI_COMM_NULL);
136 }
137 if (mpi_group_!=MPI_GROUP_NULL)
138 {
139 mpi_error(MPI_Group_free( &mpi_group_));
140 assert(mpi_group_==MPI_GROUP_NULL);
141 }
142 }
143#endif
144}
145
146/*! @brief appel a MPI_Abort et rend la main
147 *
148 */
150{
151#ifdef MPI_
152 MPI_Abort(trio_u_world_,-1);
153#endif
154}
155
156#ifdef MPI_
157template <typename _TYPE_, int TYP_IDX>
158void Comm_Group_MPI::mp_collective_op_template(const _TYPE_ *x, _TYPE_ *resu, int n, Comm_Group::Collective_Op op) const
159{
160 static_assert(TYP_IDX >= 1 && TYP_IDX <= 4, "Invalid type index!");
161 MPI_Datatype mpi_typ = TYP_IDX==1 ? MPI_INT : (TYP_IDX==2 ? MPI_LONG : (TYP_IDX==3 ? MPI_DOUBLE : MPI_FLOAT));
162 if (n <= 0) return;
163 double s = -1;
164 bool clock_on = statistics().is_gpu_verbose_on() && Process::je_suis_maitre();
165 switch(op)
166 {
168 statistics().begin_count(STD_COUNTERS::mpi_sumdouble);
169 mpi_error(MPI_Allreduce(x, resu, n, mpi_typ, MPI_SUM, mpi_comm_));
170 if (clock_on && statistics().is_running(STD_COUNTERS::mpi_sumdouble))
171 s = statistics().get_time_since_last_open(STD_COUNTERS::mpi_sumdouble);
172 statistics().end_count(STD_COUNTERS::mpi_sumdouble);
173 break;
175 statistics().begin_count(STD_COUNTERS::mpi_mindouble);
176 mpi_error(MPI_Allreduce(x, resu, n, mpi_typ, MPI_MIN, mpi_comm_));
177 if (clock_on && statistics().is_running(STD_COUNTERS::mpi_mindouble))
178 s = statistics().get_time_since_last_open(STD_COUNTERS::mpi_mindouble);
179 statistics().end_count(STD_COUNTERS::mpi_mindouble);
180 break;
182 statistics().begin_count(STD_COUNTERS::mpi_maxdouble);
183 mpi_error(MPI_Allreduce(x, resu, n, mpi_typ, MPI_MAX, mpi_comm_));
184 if (clock_on && statistics().is_running(STD_COUNTERS::mpi_maxdouble))
185 s = statistics().get_time_since_last_open(STD_COUNTERS::mpi_maxdouble);
186 statistics().end_count(STD_COUNTERS::mpi_maxdouble);
187 break;
189 internal_collective(x, resu, n, &op, -1 /* only one operation */, 0 /* recursion level */);
190 break;
191 }
192 if (s>0) // Affichage
193 {
194 std::string clock(Process::is_parallel() ? "[clock]#" + std::to_string(Process::me()) : "[clock] ");
195 std::string mpi_reduce = "mp_sum";
196 if (op==Comm_Group::COLL_MIN) mpi_reduce = "mp_min";
197 else if (op==Comm_Group::COLL_MAX) mpi_reduce = "mp_max";
198 printf("%s %7.3f ms [MPI] %s\n", clock.c_str(), 0.001 * s, mpi_reduce.c_str());
199 fflush(stdout);
200 }
201}
202#endif
203
204void Comm_Group_MPI::mp_collective_op(const double *x, double *resu, int n, Collective_Op op) const
205{
206#ifdef MPI_
207 mp_collective_op_template<double, 3 /*double*/>(x, resu, n, op);
208#endif
209}
210
211void Comm_Group_MPI::mp_collective_op(const float *x, float *resu, int n, Collective_Op op) const
212{
213#ifdef MPI_
214 mp_collective_op_template<float, 4 /*float*/>(x, resu, n, op);
215#endif
216}
217
218void Comm_Group_MPI::mp_collective_op(const int *x, int *resu, int n, Collective_Op op) const
219{
220#ifdef MPI_
221 mp_collective_op_template<int, 1 /*int*/>(x, resu, n, op);
222#endif
223}
224
#if INT_is_64_ == 2
/*! @brief Reduces the n trustIdType values of x over the group into resu with operation op (no-op without MPI_). */
void Comm_Group_MPI::mp_collective_op(const trustIdType *x, trustIdType *resu, int n, Collective_Op op) const
{
#ifdef MPI_
  constexpr int long_type_index = 2;
  mp_collective_op_template<trustIdType, long_type_index>(x, resu, n, op);
#endif
}
#endif
233
234void Comm_Group_MPI::mp_collective_op(const double *x, double *resu, const Collective_Op *op, int n) const
235{
236#ifdef MPI_
237 if (n <= 0)
238 return;
239 internal_collective(x, resu, n, op, n /* n different operations */, 0 /* recursion level */);
240#endif
241}
242
243void Comm_Group_MPI::mp_collective_op(const float *x, float *resu, const Collective_Op *op, int n) const
244{
245#ifdef MPI_
246 if (n <= 0)
247 return;
248 internal_collective(x, resu, n, op, n /* n different operations */, 0 /* recursion level */);
249#endif
250}
251
252void Comm_Group_MPI::mp_collective_op(const int *x, int *resu, const Collective_Op *op, int n) const
253{
254#ifdef MPI_
255 if (n <= 0)
256 return;
257 internal_collective(x, resu, n, op, n /* n different operations */, 0 /* recursion level */);
258#endif
259}
260
#if INT_is_64_ == 2
/*! @brief Reduces each x[i] over the group into resu[i] with its own operation op[i] (n values). */
void Comm_Group_MPI::mp_collective_op(const trustIdType *x, trustIdType *resu, const Collective_Op *op, int n) const
{
#ifdef MPI_
  // Nothing to do for an empty (or negative) length.
  if (n > 0)
    internal_collective(x, resu, n, op, n /* one operation per value */, 0 /* recursion level */);
#endif
}
#endif
271
272
273/*! @brief Point de synchronisation de tous les processeurs du groupe (permet de verifier que tout le monde est la.
274 *
275 * ..). Si check_enabled() est
276 * non nul, on utilise le tag pour verifier que tous les processeurs
277 * sont bien en train d'attendre le meme tag, sinon c'est une barriere
278 * simple. Le tag doit verifier 0 <= tag < max_tag (soit 32).
279 *
280 */
281void Comm_Group_MPI::barrier(int tag) const
282{
283#ifdef MPI_
284 static const int max_tag = 32;
285 statistics().begin_count(STD_COUNTERS::mpi_barrier);
286 assert(tag >= 0 && tag < max_tag);
287 if (check_enabled())
288 {
289 // On fait la barriere avec des mpmin et mpmax pour verifier
290 // que le tag est le meme sur tous les processeurs :
291 // ATTENTION : il faut "int" et pas "entier" !!!
292 int tag_complet = get_new_tag() * max_tag + tag;
293 int min_tag, amax_tag;
294 mpi_error(MPI_Allreduce(& tag_complet, & min_tag, 1, MPI_ENTIER, MPI_MIN, mpi_comm_));
295 mpi_error(MPI_Allreduce(& tag_complet, & amax_tag, 1, MPI_ENTIER, MPI_MAX, mpi_comm_));
296 if (min_tag != tag_complet || amax_tag != tag_complet)
297 {
298 Cerr << "Error in Comm_Group_MPI::barrier(int tag)\n";
299 Cerr << " the tag is not identical on all the processes.\n";
300 Cerr << " (Loss of communications synchronisation)." << finl;
301 Process::Journal() << "Comm_Group_MPI::barrier\n Error : tag = " << tag << finl;
302 assert(0);
303 exit();
304 }
305 }
306 else
307 {
308 // Barriere simple sans le tag :
309 mpi_error(MPI_Barrier(mpi_comm_));
310 }
311 statistics().end_count(STD_COUNTERS::mpi_barrier);
312#endif
313}
314
315/*! @brief Demarre l'envoi et la reception des buffers.
316 *
317 * Les buffers doivent rester valide jusqu'au retour de send_recv_finish().
318 * Le graphe de communication et la taille des buffers doivent etre corrects !
319 *
320 * send_list : liste des processeurs (numerotation sur groupe courant) a qui envoyer
321 * send_size : taille en octets de chaque message
322 * send_buffers : adresse des donnees a envoyer.
323 * recv_... : idem pour les donnees en reception.
324 *
325 *
326 */
327void Comm_Group_MPI::send_recv_start(const ArrOfInt& send_list,
328 const ArrOfInt& send_size,
329 const char * const * const send_buffers,
330 const ArrOfInt& recv_list,
331 const ArrOfInt& recv_size,
332 char * const * const recv_buffers,
333 TypeHint typehint) const
334{
335#ifdef MPI_
336 statistics().begin_count(STD_COUNTERS::mpi_sendrecv);
337 assert(mpi_nrequests_ < 0);
338
339 const int tag = get_new_tag();
340 int i, n;
341 mpi_nrequests_ = 0;
342 int msg_size = 0;
343
344 int divisor = 0;
345 MPI_Datatype datatype = MPI_CHAR;
346 assert(sizeof(int) == sizeof(int)); // Sinon il faut changer MPI_ENTIER !!!
347 switch(typehint)
348 {
349 case CHAR:
350 divisor = 1;
351 break;
352 case INT:
353 divisor = sizeof(int);
354 datatype = MPI_ENTIER;
355 break;
356 case DOUBLE:
357 divisor = sizeof(double);
358 datatype = MPI_DOUBLE;
359 break;
360 case FLOAT:
361 divisor = sizeof(float);
362 datatype = MPI_FLOAT;
363 break;
364 default:
366 }
367
368 // Astuce pour maximiser les chances que ca marche : on declare
369 // la reception d'abord et l'envoi ensuite.
370 n = recv_list.size_array();
371 for (i = 0; i < n; i++)
372 {
373 int source = recv_list[i];
374 int sz = recv_size[i];
375 msg_size += sz;
376 assert(source >= 0 && source < nproc());
377 assert(mpi_nrequests_ < mpi_maxrequests_);
378 assert(sz % divisor == 0);
379 assert(mpi_requests_[mpi_nrequests_]==MPI_REQUEST_NULL);
380 mpi_error(MPI_Irecv(recv_buffers[i], sz / divisor,
381 datatype,
382 source, tag, mpi_comm_,
383 & mpi_requests_[mpi_nrequests_]));
384 mpi_nrequests_++;
385 }
386
387 n = send_list.size_array();
388 for (i = 0; i < n; i++)
389 {
390 int dest = send_list[i];
391 int sz = send_size[i];
392 msg_size += sz;
393 assert(dest >= 0 && dest < nproc());
394 assert(mpi_nrequests_ < mpi_maxrequests_);
395 assert(sz % divisor == 0);
396 mpi_error(MPI_Isend((char*) send_buffers[i], sz / divisor,
397 datatype,
398 dest, tag, mpi_comm_,
399 & mpi_requests_[mpi_nrequests_]));
400 mpi_nrequests_++;
401 }
402 current_msg_size_ = msg_size;
403#endif
404}
405
406/*! @brief Attend que l'ensemble des communications lancees par send_recv_start soient finie.
407 *
408 */
410{
411#ifdef MPI_
412 assert(mpi_nrequests_ >= 0);
413 mpi_error(MPI_Waitall(mpi_nrequests_, mpi_requests_, mpi_status_));
414 if (statistics().is_gpu_verbose_on() && Process::je_suis_maitre()) // Affichage
415 {
416 std::string clock(Process::is_parallel() ? "[clock]#" + std::to_string(Process::me()) : "[clock] ");
417 double ms = 0.001 * statistics().get_time_since_last_open(STD_COUNTERS::mpi_sendrecv) ;
418 printf("%s %7.3f ms [MPI] Comm_Group_MPI::exchange\n", clock.c_str(), ms);
419 fflush(stdout);
420 }
421 statistics().end_count(STD_COUNTERS::mpi_sendrecv,mpi_nrequests_,current_msg_size_);
422 /*
423 for (int r=0;r<mpi_nrequests_;r++)
424 {
425 if ( mpi_requests_[r]!=MPI_REQUEST_NULL)
426 {
427 MPI_Request_free(&(mpi_requests_[r]));
428 }
429 mpi_requests_[r]=MPI_REQUEST_NULL;
430 }
431 */
432 mpi_nrequests_ = -1;
433#endif
434}
435
436
437/*! @brief Envoi blocant.
438 *
439 * Pour etre bien certain que le code est safe, on force
440 * une communication synchrone pour forcer le blocage en mode check (voir check_enabled()).
441 * Sinon, on utilise MPI_Send qui est en general non blocant pour les petits messages
442 * (donc de meilleures performances).
443 *
444 */
445void Comm_Group_MPI::send(int pe, const void *buffer, int size, int tag) const
446{
447#ifdef MPI_
448 statistics().begin_count(STD_COUNTERS::mpi_send);
449 assert(mpi_nrequests_ < 0);
450 int dest = pe;
451 assert(dest >= 0 && dest < nproc());
452 // Probleme: oblige de faire un cast de (const void*) en (void*) a cause
453 // du prototype de MPI_Send
454 if (check_enabled())
455 mpi_error(MPI_Ssend ((void*)buffer, size, MPI_CHAR, dest, tag, mpi_comm_));
456 else
457 mpi_error(MPI_Send ((void*)buffer, size, MPI_CHAR, dest, tag, mpi_comm_));
458 statistics().end_count(STD_COUNTERS::mpi_send,1,size);
459#endif
460}
461
462/*! @brief Reception blocante d'un message.
463 *
464 */
465void Comm_Group_MPI::recv(int pe, void *buffer, int size, int tag) const
466{
467#ifdef MPI_
468 statistics().begin_count(STD_COUNTERS::mpi_recv);
469 assert(mpi_nrequests_ < 0);
470 MPI_Status status;
471 int source = pe;
472 assert(source >= 0 && source < nproc());
473 mpi_error(MPI_Recv (buffer, size, MPI_CHAR, source, tag, mpi_comm_, & status));
474 statistics().end_count(STD_COUNTERS::mpi_recv,1,size);
475#endif
476}
477
478void Comm_Group_MPI::broadcast(void *buffer, int size, int pe_source) const
479{
480#ifdef MPI_
481 statistics().begin_count(STD_COUNTERS::mpi_bcast);
482 assert(mpi_nrequests_ < 0);
483 mpi_error(MPI_Bcast (buffer, size, MPI_CHAR, pe_source, mpi_comm_));
484 statistics().end_count(STD_COUNTERS::mpi_bcast,1,size);
485#endif
486}
487
488void Comm_Group_MPI::all_to_all(const void *src_buffer, void *dest_buffer, int data_size) const
489{
490#ifdef MPI_
491 statistics().begin_count(STD_COUNTERS::mpi_alltoall);
492 assert(src_buffer != dest_buffer);
493 void * ptr = (void *) src_buffer; // Cast a cause de l'interface de MPI_Alltoall
494 mpi_error(MPI_Alltoall(ptr, data_size, MPI_CHAR, dest_buffer, data_size, MPI_CHAR, mpi_comm_));
495 statistics().end_count(STD_COUNTERS::mpi_alltoall,1,data_size);
496#endif
497}
498
499void Comm_Group_MPI::gather(const void *src_buffer, void *dest_buffer, int data_size, int root) const
500{
501#ifdef MPI_
502 statistics().begin_count(STD_COUNTERS::mpi_gather);
503 void * ptr = (void *) src_buffer; // Cast a cause de l'interface de MPI_Alltoall
504 mpi_error(MPI_Gather(ptr, data_size, MPI_CHAR, dest_buffer, data_size, MPI_CHAR, root, mpi_comm_));
505 statistics().end_count(STD_COUNTERS::mpi_gather,1,data_size);
506#endif
507}
508
509void Comm_Group_MPI::all_gather(const void *src_buffer, void *dest_buffer, int data_size) const
510{
511#ifdef MPI_
512 statistics().begin_count(STD_COUNTERS::mpi_allgather);
513 void * ptr = (void *) src_buffer; // Cast a cause de l'interface de MPI_Alltoall
514 mpi_error(MPI_Allgather(ptr, data_size, MPI_CHAR, dest_buffer, data_size, MPI_CHAR, mpi_comm_));
515 statistics().end_count(STD_COUNTERS::mpi_allgather,1,data_size);
516#endif
517}
518
519void Comm_Group_MPI::all_gatherv(const void *src_buffer, void *dest_buffer, int send_size, const int* recv_size, const int* displs) const
520{
521#ifdef MPI_
522 statistics().begin_count(STD_COUNTERS::mpi_allgather);
523 void * ptr = (void *) src_buffer; // Cast a cause de l'interface de MPI_Alltoall
524 mpi_error(MPI_Allgatherv(ptr, send_size, MPI_CHAR, dest_buffer, recv_size, displs, MPI_CHAR, mpi_comm_));
525 statistics().end_count(STD_COUNTERS::mpi_allgather,1,send_size);
526#endif
527}
528
529
530#ifdef MPI_
531
532/*! @brief constructeur du groupe "tous" Attention, ce constructeur ne doit etre appele qu'une seule fois.
533 *
534 * Le groupe est associe a trio_u_world_
535 * Si must_mpi_initialize_==false, on suppose que MPI_Init a deja ete appele.
536 * Apres l'appel a init_group_trio, il faut enregistrer le groupe dans PE_Groups
537 * Voir PE_Groups::initialize()
538 *
539 */
541{
542 must_finalize_ = 0;
543
544 if (mpi_status_ != 0)
545 {
546 Cerr << "Error : the construction of the global Comm_Group_MPI has already been done." << finl;
547 exit();
548 }
549
550 if (must_mpi_initialize_)
551 {
552 if (trio_u_world_ != MPI_COMM_WORLD)
553 {
554 Cerr << "Error in Comm_Group_MPI::init_group_trio(...) : you cannot ask to initialize MPI\n"
555 << " with something else than MPI_COMM_WORLD !" << finl;
556 exit();
557 }
558 must_finalize_ = 1;
559 int argc=0;
560 char** argv=nullptr;
561 int errcode = MPI_Init(&argc, &argv);
562 //int errcode = MPI_Init(0,0); Message d'erreur sur MPI Voltaire
563 if (errcode != MPI_SUCCESS)
564 {
565 Cerr << "Error in Comm_Group_MPI::init_group_trio()\n"
566 << " MPI_Init() failed (forget to run with mpirun ?)" << finl;
567 mpi_error(errcode);
568 }
569 }
570
571 int arank;
572 int nbproc;
573
574 mpi_error(MPI_Comm_size (trio_u_world_, & nbproc));
575 mpi_error(MPI_Comm_rank (trio_u_world_, & arank));
576
577 Comm_Group::init_group_trio(nbproc, arank);
578
579 mpi_comm_ = trio_u_world_;
580 MPI_Comm_group(mpi_comm_, &mpi_group_);
581
582 // Initialisation des variables statiques de la classe
583 // Un buffer a envoyer et un a recevoir par processeur
584 // d'ou le maximum...
585 mpi_maxrequests_ = nbproc * 2;
586 mpi_status_ = new MPI_Status[mpi_maxrequests_];
587 mpi_requests_ = new MPI_Request[mpi_maxrequests_];
588
589 for (int r=0; r<mpi_maxrequests_; r++)
590 {
591 mpi_requests_[r]=MPI_REQUEST_NULL;
592 }
593 if (arank == 0)
594 {
595 if (trio_u_world_ == MPI_COMM_WORLD)
596 {
597 Cerr << "Initialized MPI with MPI_COMM_WORLD (using all processors)" << finl;
598 }
599 else
600 {
601 Cerr << "Initialized MPI with communicator!=MPI_COMM_WORLD: using " << (int)nbproc << " processors" << finl;
602 }
603 }
604}
605
606
607// MPI_Group_free should be done before MPI_Finalize so not included into Comm_Group_MPI destructor
608void Comm_Group_MPI::free()
609{
610 if (mpi_maxrequests_!=-1) // Group is created when mpi_maxrequests_>0 (avoid a crash with verifie_pere script)
611 mpi_error(MPI_Group_free(& mpi_group_));
612}
613
614
615/*! @brief Free group and MPI communicator (to use for MPI subgroups only, MPI_COMM_WORLD can no be freed)
616 *
617 */
618void Comm_Group_MPI::free_all()
619{
620 if (mpi_maxrequests_!=-1)
621 {
622 if (mpi_group_!=MPI_GROUP_NULL)
623 mpi_error(MPI_Group_free(& mpi_group_));
624 if (mpi_comm_!=MPI_COMM_NULL)
625 mpi_error(MPI_Comm_free(&mpi_comm_));
626 }
627}
628
629
630// Wrapper to MPI_Alltoallv. data type is MPI_CHAR
631void Comm_Group_MPI::all_to_allv(const void *src_buffer, int *send_data_size, int *send_data_offset,
632 void *dest_buffer, int *recv_data_size, int *recv_data_offset) const
633{
634 statistics().begin_count(STD_COUNTERS::mpi_alltoall);
635 assert(src_buffer != dest_buffer);
636 void * ptr = (void *) src_buffer; // Cast a cause de l'interface de MPI_Alltoall
637
638 const int n = nproc();
639 int size;
640
641#ifdef INT_is_64_
642 std::vector<int> send_data_size_int(n);
643 std::vector<int> send_data_offset_int(n);
644 std::vector<int> recv_data_size_int(n);
645 std::vector<int> recv_data_offset_int(n);
646
647 auto cast_func = [](int i) -> int { return static_cast<int>(i); };
648 std::transform(send_data_size, send_data_size + n, send_data_size_int.begin(), cast_func);
649 std::transform(send_data_offset, send_data_offset + n, send_data_offset_int.begin(), cast_func);
650 std::transform(recv_data_size, recv_data_size + n, recv_data_size_int.begin(), cast_func);
651 std::transform(recv_data_offset, recv_data_offset + n, recv_data_offset_int.begin(), cast_func);
652
653 mpi_error(MPI_Alltoallv(ptr, send_data_size_int.data(), send_data_offset_int.data(), MPI_CHAR,
654 dest_buffer, recv_data_size_int.data(), recv_data_offset_int.data(), MPI_CHAR, mpi_comm_));
655 size = send_data_offset_int[n-1] + send_data_size_int[n-1] + recv_data_size_int[n-1] + recv_data_offset_int[n-1];
656#else
657 mpi_error(MPI_Alltoallv(ptr, send_data_size, send_data_offset, MPI_CHAR,
658 dest_buffer, recv_data_size, recv_data_offset, MPI_CHAR, mpi_comm_));
659 size = send_data_offset[n-1] + send_data_size[n-1] + recv_data_size[n-1] + recv_data_offset[n-1];
660
661#endif
662 statistics().end_count(STD_COUNTERS::mpi_alltoall,1,size);
663}
664
665/*! @brief pour que trio_u n'utilise qu'une partie des processeurs de MPI_COMM_WORLD, il faut donner un communicateur a utiliser avant
666 *
667 * d'appeler init_group_trio.
668 *
669 */
670void Comm_Group_MPI::set_trio_u_world(MPI_Comm world)
671{
672 if (mpi_status_ != 0)
673 {
674 Cerr << "Error : the construction of the global Comm_Group_MPI has already been done\n"
675 << " set_trio_u_world call is forbidden" << finl;
676 exit();
677 }
678#ifdef PETSCKSP_H
679 PETSC_COMM_WORLD= world;
680#endif
681 trio_u_world_ = world;
682}
683
684MPI_Comm Comm_Group_MPI::get_trio_u_world()
685{
686 return trio_u_world_;
687}
688
689
690
691void Comm_Group_MPI::set_must_mpi_initialize(bool flag)
692{
693 if (mpi_status_ != 0)
694 {
695 Cerr << "Error : the construction of the global Comm_Group_MPI has already been done\n"
696 << " set_must_mpi_initialize() call is forbidden." << finl;
697 exit();
698 }
699 must_mpi_initialize_ = flag;
700}
701
702void Comm_Group_MPI::ptop_send_recv(const void * send_buf, int send_buf_size, int send_proc,
703 void * recv_buf, int recv_buf_size, int recv_proc) const
704{
705 statistics().begin_count(STD_COUNTERS::mpi_sendrecv);
706 assert(mpi_nrequests_ < 0);
707 int dest = send_proc;
708 int src = recv_proc;
709 int tag = 1;
710 MPI_Status status;
711 if (send_proc < 0 && recv_proc < 0)
712 {
713 // do nothing
714 }
715 else if (send_proc < 0 && recv_proc >= 0)
716 {
717 mpi_error(MPI_Recv (recv_buf, recv_buf_size, MPI_CHAR, src, tag, mpi_comm_, &status));
718 }
719 else if (recv_proc < 0 && send_proc >= 0)
720 {
721 mpi_error(MPI_Send ((void*)send_buf, send_buf_size, MPI_CHAR, dest, tag, mpi_comm_));
722 }
723 else
724 {
725 assert(dest >= 0 && dest < nproc());
726 assert(src >= 0 && src < nproc());
727 // Probleme: oblige de faire un cast de (const void*) en (void*) a cause
728 // du prototype de MPI_Send
729 mpi_error(MPI_Sendrecv((void*)send_buf, send_buf_size, MPI_CHAR, dest, tag,
730 recv_buf, recv_buf_size, MPI_CHAR, src, tag, mpi_comm_,
731 &status));
732 }
733 statistics().end_count(STD_COUNTERS::mpi_sendrecv, 1, send_buf_size + recv_buf_size);
734}
735
736/*! @brief Construction du groupe de processeurs a partir de la liste.
737 *
738 * Voir Comm_Group::init_group(const ArrOfInt &)
739 * Methode appelee par PE_Groups::create_group()
740 *
741 */
742void Comm_Group_MPI::init_group(const ArrOfInt& pe_list)
743{
744 must_finalize_ = 0;
745 // Le groupe "tous" doit exister
746 assert(mpi_status_);
747
748 Comm_Group::init_group(pe_list);
749
751 // On stocke une reference au groupe pere : c'est le groupe courant au moment
752 // de l'appel a init_group. Le destructeur devra etre appele simultanement
753 // sur tous les processeurs du meme groupe.
754 groupe_pere_ = PE_Groups::current_group();
755 // Construction du groupe MPI
756 const MPI_Group& current_mpi_group = cg.mpi_group_;
757 const MPI_Comm& current_mpi_comm = cg.mpi_comm_;
758 // Copie de pe_list au cas ou int != int...
759 const int nbproc = this->nproc();
760 int *ranks = new int[nbproc];
761 for (int i = 0; i < nbproc; i++)
762 ranks[i] = pe_list[i];
763 assert(mpi_group_==MPI_GROUP_NULL);
764 mpi_error(MPI_Group_incl(current_mpi_group, nbproc, ranks, & mpi_group_));
765 delete[] ranks;
766 // Construction du communicator
767 // MPI_Comm_create renvoie MPI_COMM_NULL si le processeur courant
768 // n'est pas dans le groupe.
769 mpi_error(MPI_Comm_create(current_mpi_comm, mpi_group_, & mpi_comm_));
770}
771
772
773
774/*! @brief Building MPI communicator based on numa node (ie one communicator for each node)
775 *
776 */
777void Comm_Group_MPI::init_comm_on_numa_node()
778{
779 must_finalize_ = 0;
780 // Le groupe "tous" doit exister
781 assert(mpi_status_);
782 assert(mpi_group_==MPI_GROUP_NULL);
783
784 groupe_pere_ = PE_Groups::current_group();
785
786 // Construction du communicator
788 const MPI_Comm& current_mpi_comm = cg.mpi_comm_;
789 int current_rank = cg.rank();
790 mpi_error(MPI_Comm_split_type(current_mpi_comm, MPI_COMM_TYPE_SHARED, current_rank, MPI_INFO_NULL, &mpi_comm_));
791 mpi_error(MPI_Comm_group(mpi_comm_, &mpi_group_));
792
793 int loc_rank;
794 int nbproc;
795 mpi_error(MPI_Comm_size(mpi_comm_, &nbproc));
796 mpi_error(MPI_Comm_rank(mpi_comm_, &loc_rank));
797
798 Comm_Group::init_group_node(nbproc, loc_rank, current_rank);
799
800 // Getting rank of my node among all the other nodes:
801 // we create a temporary communicator which gathers all masters of each node group
802 // so that the rank of my node is the rank of my master inside this temporary communicator
803 int master = loc_rank==0 ? 0 : MPI_UNDEFINED;
804 MPI_Comm tmp;
805 mpi_error(MPI_Comm_split(current_mpi_comm, master, current_rank, &tmp));
806 if(tmp != MPI_COMM_NULL)
807 {
808 mpi_error(MPI_Comm_rank(tmp, &node_id_));
809 mpi_error(MPI_Comm_size(tmp, &nb_nodes_));
810 }
811 // each master broadcasts id and size to their group
812 mpi_error(MPI_Bcast(&node_id_, 1, MPI_INT, 0, mpi_comm_));
813 mpi_error(MPI_Bcast(&nb_nodes_, 1, MPI_INT, 0, mpi_comm_));
814
815 if (tmp!= MPI_COMM_NULL)
816 mpi_error(MPI_Comm_free(&tmp));
817
818}
819
820/*! @brief Building MPI communicator containing only the master of my numa node (ie one different communicator for each node)
821 *
822 */
823void Comm_Group_MPI::init_comm_on_node_master()
824{
825 must_finalize_ = 0;
826 // Le groupe "tous" doit exister
827 assert(mpi_status_);
828 assert(mpi_group_==MPI_GROUP_NULL);
829
830 groupe_pere_ = PE_Groups::get_node_group();
831
832 // Construction du communicateur + groupe MPI
834 const MPI_Comm& current_mpi_comm = cg.mpi_comm_;
835 const MPI_Group& current_mpi_group = cg.mpi_group_;
836 int master = 0;
837 mpi_error(MPI_Group_incl(current_mpi_group, 1, &master, & mpi_group_));
838 mpi_error(MPI_Comm_create(current_mpi_comm, mpi_group_, & mpi_comm_));
839
840 int world_rank = ref_cast(Comm_Group_MPI, PE_Groups::current_group()).rank();
841 int loc_rank = cg.rank() == 0 ? 0 : -1;
842 Comm_Group::init_group_node(1, loc_rank, world_rank);
843}
844
845void Comm_Group_MPI::internal_collective(const int *x, int *resu, int nx, const Collective_Op *op, int nop, int level) const
846{
847 // Pour l'instant algo bourrin, a optimiser...
848 for (int i = 0; i < nx; i++)
849 {
850 int j = (nop < 0) ? 0 : i;
851 trustIdType xx = x[i], resu2 = -1;
852 if (op[j] != COLL_PARTIAL_SUM)
853 mp_collective_op(&xx, &resu2, 1, op[j]);
854 else
855 resu2 = mppartial_sum_impl(x[i]);
856 assert(resu2 < std::numeric_limits<int>::max());
857 resu[i] = static_cast<int>(resu2);
858 }
859}
860
#if INT_is_64_ == 2
/*! @brief Collective operation on trustIdType values, one value at a time.
 *
 * If nop < 0, op[0] applies to all nx values; otherwise op[k] applies to x[k].
 */
void Comm_Group_MPI::internal_collective(const trustIdType *x, trustIdType *resu, int nx, const Collective_Op *op, int nop, int level) const
{
  // Simple algorithm (one collective per value), could be optimized.
  for (int k = 0; k < nx; ++k)
    {
      const Collective_Op this_op = op[nop < 0 ? 0 : k];
      if (this_op == COLL_PARTIAL_SUM)
        resu[k] = mppartial_sum_impl(x[k]);
      else
        mp_collective_op(x + k, resu + k, 1, this_op);
    }
}
#endif
875
876
877void Comm_Group_MPI::internal_collective(const double *x, double *resu, int nx, const Collective_Op *op, int nop, int level) const
878{
879 // Pour l'instant algo bourrin, a optimiser...
880 for (int i = 0; i < nx; i++)
881 {
882 int j = (nop < 0) ? 0 : i;
883 if (op[j] != COLL_PARTIAL_SUM)
884 mp_collective_op(x+i, resu+i, 1, op[j]);
885 else
886 {
887 Cerr << "Error in Comm_Group_MPI: COLL_PARTIAL_SUM not coded for double" << finl;
888 exit();
889 }
890 }
891}
892
893void Comm_Group_MPI::internal_collective(const float *x, float *resu, int nx, const Collective_Op *op, int nop, int level) const
894{
895 // Pour l'instant algo bourrin, a optimiser...
896 for (int i = 0; i < nx; i++)
897 {
898 int j = (nop < 0) ? 0 : i;
899 if (op[j] != COLL_PARTIAL_SUM)
900 mp_collective_op(x+i, resu+i, 1, op[j]);
901 else
902 {
903 Cerr << "Error in Comm_Group_MPI: COLL_PARTIAL_SUM not coded for float" << finl;
904 exit();
905 }
906 }
907}
908
909/*! @brief Renvoie la somme des x sur les processeurs precedents du groupe (moi non compris).
910 *
911 * Le resultat sur le premier processeur du groupe est donc toujours 0.
912 * Le resultat depend de l'ordre dans lequel les processeurs ont ete
913 * fournis dans le constructeur.
914 *
915 */
916trustIdType Comm_Group_MPI::mppartial_sum_impl(trustIdType x) const
917{
918 statistics().begin_count(STD_COUNTERS::mpi_partialsum);
919 trustIdType somme = 0;
920 MPI_Status status;
921 int tag = get_new_tag();
922 int rang = rank();
923 int np = nproc();
924
925 if (rang > 0)
926 {
927 // Recoit la somme partielle du precedent
928#ifndef INT_is_64_
929 mpi_error(MPI_Recv(& somme, 1, MPI_INT, rang-1, tag, mpi_comm_, &status));
930#else
931 mpi_error(MPI_Recv(& somme, 1, MPI_LONG, rang-1, tag, mpi_comm_, &status));
932#endif
933 }
934 if (rang+1 < np)
935 {
936 // Envoie la somme partielle au suivant
937 trustIdType s = somme + x;
938#ifndef INT_is_64_
939 mpi_error(MPI_Send(& s, 1, MPI_INT, rang+1, tag, mpi_comm_));
940#else
941 mpi_error(MPI_Send(& s, 1, MPI_LONG, rang+1, tag, mpi_comm_));
942#endif
943 }
944 statistics().end_count(STD_COUNTERS::mpi_partialsum);
945 return somme;
946}
947
948#endif
: Classe Comm_Group_MPI, derivee de la classe abstraite Comm_Group.
void recv(int pe, void *buffer, int size, int tag) const override
Reception bloquante d'un message.
void all_to_all(const void *src_buffer, void *dest_buffer, int data_size) const override
void all_gather(const void *src_buffer, void *dest_buffer, int data_size) const override
void abort() const override
appel a MPI_Abort et rend la main
void send(int pe, const void *buffer, int size, int tag) const override
Envoi bloquant.
void broadcast(void *buffer, int size, int pe_source) const override
void all_gatherv(const void *src_buffer, void *dest_buffer, int send_size, const int *recv_size, const int *displs) const override
~Comm_Group_MPI() override
void send_recv_finish() const override
Attend que l'ensemble des communications lancees par send_recv_start soit termine.
void send_recv_start(const ArrOfInt &send_list, const ArrOfInt &send_size, const char *const *const send_buffers, const ArrOfInt &recv_list, const ArrOfInt &recv_size, char *const *const recv_buffers, TypeHint typehint=CHAR) const override
Demarre l'envoi et la reception des buffers.
Comm_Group_MPI()
Constructeur par defaut.
void gather(const void *src_buffer, void *dest_buffer, int data_size, int root) const override
void mp_collective_op(const double *x, double *resu, int n, Collective_Op op) const override
: Cette classe decrit un groupe de processeurs sur lesquels
Definition Comm_Group.h:40
static int check_enabled()
Definition Comm_Group.h:159
@ COLL_PARTIAL_SUM
Definition Comm_Group.h:52
int nproc() const
Renvoie le nombre de processeurs dans le groupe *this.
Definition Comm_Group.h:190
int rank() const
Renvoie le rang du processeur local dans le groupe *this.
Definition Comm_Group.h:182
void init_group_node(int nproc, int loc_rank, int glob_rank)
Initialize all the information relative to world sizes and ranks for node communicator.
virtual void init_group(const ArrOfInt &pe_list)
Cette fonction doit etre appelee simultanement par tous les PEs du groupe current_group avec les meme...
void init_group_trio(int nproc, int rank)
Initialise le groupe_TRUST().
int get_new_tag() const
Cette fonction renvoie un nouveau tag de communication pour le groupe.
Definition Comm_Group.h:169
Class defining operators and methods for all reading operation in an input flow (file,...
Definition Entree.h:42
virtual Entree & readOn(Entree &)
Lecture d'un Objet_U sur un flot d'entree Methode a surcharger.
Definition Objet_U.cpp:293
virtual Sortie & printOn(Sortie &) const
Ecriture de l'objet sur un flot de sortie Methode a surcharger.
Definition Objet_U.cpp:282
static const Comm_Group & get_node_group()
Renvoie une reference au groupe sur les noeuds.
static const Comm_Group & current_group()
renvoie une reference au groupe de processeurs actif courant
Definition PE_Groups.h:65
void begin_count(const STD_COUNTERS &std_cnt, int counter_lvl=-100000)
void end_count(const std::string &custom_count_name, int count_increment=1, long int quantity_increment=0)
End the count of a counter and update the counter values.
static bool is_parallel()
Definition Process.cpp:110
static Sortie & Journal(int message_level=0)
Renvoie un objet statique de type Sortie qui sert de journal d'evenements.
Definition Process.cpp:588
static void barrier()
Synchronise tous les processeurs du groupe courant (attend que tous les processeurs soient arrives a ...
Definition Process.cpp:136
static int me()
renvoie mon rang dans le groupe de communication courant.
Definition Process.cpp:125
static void exit(int exit_code=-1)
Routine de sortie de TRUST dans une region Kokkos.
Definition Process.cpp:455
static int je_suis_maitre()
renvoie 1 si on est sur le processeur maitre du groupe courant (c'est a dire me() == 0),...
Definition Process.cpp:86
Classe de base des flux de sortie.
Definition Sortie.h:52
_SIZE_ size_array() const