SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
ompi_partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h>
16 #include <unistd.h>
17 #include <mpi.h>
18 
19 #include <time.h>
20 #include <math.h>
21 
22 #include "sion.h"
23 #include "sion_debug.h"
24 #include "sion_printts.h"
25 #include "ompi_partest.h"
26 #include <omp.h>
27 
28 /****************************************************************************************************************
29  *
30  * test_paropen_multi_ompi
31  *
32  ***************************************************************************************************************/
33 
44 int test_paropen_multi_ompi(char *filename,
45  char *localbuffer,
46  _test_communicators *communicators,
47  _test_options *options
48  ) {
49 
50  double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
51  double barr1time, barr2time, barr3time;
52 
53  double t_writetime, t_readtime, t_opentime, t_closetime;
54  sion_int64 t_bsumread, t_bsumwrote;
55 
56  sion_int64 left;
57  sion_int64 bsumwrote, sumsize, bsumread;
58  double checksum_fp, checksum_read_fp;
59  int globalrank, sid, i, lstartoffset;
60  size_t bwrite, bwrote, btoread, bread;
61  int chunkcnt;
62  sion_int64 rchunksize;
63  sion_int32 rfsblksize;
64  FILE *fp;
65  int fd;
66  char cbuffer[2*MAXCHARLEN];
67  char *newfname;
68  size_t bytes_in_chunk;
69  int myfeof;
70  int ser_count,ser_step,ser_done;
71 
72  /* */ DPRINTFTS(communicators->all_rank, "start");
73 
74  /* not working task ? */
75  if (communicators->work_size == -1) {
76  return (0);
77  }
78 
79  if(0){
80 
81  MPI_Comm_size(communicators->work, &communicators->work_size);
82  MPI_Comm_rank(communicators->work, &communicators->work_rank);
83  MPI_Comm_size(communicators->workread, &communicators->workread_size);
84  MPI_Comm_rank(communicators->workread, &communicators->workread_rank);
85  fprintf(stderr, "timings[%03d,t%03d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,omp_get_thread_num(),
86  communicators->work_rank, communicators->work_size,
87  communicators->workread_rank, communicators->workread_size
88  );
89  }
90 
91  /****************************** WRITE *****************************/
92 
93  /* */ DPRINTFTS(communicators->all_rank, "[W]before open");
94  /* */ starttime = MPI_Wtime();
95  /* numfiles>=1 -> sion will split the communicator */
96  /* numfiles<=0 -> communicators->local contain correct local communicator, computed by split_communicator */
97  sid = sion_paropen_ompi(filename, "bw", &options->numfiles, communicators->work, &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
98  /* */ opentime = MPI_Wtime() - starttime;
99  /* */ starttime = MPI_Wtime();
100  barrier_after_open(communicators->work);
101  /* */ barr1time = MPI_Wtime() - starttime;
102  /* */ DPRINTFTS(communicators->all_rank, "[W]after open");
103 
104  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
105 
106  MPI_Comm_size(communicators->local, &communicators->local_size);
107  MPI_Comm_rank(communicators->local, &communicators->local_rank);
108 
109  checksum_fp = 0;
110 
111  if(options->use_posix) {
112  fd = fileno(fp);
113  }
114 
115  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
116  else ser_step=communicators->local_size;
117  ser_done=0;
118  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
119  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
120  /* fprintf(stderr, "starting write on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
121  ser_done=1;
122  left = options->totalsize;
123  bsumwrote = 0;
124  chunkcnt = 0;
125  bytes_in_chunk=0;
126  lstartoffset=options->startoffset;
127  /* Write until we reach the total size of the data */
128  while (left > 0) {
129  if(lstartoffset==0) bwrite = options->bufsize;
130  else {
131  bwrite = lstartoffset; lstartoffset=0;
132  }
133  if (bwrite > left) bwrite = left;
134 
135  #pragma omp master
136  {
137  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
138  fprintf(stderr, "timings[%03d,t%03d] write %lld bytes\n", communicators->all_rank,omp_get_thread_num(), (sion_int64) bwrite);
139  }
140  }
141 #pragma omp barrier
142 
143  bytes_in_chunk+=bwrite;
144  if(bytes_in_chunk>options->chunksize) {
145  sion_ensure_free_space(sid, bwrite);
146  bytes_in_chunk=bwrite;
147  }
148 
149  if(options->use_posix) {
150  bwrote = write(fd, localbuffer, 1*bwrite);
151  } else {
152  bwrote = fwrite(localbuffer, 1, bwrite, fp);
153  }
154 
155  #ifdef CHECKSUM
156  if(!options->suppress_checksum) {
157  checksum_fp=0.0;
158  for (i = 0; i < bwrote; i++)
159  checksum_fp += (double) localbuffer[i];
160  }
161  #endif
162 
163  left -= bwrote;
164  bsumwrote += bwrote;
165  chunkcnt++;
166 
167 
168  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
169  fprintf(stderr, "timings[%03d,t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank,omp_get_thread_num(),(sion_int64) bwrote,
170  bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
171  fprintf(stderr, "timings[%03d,t%03d] after write position in file= %lld \n", communicators->all_rank,omp_get_thread_num(), sion_get_position(sid));
172  }
173  }
174  }
175 
176  }
177  fflush(fp);
178 
179  /* */ writetime = MPI_Wtime() - starttime;
180 
181  /* */ starttime = MPI_Wtime();
182  barrier_after_write(communicators->work);
183  /* */ barr2time = MPI_Wtime() - starttime;
184 
185  /* */ starttime = MPI_Wtime();
186  sion_parclose_ompi(sid);
187  /* */ closetime = MPI_Wtime() - starttime;
188  /* */ DPRINTFTS(communicators->all_rank, "[W]before close");
189  /* */ starttime = MPI_Wtime();
190  barrier_after_close(communicators->work);
191  /* */ barr3time = MPI_Wtime() - starttime;
192  /* */ DPRINTFTS(communicators->all_rank, "[W]after close");
193 
194  if (writetime == 0) writetime = -1;
195 
196  if (options->verbose) {
197  sprintf(cbuffer,
198  "timings[%03d,t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
199  communicators->all_rank,omp_get_thread_num(), opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
200  options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
201  collective_print_gather(cbuffer, communicators->work);
202 
203  }
204 
205  if (options->numfiles >= 1) {
206  reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
207  #pragma omp master
208  {
209  sumsize = 0;
210  MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
211  if (communicators->local_rank == 0) {
212  fprintf(stderr, "partest result: local totalsize=%10.4f MB wrote %10.4f MB to %s all_rank=%d\n", options->totalsize / 1024.0 / 1024.0,
213  1.0 * sumsize / 1024.0 / 1024.0, newfname, communicators->all_rank);
214  }
215  }
216  #pragma omp barrier
217  }
218 
219  /* */ DPRINTFTS(communicators->all_rank, "before red.");
220  reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
221  reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
222  reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
223  reduce_omp(&writetime,&t_writetime,MPI_MAX,_PARTEST_DOUBLE);
224 
225  #pragma omp master
226  {
227  MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
228  MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
229  MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
230  MPI_Reduce(&t_writetime, &gwritetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
231  }
232 #pragma omp barrier
233 
234  /* */ DPRINTFTS(communicators->all_rank, "after red.");
235  #pragma omp master
236  {
237  if (communicators->work_rank == 0) {
238  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
239  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
240  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
241  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
242  }
243  if (communicators->work_rank == 0)
244  fprintf(stderr, "*********************************************************************************************\n");
245  }
246 #pragma omp barrier
247 
248  /****************************** READ *****************************/
249 
250  /* reset localbuffer */
251  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
252  localbuffer[i] = ' ';
253  }
254 
255  /* */ DPRINTFTS(communicators->all_rank, "[R]before open");
256  /* */ starttime = MPI_Wtime();
257  if (options->collectiveopenforread) {
258  /* commlocal and numfiles will be read from sion file */
259  sid = sion_paropen_ompi(filename,
260  "br", &options->numfiles, communicators->workread, &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
261  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
262  MPI_Comm_size(communicators->local, &communicators->local_size);
263  MPI_Comm_rank(communicators->local, &communicators->local_rank);
264 
265  }
266  else {
267  /* there is some work to for multifile sion file */
268  sid = sion_open_rank(filename, "br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
269  }
270  /* */ opentime = MPI_Wtime() - starttime;
271 
272  /* */ starttime = MPI_Wtime();
273  barrier_after_open(communicators->workread);
274  /* */ barr1time = MPI_Wtime() - starttime;
275  /* */ DPRINTFTS(communicators->all_rank, "[R]after open");
276 
277  if(options->use_posix) {
278  fd = fileno(fp);
279  }
280 
281  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
282  else ser_step=communicators->local_size;
283  ser_done=0;
284 
285  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
286 
287  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
288  /* fprintf(stderr, "starting read on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
289  ser_done=1;
290 
291  checksum_read_fp = 0;
292  left = options->totalsize;
293  bsumread = 0;
294  chunkcnt = 0;
295  bytes_in_chunk = 0;
296  lstartoffset=options->startoffset;
297 
298  myfeof=sion_feof(sid);
299  while ((left > 0) && (!myfeof)) {
300 
301  if(lstartoffset==0) btoread = options->bufsize;
302  else {
303  btoread = lstartoffset; lstartoffset=0;
304  }
305  if (btoread > left)
306  btoread = left;
307 
308  bytes_in_chunk+=btoread;
309  if(bytes_in_chunk>options->chunksize) {
310  myfeof=sion_feof(sid);
311  }
312  if(!myfeof) {
313  if(options->use_posix) {
314  bread = read(fd, localbuffer, 1*btoread);
315  } else {
316  bread = fread(localbuffer, 1, btoread, fp);
317  }
318 
319 
320  #ifdef CHECKSUM
321  if(!options->suppress_checksum) {
322  checksum_read_fp=0.0;
323  for (i = 0; i < bread; i++)
324  checksum_read_fp += (double) localbuffer[i];
325  }
326  #endif
327 
328  left -= bread;
329  bsumread += bread;
330  chunkcnt++;
331  #pragma omp master
332  {
333  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
334  fprintf(stderr, "timings[%03d,t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
335  communicators->all_rank,omp_get_thread_num(), (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
336  fprintf(stderr, "timings[%03d,t%03d] after read position in file= %lld restinblock=%lld\n",
337  communicators->all_rank,omp_get_thread_num(), sion_get_position(sid), sion_bytes_avail_in_block(sid));
338  }
339  }
340 #pragma omp barrier
341  }
342  }
343  }
344 
345  /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
346  /* barrier_after_read(communicators->local); */
347  }
348  fflush(fp);
349  /* */ readtime = MPI_Wtime() - starttime;
350 
351  /* */ starttime= MPI_Wtime();
352  barrier_after_read(communicators->workread);
353  /* */ barr2time = MPI_Wtime() - starttime;
354 
355  /* */ starttime = MPI_Wtime();
356  if (options->collectiveopenforread) {
357  sion_parclose_ompi(sid);
358  }
359  else {
360  sion_close(sid);
361  }
362  /* */ closetime = MPI_Wtime() - starttime;
363  /* */ DPRINTFTS(communicators->all_rank, "[R]before close");
364  barrier_after_close(communicators->workread);
365  /* */ DPRINTFTS(communicators->all_rank, "[R]after close");
366 
367  if (readtime == 0)
368  readtime = -1;
369  if (options->verbose) {
370  sprintf(cbuffer,
371  "timings[%03d,t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
372  communicators->all_rank,omp_get_thread_num(), opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
373  options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
374 
375  collective_print_gather(cbuffer, communicators->workread);
376 
377  }
378 
379 #ifdef CHECKSUM
380  if(!options->suppress_checksum) {
381  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
382  fprintf(stderr, "timings[%03d,t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,omp_get_thread_num(),
383  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
384  }
385  }
386 #endif
387 
388  if (options->numfiles >= 1) {
389  reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
390  #pragma omp master
391  {
392  MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
393  if (communicators->local_rank == 0) {
394  fprintf(stderr, "partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, newfname);
395  }
396  }
397 #pragma omp barrier
398  }
399 
400  /* */ DPRINTFTS(communicators->all_rank, "before red.");
401  reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
402  reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
403  reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
404  reduce_omp(&readtime,&t_readtime,MPI_MAX,_PARTEST_DOUBLE);
405  #pragma omp master
406  {
407  MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
408  MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
409  MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
410  MPI_Reduce(&t_readtime, &greadtime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
411  }
412 #pragma omp barrier
413  /* */ DPRINTFTS(communicators->all_rank, "after red.");
414  #pragma omp master
415  {
416  if (communicators->workread_rank == 0) {
417  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
418  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
419  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
420  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
421  }
422  }
423 #pragma omp barrier
424 
425  if(options->unlink_files) {
426  /* */ starttime = MPI_Wtime();
427  barrier_before_unlink(communicators->workread);
428  #pragma omp master
429  {
430  if (communicators->local_rank == 0) {
431  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
432  unlink(newfname);
433  }
434  }
435 #pragma omp barrier
436  barrier_after_unlink(communicators->workread);
437  /* */ unlinktime = MPI_Wtime() - starttime;
438  #pragma omp master
439  {
440  if (communicators->local_rank == 0) {
441  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
442  }
443  }
444 #pragma omp barrier
445  }
446 
447  return (1);
448 }
449 
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:809
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
Definition: sion_common.c:879
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1053
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
Definition: sion_common.c:930
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:106
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:83
Sion Time Stamp Header.