SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h>
16 #include <unistd.h>
17 #include <mpi.h>
18 #include <time.h>
19 #include <math.h>
20 
21 #include "sion.h"
22 #include "sion_debug.h"
23 #include "sion_printts.h"
24 #include "partest.h"
25 #include "partest_util.h"
26 
27 sion_int64 partest_write_chunk_to_sionfile( int sid,
28  int rank,
29  char *localbuffer,
30  _test_options *options,
31  int do_debug,
32  double *checksum_fp,
33  int *chunkcnt);
34 
35 sion_int64 partest_read_chunk_from_sionfile( int sid,
36  int rank,
37  char *localbuffer,
38  _test_options *options,
39  int do_debug,
40  double *checksum_fp,
41  int *chunkcnt);
42 
43  /****************************************************************************************************************
44  *
45  * test_paropen_multi_mpi
46  *
47  ***************************************************************************************************************/
48 int test_paropen_multi_mpi(char *filename,
49  char *localbuffer,
50  _test_communicators *communicators,
51  _test_options *options
52  ) {
53 
54  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
55  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
56  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
57 
58  double checksum_fp, checksum_read_fp;
59  int globalrank, sid, i;
60  int chunkcnt, using_hints;
61  sion_int64 rchunksize;
62  sion_int32 rfsblksize;
63  char *newfname=NULL;
64  int do_debug;
65  int ser_count,ser_step,ser_done;
66  char *file_mode;
67 
68  /* */ DPRINTFTS(communicators->all_rank, "start");
69  /* numfiles>=1 -> sion will split the communicator */
70  /* numfiles<=0 -> communicators->local contain correct local communicator, computed by split_communicator */
71 
72  /* not working task ? */
73  if (communicators->work_size == -1) {
74  return (0);
75  }
76 
77  if(0){
78 
79  MPI_Comm_size(communicators->work, &communicators->work_size);
80  MPI_Comm_rank(communicators->work, &communicators->work_rank);
81  MPI_Comm_size(communicators->workread, &communicators->workread_size);
82  MPI_Comm_rank(communicators->workread, &communicators->workread_rank);
83  fprintf(stderr, "timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
84  communicators->work_rank, communicators->work_size,
85  communicators->workread_rank, communicators->workread_size
86  );
87  }
88 
89 
90  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
91 
92  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
93  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
94 
95  if(options->do_write) {
96  /****************************** WRITE *****************************/
97 
98  /* to synchronize start */
99  barrier_after_start(communicators->local);
100 
101  /* TIMING */ write_starttime = starttime = MPI_Wtime();
102  if(options->use_posix) {
103  file_mode="bw,posix";
104  } else {
105  file_mode="bw,ansi";
106  }
107  sid = sion_paropen_mpi(filename, file_mode, &options->numfiles, communicators->work, &communicators->local,
108  &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
109  using_hints=sion_using_hints(sid);
110  stats[STATS_WR_NUM_FILES]=options->numfiles;
111  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
112 
113  /* TIMING */ starttime = MPI_Wtime();
114  barrier_after_open(communicators->local);
115  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
116 
117  /* TIMING */ starttime = MPI_Wtime();
118  barrier_after_open(communicators->work);
119  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
120 
121  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
122  /* TIMING */ starttime = MPI_Wtime();
123  MPI_Comm_size(communicators->local, &communicators->local_size);
124  MPI_Comm_rank(communicators->local, &communicators->local_rank);
125 
126  if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
127  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
128  else ser_step=communicators->local_size;
129  ser_done=0;
130  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
131 
132  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
133  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
134  ser_done=1;
135 
136  /* TIMING */ starttime = MPI_Wtime();
137  stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
138  localbuffer,options,do_debug&&options->verbose,
139  &checksum_fp,&chunkcnt);
140  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
141  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
142 
143  }
144 
145  /* TIMING */ starttime = MPI_Wtime();
146  if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
147  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
148 
149  /* TIMING */ starttime = MPI_Wtime();
150  barrier_after_write(communicators->local);
151  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
152 
153  }
154 
155  /* TIMING */ starttime = MPI_Wtime();
156  barrier_after_write(communicators->work);
157  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
158 
159  /* TIMING */ starttime = MPI_Wtime();
160  sion_parclose_mpi(sid);
161  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
162 
163  /* TIMING */ starttime = MPI_Wtime();
164  barrier_after_close(communicators->local);
165  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
166 
167  /* TIMING */ starttime = MPI_Wtime();
168  barrier_after_close(communicators->work);
169  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
170  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
171 
172  if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
173 
174 
175  /* TIMING */ starttime = MPI_Wtime();
176  if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
177  fprintf(stderr, "partest filespec: ( ) using_hints = %ld\n", (long) using_hints);
178  fprintf(stderr, "partest filespec: ( ) fsblksize = %ld\n", (long) options->fsblksize);
179  }
180 
181  if (options->verbose) {
182  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
183  }
184  if (do_debug) {
185  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
186  }
187 
188  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
189 
190  if (options->numfiles > 1) {
191 
192  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
193  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
194 
195  if (communicators->local_rank == 0) {
196  write_timings("FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
197  }
198 
199  }
200 
201  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
202  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
203 
204  if (communicators->work_rank == 0) {
205  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
206  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
207  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
208  }
209 
210  if(newfname) {free(newfname);newfname=NULL;}
211 
212 
213  } /* do_write */
214 
215  /* to synchronize after write */
216  barrier_after_close(communicators->work);
217 
218  if(options->do_read) {
219  /****************************** READ *****************************/
220 
221  /* reset localbuffer */
222  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
223  localbuffer[i] = ' ';
224  }
225  /* to synchronize start */
226  barrier_after_start(communicators->local);
227 
228  /* TIMING */ read_starttime = starttime = MPI_Wtime();
229  if(options->use_posix) {
230  file_mode="br,posix";
231  } else {
232  file_mode="br,ansi";
233  }
234 
235  /* TIMING */ starttime = MPI_Wtime();
236  if (options->collectiveopenforread) {
237  /* commlocal and numfiles will be read from sion file */
238  sid = sion_paropen_mpi(filename,file_mode, &options->numfiles, communicators->workread,
239  &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
240  stats[STATS_RD_NUM_FILES]=options->numfiles;
241  }
242  else {
243  /* there is some work to for multifile sion file */
244  sid = sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
245  stats[STATS_RD_NUM_FILES]=-1;
246  }
247  using_hints=sion_using_hints(sid);
248  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
249 
250  /* TIMING */ starttime = MPI_Wtime();
251  barrier_after_open(communicators->local);
252  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
253 
254  /* TIMING */ starttime = MPI_Wtime();
255  barrier_after_open(communicators->workread);
256  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
257 
258 
259  /* TIMING */ starttime = MPI_Wtime();
260  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
261  MPI_Comm_size(communicators->local, &communicators->local_size);
262  MPI_Comm_rank(communicators->local, &communicators->local_rank);
263 
264  if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
265  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
266  else ser_step=communicators->local_size;
267  ser_done=0;
268 
269  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
270 
271  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
272 
273  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
274  ser_done=1;
275 
276  /* TIMING */ starttime = MPI_Wtime();
277  stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
278  localbuffer,options,do_debug&&options->verbose,
279  &checksum_read_fp,&chunkcnt);
280  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
281  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
282 
283  }
284  /* TIMING */ starttime = MPI_Wtime();
285  if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
286  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
287 
288  /* TIMING */ starttime = MPI_Wtime();
289  barrier_after_read(communicators->local);
290  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
291 
292  }
293 
294  /* TIMING */ starttime = MPI_Wtime();
295  barrier_after_read(communicators->workread);
296  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
297 
298 
299  /* TIMING */ starttime = MPI_Wtime();
300  if (options->collectiveopenforread) {
301  sion_parclose_mpi(sid);
302  }
303  else {
304  sion_close(sid);
305  }
306  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
307 
308  /* TIMING */ starttime = MPI_Wtime();
309  barrier_after_close(communicators->local);
310  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
311 
312  /* TIMING */ starttime = MPI_Wtime();
313  barrier_after_close(communicators->workread);
314  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
315  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
316 
317 
318  /* TIMING */ starttime = MPI_Wtime();
319  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
320 
321  if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
322  fprintf(stderr, "partest filespec: ( ) using_hints = %ld\n", (long) using_hints);
323  fprintf(stderr, "partest filespec: ( ) fsblksize = %ld\n", (long) options->fsblksize);
324  }
325 
326  if (options->verbose) {
327  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
328  }
329 
330  if (do_debug) {
331  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
332  }
333 
334  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
335 
336  if (options->numfiles > 1) {
337 
338  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
339  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
340 
341  if (communicators->local_rank == 0) {
342  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
343  }
344 
345  }
346 
347  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
348  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
349 
350  /* */ DPRINTFTS(communicators->all_rank, "after red.");
351  if (communicators->workread_rank == 0) {
352  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
353  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
354  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
355  }
356 
357 #ifdef CHECKSUM
358  if(!options->suppress_checksum) {
359  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
360  fprintf(stderr, "timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
361  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
362  }
363  }
364 #endif
365 
366  } /* do_read */
367 
368  if(options->unlink_files) {
369  /* */ starttime = MPI_Wtime();
370  barrier_before_unlink(communicators->workread);
371  if (communicators->local_rank == 0) {
372  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
373  unlink(newfname);
374  }
375  barrier_after_unlink(communicators->workread);
376  /* */ unlinktime = MPI_Wtime() - starttime;
377  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
378  if (communicators->work_rank == 0) {
379  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
380  }
381  }
382 
383  if(newfname) {free(newfname);newfname=NULL;}
384 
385  return (1);
386 }
387 
388 sion_int64 partest_write_chunk_to_sionfile( int sid,
389  int rank,
390  char *localbuffer,
391  _test_options *options,
392  int do_debug,
393  double *checksum_fp,
394  int *chunkcnt) {
395  sion_int64 left;
396  sion_int64 bsumwrote;
397  size_t bytes_in_chunk;
398  size_t bwrite, bwrote;
399  int i;
400 
401  sion_int64 bufsize = (sion_int64) options->bufsize;
402  sion_int64 totalsize = (sion_int64) options->totalsize;
403  sion_int64 startoffset = options->startoffset;
404 
405  *checksum_fp=0.0;
406  left = totalsize;
407  bsumwrote = 0;
408  *chunkcnt = 0;
409  bytes_in_chunk=0;
410 
411  /* Write until total size of the data is reached */
412  while (left > 0) {
413  if(startoffset==0) bwrite = bufsize;
414  else {
415  bwrite = startoffset; startoffset=0;
416  }
417  if (bwrite > left)
418  bwrite = left;
419 
420  if (0) {
421  fprintf(stderr, "timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
422  }
423 
424  bytes_in_chunk+=bwrite;
425  if(bytes_in_chunk>options->chunksize) {
426  sion_ensure_free_space(sid, bwrite);
427  bytes_in_chunk=bwrite;
428  }
429  if(options->collectivewrite) {
430  bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
431  } else {
432  bwrote = sion_fwrite(localbuffer, 1, bwrite, sid);
433  }
434 
435 #ifdef CHECKSUM
436  if(!options->suppress_checksum) {
437  for (i = 0; i < bwrote; i++)
438  *checksum_fp += (double) localbuffer[i];
439  }
440 #endif
441  left -= bwrote;
442  bsumwrote += bwrote;
443  (*chunkcnt)++;
444 
445  if (do_debug) {
446  fprintf(stderr, "timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
447  (sion_int64) bwrote,
448  bsumwrote, bsumwrote / 1024.0 / 1024.0,
449  (sion_int64) left,
450  left / 1024.0 / 1024.0 );
451  }
452  }
453  return(bsumwrote);
454 }
455 
456 
457 sion_int64 partest_read_chunk_from_sionfile( int sid,
458  int rank,
459  char *localbuffer,
460  _test_options *options,
461  int do_debug,
462  double *checksum_read_fp,
463  int *chunkcnt) {
464 
465  sion_int64 left;
466  sion_int64 bsumread;
467  size_t bytes_in_chunk;
468  size_t btoread, bread;
469  int myfeof;
470  int i;
471 
472  sion_int64 bufsize = (sion_int64) options->bufsize;
473  sion_int64 totalsize = (sion_int64) options->totalsize;
474  sion_int64 startoffset = options->startoffset;
475 
476  *checksum_read_fp = 0;
477  left = totalsize;
478  bsumread = 0;
479  *chunkcnt = 0;
480  bytes_in_chunk = 0;
481 
482 
483  myfeof=sion_feof(sid);
484  while ((left > 0) && (!myfeof)) {
485 
486  if(startoffset==0) btoread = bufsize;
487  else {
488  btoread = startoffset; startoffset=0;
489  }
490  if (btoread > left)
491  btoread = left;
492 
493  bytes_in_chunk+=btoread;
494  if(bytes_in_chunk>options->chunksize) {
495  myfeof=sion_feof(sid);
496  }
497  if(!myfeof) {
498  if(options->collectiveread) {
499  bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
500  } else {
501  bread = sion_fread(localbuffer, 1, btoread, sid);
502  }
503 
504 #ifdef CHECKSUM
505  if(!options->suppress_checksum) {
506  for (i = 0; i < bread; i++)
507  *checksum_read_fp += (double) localbuffer[i];
508  }
509 #endif
510 
511  left -= bread;
512  bsumread += bread;
513  (*chunkcnt)++;
514 
515  if (do_debug) {
516  fprintf(stderr, "timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
517  (sion_int64) bread,
518  bsumread, bsumread / 1024.0 / 1024.0,
519  (sion_int64) left,
520  left / 1024.0 / 1024.0 );
521  }
522  }
523  }
524 
525  return(bsumread);
526 }
527 
528 
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:809
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
Definition: sion_common.c:1053
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:106
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:470
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:83
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:85
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:609
Sion Time Stamp Header.
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:230