TrioCFD 1.9.8
TrioCFD documentation
Loading...
Searching...
No Matches
Schema_Comm_Vecteurs.cpp
1/****************************************************************************
2* Copyright (c) 2026, CEA
3* All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
6* 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
7* 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
8* 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
9*
10* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
11* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
12* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
13*
14*****************************************************************************/
15#include <Schema_Comm_Vecteurs.h>
16#include <Comm_Group.h>
17#include <communications.h>
18#include <PE_Groups.h>
19#include <sstream>
20#include <comm_incl.h>
21
26#if INT_is_64_ == 2
27ArrOfTID Schema_Comm_Vecteurs::tmp_area_tid_;
28#endif
30bool check_comm_vector = false;
31
40
41void Schema_Comm_Vecteurs_Static_Data::init(int min_buf_size, bool bufferOnDevice)
42{
43 if (buf_pointers_size_ == 0)
44 {
46 buf_pointers_size_ = n * 2; // Maximum requis si on echange avec tous les procs dans exchange()
47 buf_pointers_ = new char*[n*2];
48 for (int i = 0; i < n*2; i++)
49 buf_pointers_[i] = 0;
50 }
51 // Le buffer global a-t-il une taille suffisante ?
52 if (buffer_base_size_ < min_buf_size)
53 {
55 {
58 }
59 delete [] buffer_base_;
60 buffer_base_ = new char[min_buf_size];
61 // GF ajout de la mise a zero pour mpiwrapper valgrind mais est ce util ?
62 for (int i = 0; i < min_buf_size; i++)
63 buffer_base_[i] = 0;
64 buffer_base_size_ = min_buf_size;
65 }
66 if (bufferOnDevice && buffer_base_device_size_ < min_buf_size)
67 {
68 // Allocate buffer_base_ on device:
71 allocateOnDevice(buffer_base_, min_buf_size);
72 buffer_base_device_size_ = min_buf_size;
73 }
74}
75
77{
78 /* ToDo OpenMP Fix crash when using AmgX: Failing in Thread:0
79 call to cuInit returned error 4: Deinitialized
80 deleteOnDevice(buffer_base_, buffer_base_size_);
81 */
82 delete[] buffer_base_;
83 delete[] buf_pointers_;
84}
85
87{
88 status_ = RESET;
89 const char* env_var = getenv("TRUST_USE_MPI_GPU_AWARE");
90 use_gpu_aware_mpi_ = env_var != nullptr && std::stoi(env_var) == 1;
92 {
93#ifdef CRAY_MPICH_VER
94 if (getenv("MPICH_GPU_SUPPORT_ENABLED") == nullptr)
95 Process::exit("You try to enable GPU communications on Cray MPICH with TRUST_USE_MPI_GPU_AWARE=1 but forgot to set also MPICH_GPU_SUPPORT_ENABLED=1 !");
96#endif
97 std::cerr << "[MPI] Enabling GPU capability to communicate between devices." << std::endl;
98 //Cerr << "[MPI] Warning! Only MPI calls with device pointers will benefit. Classic MPI calls with host pointers will be slower..." << finl;
99 }
100}
101
106
107/*! @brief Reinitialise les tailles de buffers.
108 *
109 * Il faut ensuite definir les tailles de buffers avec add_send/recv_area_...()
110 * Cette methode doit etre appelee simultanement sur tous les processeurs du groupe.
111 *
112 */
114{
115 assert(status_ == END_INIT || status_ == RESET);
116 // Reset des tableaux sizes_
117 const int np = Process::nproc();
118 send_buf_sizes_.resize_array(np, RESIZE_OPTIONS::NOCOPY_NOINIT);
119 send_buf_sizes_ = 0;
120 recv_buf_sizes_.resize_array(np, RESIZE_OPTIONS::NOCOPY_NOINIT);
121 recv_buf_sizes_ = 0;
122 send_procs_.resize_array(0);
123 recv_procs_.resize_array(0);
124 sorted_ = 1;
127 {
128#if defined(TRUST_USE_CUDA) && !defined(MPIX_CUDA_AWARE_SUPPORT)
129 Process::exit("MPI version is detected as not CUDA-Aware. You can't use TRUST_USE_MPI_GPU_AWARE=1");
130#endif
131 }
132}
133
134/*! @brief Une fois les donnees a echanger declarees avec add_send/recv_area_.
135 *
136 * ..(), initialise les offset de buffers et alloue un buffer global de taille
137 * suffisante.
138 * Methode a appeler par tous les processeurs du groupe.
139 *
140 */
142{
143 assert(status_ == BEGIN_INIT);
144 assert(Process::nproc() == send_buf_sizes_.size_array());
145 // Verification des tailles en emission et en reception:
147 {
148 const int n = send_buf_sizes_.size_array();
149 ArrOfInt tmp(n);
150 envoyer_all_to_all(send_buf_sizes_, tmp);
151 int err = 0;
152 for (int i = 0; i < n; i++)
153 if (tmp[i] != recv_buf_sizes_[i])
154 err++;
155 if (Process::mp_sum(err))
156 {
157 Cerr << "Error in Schema_Comm_Vecteurs::end_init(): send_size_ and recv_size_ don't match, see log files" << finl;
158 Process::Journal() << "Error in Schema_Comm_Vecteurs::end_init():\n"
159 << "send_sizes_ = " << send_buf_sizes_
160 << "\n recv_sizes_ = " << recv_buf_sizes_ << finl;
163 }
164 }
165 // Tri des numeros de processeurs dans l'ordre croissant
166 if (!sorted_)
167 {
168 send_procs_.ordonne_array();
169 recv_procs_.ordonne_array();
170 }
171 const int nsend = send_procs_.size_array();
172 const int nrecv = recv_procs_.size_array();
173 int offset = 0;
174 int i;
175 for (i = 0; i < nsend; i++)
176 {
177 int pe = send_procs_[i];
178 const int size = (send_buf_sizes_[pe]+7)&(~7); // align size on 8 bytes
179 offset += size;
180 assert(pe >= i); // send_procs_ trie dans l'ordre croissant
181 send_buf_sizes_[i] = size;
182 }
183 for (i = 0; i < nrecv; i++)
184 {
185 int pe = recv_procs_[i];
186 const int size = (recv_buf_sizes_[pe]+7)&(~7); // align size on 8 bytes
187 offset += size;
188 assert(pe >= i); // recv_procs_ trie dans l'ordre croissant
189 recv_buf_sizes_[i] = size;
190 }
191 min_buf_size_ = offset; // taille du buffer a alouer recv_buf_offset_[i] = offset;
192
193 send_buf_sizes_.resize_array(nsend);
194 recv_buf_sizes_.resize_array(nrecv);
196}
197
198/*! @brief Commence un nouvel echange de donnees (les tailles de buffers doivent avoir ete initialisees avec begin_init() .
199 *
200 * .. end_init())
201 * On place les sdata_.buf_pointers_ au debut des buffers pour chaque
202 * processeur pour lequel un buffer a ete declare en "send"
203 * Apres begin_comm(), il faut remplir les buffers en utilisant
204 * get_next_area_int() ou get_next_area_double() dans le meme
205 * ordre que celui declare dans la phase d'initialisation,
206 * puis appeler exchange()
207 *
208 */
209void Schema_Comm_Vecteurs::begin_comm(bool bufferOnDevice)
210{
211 assert(status_ == END_INIT);
212 // Pas un assert car erreur grave et sans doute rare...
213 if (buffer_locked_)
214 {
215 Cerr << "Internal error in Schema_Comm_Vecteurs::begin_comm(): buffers already locked by another communication" << finl;
217 }
218 buffer_locked_ = true;
219 sdata_.init(min_buf_size_, bufferOnDevice);
220
221 // Fait pointer les buffers sur le debut des send_buffers
222 char *ptr = sdata_.buffer_base_;
223
224 const int nsend = send_procs_.size_array();
225 for (int i = 0; i < nsend; i++)
226 {
227 const int pe = send_procs_[i];
228 sdata_.buf_pointers_[pe] = ptr;
229 ptr += send_buf_sizes_[i];
230 }
231 buffer_locked_ = true;
233 bufferOnDevice_ = bufferOnDevice;
234}
235
236void Schema_Comm_Vecteurs::exchange(IsExchangeBlocking exchange_type, const std::string kernel_name)
237{
238
239 char * ptr = sdata_.buffer_base_;
240 const Comm_Group& group = PE_Groups::current_group();
241 const int nsend = send_procs_.size_array();
242 const int nrecv = recv_procs_.size_array();
243
244 if ((exchange_type == IsExchangeBlocking::DefaultBlocking)||(exchange_type == IsExchangeBlocking::NonBlockingStart))
245 {
246
247 // Copy buffer before MPI send
248 if (bufferOnDevice_)
249 {
251 copyFromDevice(sdata_.buffer_base_, min_buf_size_); // Copy buffer to host for MPI communication
252 else
253 {
254 // Communication between devices. Use device buffer:
255 ptr = addrOnDevice(sdata_.buffer_base_);
256 }
257 }
258
259 assert(status_ == BEGIN_COMM);
260 // Verifie que tous les buffers sont pleins
261 assert(check_buffers_full());
262 // Echange les donnees
263
264 // On utilise le tableau sdata_.buf_pointers_ pour stocker les adresses
265 // des buffers a donner a Comm_Group::send_recv_start()
266 // (dimensionne a 2*nproc() donc suffisant)
267 assert(nsend + nrecv <= sdata_.buf_pointers_size_);
268 char ** send_bufs = sdata_.buf_pointers_;
269 char ** recv_bufs = sdata_.buf_pointers_ + nsend;
270 for (int i = 0; i < nsend; i++)
271 {
272 send_bufs[i] = ptr;
273 ptr += send_buf_sizes_[i];
274 }
275 for (int i = 0; i < nrecv; i++)
276 {
277 recv_bufs[i] = ptr;
278 ptr += recv_buf_sizes_[i];
279 }
280
281 // On devrait pouvoir mettre un int64 comme type ici car
282 // les buffers sont de alignes sur 8 octets.
283
284 if (exchange_type == IsExchangeBlocking::NonBlockingStart) start_gpu_timer(kernel_name);
286 recv_procs_, recv_buf_sizes_, recv_bufs,
288 }
289
290 if ((exchange_type == IsExchangeBlocking::DefaultBlocking)||(exchange_type == IsExchangeBlocking::NonBlockingFinish))
291 {
292 group.send_recv_finish();
293 if (exchange_type == IsExchangeBlocking::NonBlockingFinish) end_gpu_timer(kernel_name);
294 // Fait pointer les buffers sur les donnees recues
295 char * recv_ptr = sdata_.buffer_base_;
296 for (int i = 0; i < nsend; i++)
297 recv_ptr += send_buf_sizes_[i];
298 for (int i = 0; i < nrecv; i++)
299 {
300 const int pe = recv_procs_[i];
301 sdata_.buf_pointers_[pe] = recv_ptr;
302 recv_ptr += recv_buf_sizes_[i];
303 }
305
306 // Copy buffer to device after MPI recv if GPU-Aware MPI is not enabled:
307 if (bufferOnDevice_ && !use_gpu_aware_mpi_) copyToDevice(sdata_.buffer_base_, min_buf_size_);
308 }
309}
310
312{
313 assert(status_ == EXCHANGED);
314 // Verifie qu'on a bien lu toutes les donnees
315 assert(check_buffers_full());
316 status_ = END_INIT; // pret pour un nouveau begin_comm()
317 buffer_locked_ = false;
318 bufferOnDevice_ = false;
319}
320
321/*! @brief Selon status_, verifie que tous les pointeurs de buffers pointent a la fin du buffer aloue pour chaque processeur en emission
322 *
323 * ou reception. Renvoie 0 en cas d'erreur (si un buffer n'a pas ete
324 * entierement rempli ou vide)
325 *
326 */
328{
329 char *ptr = sdata_.buffer_base_;
330 const int nsend = send_procs_.size_array();
331 int i;
332 int ok = 1;
333 if (status_ == BEGIN_COMM)
334 {
335 for (i = 0; i < nsend; i++)
336 {
337 ptr += send_buf_sizes_[i];
338 const int pe = send_procs_[i];
339 char *ptr2 = sdata_.buf_pointers_[pe];
340 ALIGN_SIZE(ptr2, sizeof(double));
341 if (ptr != ptr2)
342 {
343 Cerr << "Internal error in Schema_Comm_Vecteurs::check_buffers_full(): send buffer for processor "
344 << pe << " is not full" << finl;
345 ok = 0;
346 }
347 }
348 }
349 else if (status_ == EXCHANGED)
350 {
351 for (i = 0; i < nsend; i++)
352 ptr += send_buf_sizes_[i];
353 const int nrecv = recv_procs_.size_array();
354 for (i = 0; i < nrecv; i++)
355 {
356 ptr += recv_buf_sizes_[i];
357 const int pe = recv_procs_[i];
358 char *ptr2 = sdata_.buf_pointers_[pe];
359 ALIGN_SIZE(ptr2, sizeof(double));
360 if (ptr != ptr2)
361 {
362 Cerr << "Internal error in Schema_Comm_Vecteurs::check_buffers_full(): recv buffer for processor "
363 << pe << " has not been read entirely" << finl;
364 ok = 0;
365 }
366 }
367 }
368 else
369 {
370 Cerr << "check_buffers_full: What ?" << finl;
372 }
373 return ok;
374}
375
376/*! @brief verifie qu'il reste au moins byte_size octets dans le buffer du processeur pe
377 *
378 */
379int Schema_Comm_Vecteurs::check_next_area(int pe, int byte_size) const
380{
381 assert(byte_size >= 0);
382 if (byte_size == 0)
383 {
384 return 1;
385 }
386 const ArrOfInt& procs = (status_ == BEGIN_COMM) ? send_procs_ : recv_procs_;
387 const ArrOfInt& sizes = (status_ == BEGIN_COMM) ? send_buf_sizes_ : recv_buf_sizes_;
388 const int n = procs.size_array();
389 int i;
390 char * ptr = sdata_.buffer_base_;
391 // Si on est en phase de reception, le debut des buffers de reception se trouve a la
392 // fin des buffers en emission:
393 if (status_ != BEGIN_COMM)
394 {
395 const int nsend = send_procs_.size_array();
396 for (i = 0; i < nsend; i++)
397 ptr += send_buf_sizes_[i];
398 }
399 for (i = 0; i < n; i++)
400 {
401 ptr += sizes[i]; // pointeur sur la fin du buffer de ce processeur
402 if (procs[i] == pe)
403 return (sdata_.buf_pointers_[pe] + byte_size) <= ptr;
404 }
405 // aucun buffer declare pour ce processeur
406 return 0;
407}
: Cette classe decrit un groupe de processeurs sur lesquels
Definition Comm_Group.h:40
static int check_enabled()
Definition Comm_Group.h:159
virtual void send_recv_finish() const =0
int nproc() const
Renvoie le nombre de processeurs dans le groupe *this.
Definition Comm_Group.h:190
virtual void send_recv_start(const ArrOfInt &send_list, const ArrOfInt &send_size, const char *const *const send_buffers, const ArrOfInt &recv_list, const ArrOfInt &recv_size, char *const *const recv_buffers, TypeHint typehint=CHAR) const =0
static const Comm_Group & current_group()
renvoie une reference au groupe de processeurs actif courant
Definition PE_Groups.h:65
static const Comm_Group & groupe_TRUST()
Renvoie une reference au groupe de tous les processeurs TRUST.
static Sortie & Journal(int message_level=0)
Renvoie un objet statique de type Sortie qui sert de journal d'evenements.
Definition Process.cpp:588
static int nproc()
renvoie le nombre de processeurs dans le groupe courant Voir Comm_Group::nproc() et PE_Groups::curren...
Definition Process.cpp:104
static double mp_sum(double)
Calcule la somme de x sur tous les processeurs du groupe courant.
Definition Process.cpp:146
static void barrier()
Synchronise tous les processeurs du groupe courant (attend que tous les processeurs soient arrives a ...
Definition Process.cpp:136
static void exit(int exit_code=-1)
Routine de sortie de TRUST dans une region Kokkos.
Definition Process.cpp:455
Donnees statiques communes a toutes les classes Schema_Comm_Vecteur, avec destructeur pour liberer la...
void init(int size, bool bufferOnDevice)
static Schema_Comm_Vecteurs_Static_Data sdata_
static ArrOfDouble tmp_area_double_
int check_next_area(int pe, int byte_size) const
verifie qu'il reste au moins byte_size octets dans le buffer du processeur pe
void end_init()
Une fois les donnees a echanger declarees avec add_send/recv_area_.
void begin_init()
Reinitialise les tailles de buffers.
int check_buffers_full() const
Selon status_, verifie que tous les pointeurs de buffers pointent a la fin du buffer aloue pour chaqu...
static ArrOfFloat tmp_area_float_
void exchange(IsExchangeBlocking exchange_type=IsExchangeBlocking::DefaultBlocking, const std::string kernel_name="noname")
void begin_comm(bool bufferOnDevice=false)
Commence un nouvel echange de donnees (les tailles de buffers doivent avoir ete initialisees avec beg...
_SIZE_ size_array() const