TrioCFD 1.9.8
TrioCFD documentation
Loading...
Searching...
No Matches
IJK_Striped_Writer.tpp
1/****************************************************************************
2 * Copyright (c) 2022, CEA
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
6 * 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
7 * 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
8 * 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
9 *
10 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
11 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
12 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
13 *
14 *****************************************************************************/
15
16#ifndef IJK_Striped_Writer_TPP_H
17#define IJK_Striped_Writer_TPP_H
18
19#include <Parallel_io_parameters.h>
20#include <Domaine_IJK.h>
21#include <SFichier.h>
22#include <Schema_Comm.h>
23
24// Returns the number of written values
25template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
27{
30
32 {
33 Cerr << "Error in IJK_Striped_Writer::write_data_template(scalar field): provided field has unsupported localisation" << finl;
35 }
36 // Collate data on processor 0
37 const Domaine_IJK& splitting = f.get_domaine();
39 const int nitot = splitting.get_nb_items_global(loc, DIRECTION_I);
40 const int njtot = splitting.get_nb_items_global(loc, DIRECTION_J);
41 const int nktot = splitting.get_nb_items_global(loc, DIRECTION_K);
42 const int ncompo = 1;
43
44 BigTRUSTArray<_OUT_TYPE_> tmp;
46 tmp.resize_array(nitot*njtot*nktot);
47 redistribute(f, tmp, nitot, njtot, nktot, ncompo, 0);
49 {
50 SFichier binary_file;
51 binary_file.set_bin(true);
52 binary_file.ouvrir(filename);
53 binary_file.put(tmp.addr(), tmp.size_array(), 1);
54 binary_file.close();
55 }
56 return tmp.size_array();
57}
58
59// Write 3 velocity components at faces in "lata" format:
60// write ni*nj*nk lines of data with
61// ni, nj, nk = number of nodes in the domain
62// line of data = 3 values. 1st value = component i of velocity on left face of element (i,j,k), 2nd value = component j, etc...
63// Returns the number of written values (1 value = 3 scalars)
64template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
66{
69
73 {
74 Cerr << "Error in IJK_Striped_Writer::write_data_template(vx, vy, vz): provided fields have incorrect localisation" << finl;
76 }
77 const Domaine_IJK& splitting = vx.get_domaine();
78 // Collate data on processor 0
79 // In lata format, the velocity is written as an array of (vx, vy, vz) vectors.
80 // Size of the array is the total number of nodes in the mesh.
81 // The velocity associated with a node is the combination of velocities at the faces
82 // at the right of the node (in each direction).
83 const int nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0) + 1;
84 const int njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1) + 1;
85 const int nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2) + 1;
86 const int nbcompo = 3;
87
88 BigTRUSTArray<_OUT_TYPE_> tmp;
90 tmp.resize_array(nitot*njtot*nktot*nbcompo);
91 tmp=0;
92 redistribute(vx, tmp, nitot, njtot, nktot, nbcompo, 0);
93 redistribute(vy, tmp, nitot, njtot, nktot, nbcompo, 1);
94 redistribute(vz, tmp, nitot, njtot, nktot, nbcompo, 2);
95
97 {
98 SFichier binary_file;
99 binary_file.set_bin(true);
100 binary_file.ouvrir(filename);
101 binary_file.put(tmp.addr(), tmp.size_array(), 1);
102 binary_file.close();
103 }
104 return tmp.size_array() / 3;
105
106}
107
108// Density fields are written by more than one processor. Each processor write a file.
109template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
111{
113 {
114 Cerr << "Error in IJK_Striped_Writer::write_data_parallele_plan_tempalte(scalar field): provided field has unsupported localisation" << finl;
116 }
117
118 const Domaine_IJK& splitting = f.get_domaine();
119 const int nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0);
120 const int njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1);
121 const int nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2);
122 const int ni = splitting.get_nb_items_local(Domaine_IJK::ELEM, 0);
123 const int nj = splitting.get_nb_items_local(Domaine_IJK::ELEM, 1);
124 const int nk = splitting.get_nb_items_local(Domaine_IJK::ELEM, 2);
125
127 tmp.resize_array(ni*nj*nk);
128 for (int k = 0; k < nk; k++)
129 for (int j = 0; j < nj; j++)
130 for (int i = 0; i < ni; i++)
131 tmp[(k*nj+j)*ni+i] = (_OUT_TYPE_)f(i,j,k);
132
133 SFichier binary_file;
134 binary_file.set_bin(true);
135 binary_file.ouvrir(filename);
136 binary_file.put(tmp.addr(), tmp.size_array(), 1);
137 binary_file.close();
138 return nitot*njtot*nktot;
139}
140
141// Velocity fields are written by more than one processor. Each processor write a file.
142template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
144{
148 {
149 Cerr << "Error in IJK_Striped_Writer::write_data_parallele_plan_template(vx, vy, vz): provided fields have incorrect localisation" << finl;
151 }
152 const Domaine_IJK& splitting = vx.get_domaine();
153 const int nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0) + 1;
154 const int njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1) + 1;
155 const int nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2) + 1;
156
157 // In lata format, the velocity is written as an array of (vx, vy, vz) vectors.
158 // Size of the array is the local number of nodes in the mesh.
159 // The velocity associated with a node is the combination of velocities at the faces
160 // at the right of the node (in each direction).
161
162 int ni = splitting.get_nb_items_local(Domaine_IJK::ELEM, 0);
163 if ( (splitting.get_local_slice_index(0) == splitting.get_nprocessor_per_direction(0) - 1) )
164 {
165 ni++;
166 }
167
168 int nj = splitting.get_nb_items_local(Domaine_IJK::ELEM, 1);
169 if ( (splitting.get_local_slice_index(1) == splitting.get_nprocessor_per_direction(1) - 1) )
170 {
171 nj++;
172 }
173
174 int nk = splitting.get_nb_items_local(Domaine_IJK::ELEM, 2);
175 if ( splitting.get_local_slice_index(2) == splitting.get_nprocessor_per_direction(2) - 1)
176 {
177 nk++;
178 }
179 const int nbcompo = 3;
180
182 tmp.resize_array(ni*nj*nk*nbcompo);
183 for (int k = 0; k < nk; k++)
184 {
185 for (int j = 0; j < nj; j++)
186 {
187 for (int i = 0; i < ni; i++)
188 {
189 tmp[((k*nj+j)*ni+i)*nbcompo+0] = (_OUT_TYPE_)vx(i,j,k);
190 tmp[((k*nj+j)*ni+i)*nbcompo+1] = (_OUT_TYPE_)vy(i,j,k);
191 tmp[((k*nj+j)*ni+i)*nbcompo+2] = (_OUT_TYPE_)vz(i,j,k);
192 }
193 }
194 }
195
196 SFichier binary_file;
197 binary_file.set_bin(true);
198 binary_file.ouvrir(filename);
199 binary_file.put(tmp.addr(), tmp.size_array(), 1);
200 binary_file.close();
201 return nitot*njtot*nktot;
202}
203
204
205/** Redistribute data from input (distributed ijk scalar field) to ouput
206 * (striped linear storage)
207 * output is a 'big' array (might contain more than 32b)
208 * les n_compo_tot sont inutiles ici !!!
209 */
210template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
211void IJK_Striped_Writer::redistribute(const IJK_Field_template<_TYPE_,_TYPE_ARRAY_>& input, BigTRUSTArray<_OUT_TYPE_>& output,
212 const int nitot, const int njtot, const int nktot, const int nbcompo, const int component)
213{
214 const Domaine_IJK& geom = input.get_domaine();
215 // For the moment, assume data is collected on mpi process 0
216 // mpi processes send data to master
218 int ni = input.ni();
219 int nj = input.nj();
220 int nk = input.nk();
221 // For data localized at faces and periodic box, write value of the rightmost face (which is not stored internally)
222 // (that supposes that virtual cell available and uptodate, if not, written data at the right end will not reflect the periodicity)
223 if (input.ghost() > 0 && input.get_localisation() != Domaine_IJK::ELEM)
224 {
225 // if periodic and we are at the right end the domain:
226 // MODIFS GABRIEL : il faut boucler sur les 3 directions !!
227 for (int dir = 0; dir < 3 /* pas 2 */; dir++)
228 {
229 if (geom.get_periodic_flag(dir) && geom.get_local_slice_index(dir) == geom.get_nprocessor_per_direction(dir) - 1)
230 {
231 int& n = (dir==0)?ni:((dir==1)?nj:nk);
232 n++;
233 }
234 }
235 }
236 sendtmp.resize_array(ni * nj * nk); // allocate and store zero in array
237 for (int k = 0; k < nk; k++)
238 for (int j = 0; j < nj; j++)
239 for (int i = 0; i < ni; i++)
240 sendtmp[(k*nj+j)*ni+i] = (_OUT_TYPE_)input(i,j,k);
241
242 // Some processors might have empty domain, will not receive any message so do not send !
243 if (!Process::je_suis_maitre() && geom.get_nb_elem_local(0) > 0)
244 {
245 envoyer(geom.get_offset_local(0), 0, 0);
246 envoyer(ni, 0, 0);
247 envoyer(geom.get_offset_local(1), 0, 0);
248 envoyer(nj, 0, 0);
249 envoyer(geom.get_offset_local(2), 0, 0);
250 envoyer(nk, 0, 0);
251 envoyer(sendtmp, 0, 0);
252 }
253
255 {
256 // Master mpi process collects the data
257 IntTab mapping;
258 geom.get_processor_mapping(mapping);
259 TRUSTArray<_OUT_TYPE_> recv_tmp;
260 for (int kproc = 0; kproc < mapping.dimension(2); kproc++)
261 {
262 for (int jproc = 0; jproc < mapping.dimension(1); jproc++)
263 {
264 for (int iproc = 0; iproc < mapping.dimension(0); iproc++)
265 {
266 const int numproc = mapping(iproc, jproc, kproc);
267 int imin2, jmin2, kmin2, ni2, nj2, nk2;
268 if (numproc == Process::me())
269 {
270 recv_tmp = sendtmp;
271 imin2 = geom.get_offset_local(0);
272 jmin2 = geom.get_offset_local(1);
273 kmin2 = geom.get_offset_local(2);
274 ni2 = ni;
275 nj2 = nj;
276 nk2 = nk;
277 }
278 else
279 {
280 recevoir(imin2,numproc,0);
281 recevoir(ni2,numproc,0);
282 recevoir(jmin2,numproc,0);
283 recevoir(nj2,numproc,0);
284 recevoir(kmin2,numproc,0);
285 recevoir(nk2,numproc,0);
286 recevoir(recv_tmp, numproc,0);
287 }
288
289 for (int k = 0; k < nk2; k++)
290 for (int j = 0; j < nj2; j++)
291 for (int i = 0; i < ni2; i++)
292 output[(((k+kmin2)*njtot+(j+jmin2))*nitot+i+imin2)*nbcompo + component] = recv_tmp[(k*nj2+j)*ni2+i];
293 }
294 }
295 }
296 }
297}
298
299/** Redistribute data read by master procs to all other procs.
300 *
301 */
302template<typename _IN_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
303void IJK_Striped_Writer::redistribute_load(const BigTRUSTArray<_IN_TYPE_>& input, IJK_Field_template<_TYPE_,_TYPE_ARRAY_>& output,
304 const int nitot, const int njtot, const int nktot, const int nbcompo, const int component)
305{
306 const Domaine_IJK& splitting = output.get_domaine();
307 // For the moment, assume data is collected on mpi process 0
308 // mpi processes send data to master
309 int ni = output.ni();
310 int nj = output.nj();
311 int nk = output.nk();
312
313 // Some processors might have empty domain, will not receive any message so do not send !
314 if (!Process::je_suis_maitre() && splitting.get_nb_elem_local(0) > 0)
315 {
316 envoyer(splitting.get_offset_local(0), 0, 0);
317 envoyer(ni, 0, 0);
318 envoyer(splitting.get_offset_local(1), 0, 0);
319 envoyer(nj, 0, 0);
320 envoyer(splitting.get_offset_local(2), 0, 0);
321 envoyer(nk, 0, 0);
322 }
323
324 TRUSTArray<_IN_TYPE_> send_tmp;
325 TRUSTArray<_IN_TYPE_> recv_tmp;
327 {
328 // Master mpi process collects the data
329 IntTab mapping;
330 splitting.get_processor_mapping(mapping);
331 for (int kproc = 0; kproc < mapping.dimension(2); kproc++)
332 {
333 for (int jproc = 0; jproc < mapping.dimension(1); jproc++)
334 {
335 for (int iproc = 0; iproc < mapping.dimension(0); iproc++)
336 {
337 const int numproc = mapping(iproc, jproc, kproc);
338 int imin2, jmin2, kmin2, ni2, nj2, nk2;
339 if (numproc == Process::me())
340 {
341 imin2 = splitting.get_offset_local(0);
342 jmin2 = splitting.get_offset_local(1);
343 kmin2 = splitting.get_offset_local(2);
344 ni2 = ni;
345 nj2 = nj;
346 nk2 = nk;
347 }
348 else
349 {
350 recevoir(imin2,numproc,0);
351 recevoir(ni2,numproc,0);
352 recevoir(jmin2,numproc,0);
353 recevoir(nj2,numproc,0);
354 recevoir(kmin2,numproc,0);
355 recevoir(nk2,numproc,0);
356 }
357 send_tmp.resize_array(ni2 * nj2 * nk2);
358
359 for (int k = 0; k < nk2; k++)
360 for (int j = 0; j < nj2; j++)
361 for (int i = 0; i < ni2; i++)
362 send_tmp[(k*nj2+j)*ni2+i] = input[(((k+kmin2)*njtot+(j+jmin2))*nitot+i+imin2)*nbcompo + component];
363
364 if (numproc != Process::me())
365 envoyer(send_tmp, numproc, 0);
366 else
367 recv_tmp = send_tmp;
368 }
369 }
370 }
371 }
372
373 // Some processors might have empty domain, will not receive any message so do not send !
374 if (splitting.get_nb_elem_local(0) > 0)
375 {
377 recevoir(recv_tmp, 0, 0);
378
379 for (int k = 0; k < nk; k++)
380 for (int j = 0; j < nj; j++)
381 for (int i = 0; i < ni; i++)
382 output(i,j,k) = (_TYPE_)recv_tmp[(k*nj+j)*ni+i];
383 }
384}
385
386
387// Write 3 velocity components at faces in "lata" format:
388// write ni*nj*nk lines of data with
389// ni, nj, nk = number of nodes in the domain
390// line of data = 3 values. 1st value = component i of velocity on left face of element (i,j,k), 2nd value = component j, etc...
391// Returns the number of written values (1 value = 3 scalars)
392template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
394{
398 {
399 Cerr << "Error in IJK_Striped_Writer::write_data_parallel_template(vx, vy, vz): provided fields have incorrect localisation" << finl;
401 }
402 const Domaine_IJK& splitting = vx.get_domaine();
403 // In lata format, the velocity is written as an array of (vx, vy, vz) vectors.
404 // Size of the array is the total number of nodes in the mesh.
405 // The velocity associated with a node is the combination of velocities at the faces
406 // at the right of the node (in each direction).
407 const int nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0) + 1;
408 const int njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1) + 1;
409 const int nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2) + 1;
410
411 write_data_parallel2_template<_OUT_TYPE_,_TYPE_,_TYPE_ARRAY_>(filename, nitot, njtot, nktot, vx, vy, vz);
412
413 return (Size_t) nktot * njtot * nitot;
414}
415
416template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
418{
419 const Domaine_IJK& splitting = f.get_domaine();
420
421 int nitot = 0, njtot = 0, nktot = 0;
423 {
424 nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0);
425 njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1);
426 nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2);
427 }
428 else if (f.get_localisation() == Domaine_IJK::NODES)
429 {
430 nitot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 0)+1;
431 njtot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 1)+1;
432 nktot = splitting.get_nb_items_global(Domaine_IJK::ELEM, 2)+1;
433 }
434 else
435 {
436 Cerr << "Error: cannot write single component of a field that is at ELEMents or nodes" << finl;
438 }
439 write_data_parallel2_template<_OUT_TYPE_,_TYPE_,_TYPE_ARRAY_>(filename, nitot, njtot, nktot, f, f, f);
440
441 return (Size_t) nktot * njtot * nitot;
442}
443
444// if vx, vy and vz are references to the same object, we assume that we have only 1 component.
445// otherwise, assume we have 3 components
446template<typename _OUT_TYPE_, typename _TYPE_, typename _TYPE_ARRAY_>
448 const int file_ni_tot, const int file_nj_tot, const int file_nk_tot,
450{
451 const Domaine_IJK& splitting = vx.get_domaine();
452 const int nb_components = ((&vy == &vx) && (&vz == &vx)) ? 1 : 3;
453#ifdef INT_is_64_
454 constexpr Size_t INT64_OFFSET = 6; // Size of "INT64" string (with term \0 char) in bytes ...
455#else
456 constexpr Size_t INT64_OFFSET = 0;
457#endif
458
459 // Number of processes that write to the file:
460 // Each process will write a portion of the total bloc of data, aligned on the bloc size.
461 // According to recommandations of LRZ about GPFS, we must write blocks of data
462 // which are a multiple of the gpfs block size (which is ???), aligned by the same amount,
463 // in order to avoid thrashing the filesystem.
464
465 Size_t block_size = Parallel_io_parameters::get_max_block_size();
466 // Reasonable default value for the number of writing processes (one per node, but no more than 16).
467 int nb_writing_processes = Parallel_io_parameters::get_nb_writing_processes();
468
469 // Indices of writing processes : spread over the whole set of processes to minimize the chance that
470 // we share the bandwidth with another writing process:
471 ArrOfInt writing_processes(nb_writing_processes);
472 int my_writing_process_index = -1;
473 for (int i = 0; i < nb_writing_processes; i++)
474 {
475 writing_processes[i] = Process::nproc() * i / nb_writing_processes;
476 if (Process::me() == writing_processes[i])
477 my_writing_process_index = i;
478 }
479 //Cerr << "file_n_tot= " << file_ni_tot << " " << file_nj_tot << " " << file_nk_tot << finl;
480 //Cerr << "ncompo= " << nb_components << finl;
481 const Size_t total_data_bytes = (Size_t) file_nk_tot * file_nj_tot * file_ni_tot * nb_components * sizeof(_OUT_TYPE_);
482
483 FILE *file_pointer = 0;
484 // According to recommandations of LRZ about GPFS, we create the file on processor 0,
485 // do a barrier, then open the file on the other processors. Moreover, we seek to the end of the file
486 // and write something to give it the full size before other processors try to open the file.
487 // Writing 8 bytes at arbitrary position and creating the file with any size is instantaneous.
488 // Go back to unix interface because I need 64 bits offsets here
489 if (my_writing_process_index == 0)
490 {
491 errno = 0;
492 file_pointer = fopen(filename, "w"); // Write and truncate
493 if (!file_pointer || errno)
494 {
495 Cerr << "Error opening file " << filename << ": fopen(filename,w) failed" << finl;
497 }
498#ifdef INT_is_64_
499 const char i64[] = "INT64";
500 fwrite(i64, sizeof(char), INT64_OFFSET, file_pointer);
501#endif
502 // Seek at the end of file and write something to set the file size and check if filessystem
503 // accepts the final file size:
504 // On 64 bit platform, the prototype for the fseek library function takes a 64 bit integer, so this is ok:
505 fseek(file_pointer, total_data_bytes + INT64_OFFSET - sizeof(Size_t), SEEK_SET);
506 if (errno != 0)
507 {
508 Cerr << "Error seeking at file offset " << (int)(total_data_bytes>>32) << "GB in file " << filename << finl;
510 }
511 // Write something to allocate disk space for the file:
512 fwrite(&total_data_bytes, sizeof(Size_t), 1, file_pointer);
513 if (errno != 0)
514 {
515 Cerr << "Error writing at file offset " << (int)(total_data_bytes>>32) << "GB in file " << filename << finl;
517 }
518 }
519 // Wait for the processor 0 to initialize the file:
521 // Other writing processors can open the file for read/write now:
522 if (my_writing_process_index > 0)
523 {
524 errno = 0;
525 file_pointer = fopen(filename, "r+"); // Open for read/write
526 if (!file_pointer || errno)
527 {
528 Cerr << "Error opening file " << filename << ": fopen(filename,r+) failed on writing process " << my_writing_process_index << finl;
530 }
531 }
532
533 Schema_Comm schema_comm;
534 schema_comm.set_all_to_allv_flag(1);
535 // Sending to write-processes, array contains only other processes, not me
536 ArrOfInt send_pe_list(writing_processes.size_array());
537
538 send_pe_list.resize(0);
539 for (int i = 0; i < writing_processes.size_array(); i++)
540 if (i != my_writing_process_index) // Declare everybody but me.
541 send_pe_list.append_array(writing_processes[i]);
542 ArrOfInt recv_pe_list(Process::nproc()-1);
543
544 recv_pe_list.resize(0);
545 if (my_writing_process_index >= 0)
546 {
547 for (int i = 0; i < Process::nproc(); i++)
548 {
549 if (i != Process::me()) // Declare everybody but me.
550 recv_pe_list.append_array(i);
551 }
552 }
553 schema_comm.set_send_recv_pe_list(send_pe_list, recv_pe_list, 1 /* allow sending to myself */);
554
555 int total_number_of_blocks = (int)((total_data_bytes + block_size - 1) / block_size);
556
557 // Temporary array for interlaced data:
558 TRUSTArray<_OUT_TYPE_> tmp(std::max(vx.ni(), std::max(vy.ni(), vz.ni())) * nb_components);
559 TRUSTArray<_OUT_TYPE_> recv_tmp;
560 if (my_writing_process_index >= 0)
561 recv_tmp.resize_array((int)(block_size / sizeof(_OUT_TYPE_)));
562
563 // Index of the block that the first writing processor wants to write:
564 // writing process 0 writes block 0, writing process 1 writes block 1, etc
565 for (int i_block_proc0 = 0; i_block_proc0 < total_number_of_blocks; i_block_proc0 += nb_writing_processes)
566 {
567 schema_comm.begin_comm();
568 // Each processor send the data required to fill the next
569 // nb_writing_processes blocks to each writing process:
570 // data for i_block_proc0 will be sent to writing_process[0]
571 // data for i_block_proc0+1 will be sent to writing_process[1],
572 // etc
573 for (int iwrite_process = 0; iwrite_process < nb_writing_processes; iwrite_process++)
574 {
575 Sortie& send_buffer = schema_comm.send_buffer(writing_processes[iwrite_process]);
576
577 Size_t offset_start_of_block = (Size_t) block_size * (i_block_proc0 + iwrite_process);
578 Size_t offset_end_of_block = offset_start_of_block + block_size;
579 if (offset_end_of_block > total_data_bytes)
580 offset_end_of_block = total_data_bytes;
581 if (offset_start_of_block >= offset_end_of_block)
582 break; // Remaining writing processors have no data to write
583 // Loop on j and k local coordinates, for each, find if there
584 // is a segment imin..imax to send.
585 // copy the segment with xyz components interlaced into tmp buffer and put
586 // the requested excerpt into the buffer
587 // For faces: we have perhaps not the same number of faces in each direction for each component, take max:
588 const int kmax = std::max(vx.nk(), std::max(vy.nk(), vz.nk()));
589 const int jmax = std::max(vx.nj(), std::max(vy.nj(), vz.nj()));
590 const int imax = std::max(vx.ni(), std::max(vy.ni(), vz.ni()));
591 for (int k = 0; k < kmax; k++)
592 {
593 for (int j = 0; j < jmax; j++)
594 {
595 // This is the position of the current block in the file, in bytes
596 // This is where the segment (i=0..imax-1, j, k) resides in the file, in bytes
597 Size_t offset_segment_start = (Size_t) (k + splitting.get_offset_local(DIRECTION_K)) * file_nj_tot;
598 offset_segment_start = (offset_segment_start + j + splitting.get_offset_local(DIRECTION_J)) * file_ni_tot;
599 offset_segment_start += splitting.get_offset_local(DIRECTION_I);
600 offset_segment_start *= nb_components * sizeof(_OUT_TYPE_);
601 Size_t offset_segment_end = offset_segment_start + imax * nb_components * sizeof(_OUT_TYPE_);
602
603 // Compute the intersection of the two ranges (block and segment):
604 const Size_t offset_intersection_start = (offset_start_of_block < offset_segment_start) ? offset_segment_start : offset_start_of_block;
605 const Size_t offset_intersection_end = (offset_end_of_block < offset_segment_end) ? offset_end_of_block : offset_segment_end;
606
607 // Check if intersection is empty ?
608 if (offset_intersection_end > offset_intersection_start)
609 {
610 // Copy i=0..imax data into tmp buffer and interlace components.
611 // Warn: for fields with 3 components, as the block_size is a power of two, only one block every three blocks
612 // starts with components x. Other blocks start with a component y or z... Therefore we interlace xyz data
613 // and select data for the current block from this interlace data. Also convert to write type:
614 tmp = 0.; // for data that is beyond limits
615 for (int i_component = 0; i_component < nb_components; i_component++)
616 {
617 const IJK_Field_template<_TYPE_,_TYPE_ARRAY_>& src_field = (i_component==0)?vx:((i_component==1)?vy:vz);
618 // There are not the same number of faces in each direction, check that we are not beyond limits for this component:
619 if (k < src_field.nk() && j < src_field.nj())
620 {
621 const int max_this_compo = src_field.ni();
622 for (int i = 0; i < max_this_compo; i++)
623 {
624 tmp[i * nb_components + i_component] = (_OUT_TYPE_)src_field(i, j, k);
625 }
626 }
627 }
628 // Everything still in bytes:
629 const int start_index_within_block = (int) (offset_intersection_start - offset_start_of_block);
630 const int start_index_within_tmp = (int) (offset_intersection_start - offset_segment_start);
631 // Length is in bytes
632 const int data_length = (int) (offset_intersection_end - offset_intersection_start);
633 send_buffer << start_index_within_block << data_length;
634 const int sz = (int)sizeof(_OUT_TYPE_);
635 send_buffer.put(tmp.addr() + start_index_within_tmp/sz,
636 data_length/sz, 1);
637 }
638 }
639 }
640 }
641 // Transmit data to writing processes:
642 schema_comm.echange_taille_et_messages();
643
644 // Extract received data and write to disk:
645 if (my_writing_process_index >= 0)
646 {
647 recv_tmp = 0.;
648 // Cast to Size_t to be sure that the multiply will not overflow:
649 const Size_t offset = (Size_t) block_size * (i_block_proc0 + my_writing_process_index);
650 //Cerr << "Offset=" << (int)offset << finl;
651 Size_t this_block_size = block_size;
652 if (total_data_bytes - offset < this_block_size)
653 this_block_size = total_data_bytes - offset;
654 const int nproc=Process::nproc();
655 for (int i_source_process = 0; i_source_process < nproc; i_source_process++)
656 {
657 Entree& recv_buffer = schema_comm.recv_buffer(i_source_process);
658 while (1)
659 {
660 int start_index_within_block;
661 recv_buffer >> start_index_within_block;
662 if (recv_buffer.eof())
663 break;
664 int data_length;
665 recv_buffer >> data_length;
666 //Cerr << "start_index_within_bloc= "<< start_index_within_block
667 // << " length= " << data_length
668 // << " this_block_size= " << (int)this_block_size << finl;
669 if (start_index_within_block < 0 || data_length < 1 || start_index_within_block + data_length > this_block_size)
670 {
671 Cerr << "Internal error in writing ijk lata file: start_index_within_block and data_length are invalid" << finl;
673 }
674 if (start_index_within_block + data_length > recv_tmp.size_array() * (int)sizeof(_OUT_TYPE_))
675 {
676 Cerr << "Internal error in writing ijk lata file: index out of bound" << finl;
678 }
679 const int sz = (int)sizeof(_OUT_TYPE_);
680 recv_buffer.get(recv_tmp.addr() + start_index_within_block/sz,
681 data_length/sz);
682 }
683 }
684 // Each process writes the data chunk
685 // On 64 bit platform, the prototype for the fseek library function takes a 64 bit integer.
686 errno = 0;
687 fseek(file_pointer, offset + INT64_OFFSET, SEEK_SET);
688 if (errno != 0)
689 {
690 Cerr << "Error seeking at file offset " << (int)(offset>>32) << "GB in file " << filename << finl;
692 }
693 fwrite((const char*)recv_tmp.addr(), this_block_size /* size in bytes */, 1, file_pointer);
694 if (errno != 0)
695 {
696 Cerr << "Error writing at file offset " << (int)(offset>>32) << "GB in file " << filename << finl;
698 }
699 }
700 schema_comm.end_comm();
701 }
702
703
704 if (file_pointer) // (Some processors don't open the file)
705 fclose(file_pointer);
706}
707
708#endif
This class encapsulates all the information related to the eulerian mesh for TrioIJK.
Definition Domaine_IJK.h:47
int get_offset_local(int direction) const
Returns the local offset in requested direction.
int get_nb_elem_local(int direction) const
Returns the number of elements owned by this processor in the given direction.
bool get_periodic_flag(int direction) const
Method returns true if periodic in this direction.
int get_nprocessor_per_direction(int direction) const
Returns the number of slices in the given direction.
int get_nb_items_local(Localisation loc, int direction) const
Returns the number of local items (on this processor) for the given localisation in the requested dir...
void get_processor_mapping(IntTab &mapping) const
Fills an array containing the mapping of processors.
int get_nb_items_global(Localisation loc, int direction) const
Returns the number of local items (on this processor) for the given localisation in the requested dir...
int get_local_slice_index(int direction) const
Returns the position of the local subdomain in the requested direction.
Localisation
Localisation sub class.
Definition Domaine_IJK.h:53
Class defining operators and methods for all reading operation in an input flow (file,...
Definition Entree.h:42
virtual int get(int *ob, std::streamsize n)
Definition Entree.cpp:222
virtual int eof()
Definition Entree.cpp:256
: This class is an IJK_Field_local with parallel informations.
Domaine_IJK::Localisation get_localisation() const
const Domaine_IJK & get_domaine() const
void write_data_parallel2_template(const char *filename, const int file_ni_tot, const int file_nj_tot, const int file_nk_tot, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vx, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vy, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vz)
Size_t write_data_parallel_template(const char *filename, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vx, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vy, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vz)
void redistribute_load(const BigTRUSTArray< _IN_TYPE_ > &input, IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &output, const int nitot, const int njtot, const int nktot, const int nbcompo, const int component)
Size_t write_data_template(const char *filename, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &f)
Size_t write_data_parallele_plan_template(const char *filename, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vx, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vy, const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &vz)
void redistribute(const IJK_Field_template< _TYPE_, _TYPE_ARRAY_ > &input, BigTRUSTArray< _OUT_TYPE_ > &output, const int nitot, const int njtot, const int nktot, const int nbcompo, int component)
static int nproc()
renvoie le nombre de processeurs dans le groupe courant Voir Comm_Group::nproc() et PE_Groups::curren...
Definition Process.cpp:104
static void barrier()
Synchronise tous les processeurs du groupe courant (attend que tous les processeurs soient arrives a ...
Definition Process.cpp:136
static int me()
renvoie mon rang dans le groupe de communication courant.
Definition Process.cpp:125
static void exit(int exit_code=-1)
Routine de sortie de TRUST dans une region Kokkos.
Definition Process.cpp:455
static int je_suis_maitre()
renvoie 1 si on est sur le processeur maitre du groupe courant (c'est a dire me() == 0),...
Definition Process.cpp:86
Cette classe est a la classe C++ ofstream ce que la classe Sortie est a la classe C++ ostream Elle re...
Definition SFichier.h:27
void echange_taille_et_messages() const
Cette methode lance l'echange de donnees entre tous les processeurs.
Sortie & send_buffer(int num_PE) const
renvoie le buffer correspondant au processeur num_PE pour y entasser des donnees a envoyer.
void end_comm() const
Vide les buffers et libere les ressources: on a fini de lire les donnees recues dans les buffers.
Entree & recv_buffer(int num_PE) const
renvoie le buffer correspondant au processeur num_PE pour y lire les donnees recues.
void begin_comm() const
Reserve les buffers de comm pour une nouvelle communication.
void set_send_recv_pe_list(const ArrOfInt &send_pe_list, const ArrOfInt &recv_pe_list, const int me_to_me=0)
Definit la liste des processeurs a qui on va envoyer et de qui on va recevoir des donnees.
void set_all_to_allv_flag(int x)
virtual int ouvrir(const char *name, IOS_OPEN_MODE mode=ios::out)
Classe de base des flux de sortie.
Definition Sortie.h:52
virtual int put(const unsigned *ob, std::streamsize n, std::streamsize nb_colonnes=1)
Definition Sortie.cpp:101
void set_bin(bool bin) override
Change le mode d'ecriture du fichier.
Definition Sortie.cpp:255
Represents a an array of int/int64/double/... values.
Definition TRUSTArray.h:81
void append_array(_TYPE_ valeur)
_SIZE_ size_array() const
_TYPE_ * addr()
void resize_array(_SIZE_ new_size, RESIZE_OPTIONS opt=RESIZE_OPTIONS::COPY_INIT)
void resize(_SIZE_ new_size, RESIZE_OPTIONS opt=RESIZE_OPTIONS::COPY_INIT)
Definition TRUSTArray.h:156
_SIZE_ dimension(int d) const
Definition TRUSTTab.tpp:133