SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <mpi.h>
16 #include <time.h>
17 #include <math.h>
18 
19 #include "sion.h"
20 #include "sion_debug.h"
21 #include "sion_printts.h"
22 #include "partest.h"
23 #include "partest_util.h"
24 
25 sion_int64 partest_write_chunk_to_sionfile( int sid,
26  int rank,
27  char *localbuffer,
28  _test_options *options,
29  int do_debug,
30  double *checksum_fp,
31  int *chunkcnt);
32 
33 sion_int64 partest_read_chunk_from_sionfile( int sid,
34  int rank,
35  char *localbuffer,
36  _test_options *options,
37  int do_debug,
38  double *checksum_fp,
39  int *chunkcnt);
40 
41  /****************************************************************************************************************
42  *
43  * test_paropen_multi_mpi
44  *
45  ***************************************************************************************************************/
46 int test_paropen_multi_mpi(char *filename,
47  char *localbuffer,
48  _test_communicators *communicators,
49  _test_options *options
50  ) {
51 
52  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
53  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
54  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
55 
56  double checksum_fp, checksum_read_fp;
57  int globalrank, sid, i;
58  int chunkcnt, using_hints;
59  sion_int64 rchunksize;
60  sion_int32 rfsblksize;
61  char *newfname=NULL;
62  int do_debug;
63  int ser_count,ser_step,ser_done;
64  char *file_mode;
65 
66  /* */ DPRINTFTS(communicators->all_rank, "start");
67  /* numfiles>=1 -> sion will split the communicator */
68  /* numfiles<=0 -> communicators->local contain correct local communicator, computed by split_communicator */
69 
70  /* not working task ? */
71  if (communicators->work_size == -1) {
72  return (0);
73  }
74 
75  if(0){
76 
77  MPI_Comm_size(communicators->work, &communicators->work_size);
78  MPI_Comm_rank(communicators->work, &communicators->work_rank);
79  MPI_Comm_size(communicators->workread, &communicators->workread_size);
80  MPI_Comm_rank(communicators->workread, &communicators->workread_rank);
81  fprintf(stderr, "timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
82  communicators->work_rank, communicators->work_size,
83  communicators->workread_rank, communicators->workread_size
84  );
85  }
86 
87 
88  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
89 
90  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
91  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
92 
93  if(options->do_write) {
94  /****************************** WRITE *****************************/
95 
96  /* to synchronize start */
97  barrier_after_start(communicators->local);
98 
99  /* TIMING */ write_starttime = starttime = MPI_Wtime();
100  if(options->use_posix) {
101  file_mode="bw,posix";
102  } else {
103  file_mode="bw,ansi";
104  }
105  sid = sion_paropen_mpi(filename, file_mode, &options->numfiles, communicators->work, &communicators->local,
106  &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
107  using_hints=sion_using_hints(sid);
108  stats[STATS_WR_NUM_FILES]=options->numfiles;
109  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
110 
111  /* TIMING */ starttime = MPI_Wtime();
112  barrier_after_open(communicators->local);
113  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
114 
115  /* TIMING */ starttime = MPI_Wtime();
116  barrier_after_open(communicators->work);
117  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
118 
119  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
120  /* TIMING */ starttime = MPI_Wtime();
121  MPI_Comm_size(communicators->local, &communicators->local_size);
122  MPI_Comm_rank(communicators->local, &communicators->local_rank);
123 
124  if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
125  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
126  else ser_step=communicators->local_size;
127  ser_done=0;
128  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
129 
130  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
131  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
132  ser_done=1;
133 
134  /* TIMING */ starttime = MPI_Wtime();
135  stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
136  localbuffer,options,do_debug&&options->verbose,
137  &checksum_fp,&chunkcnt);
138  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
139  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
140 
141  }
142 
143  /* TIMING */ starttime = MPI_Wtime();
144  if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
145  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
146 
147  /* TIMING */ starttime = MPI_Wtime();
148  barrier_after_write(communicators->local);
149  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
150 
151  }
152 
153  /* TIMING */ starttime = MPI_Wtime();
154  barrier_after_write(communicators->work);
155  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
156 
157  /* TIMING */ starttime = MPI_Wtime();
158  sion_parclose_mpi(sid);
159  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
160 
161  /* TIMING */ starttime = MPI_Wtime();
162  barrier_after_close(communicators->local);
163  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
164 
165  /* TIMING */ starttime = MPI_Wtime();
166  barrier_after_close(communicators->work);
167  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
168  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
169 
170  if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
171 
172 
173  /* TIMING */ starttime = MPI_Wtime();
174  if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
175  fprintf(stderr, "partest filespec: ( ) using_hints = %ld\n", (long) using_hints);
176  fprintf(stderr, "partest filespec: ( ) fsblksize = %ld\n", (long) options->fsblksize);
177  }
178 
179  if (options->verbose) {
180  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
181  }
182  if (do_debug) {
183  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
184  }
185 
186  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
187 
188  if (options->numfiles > 1) {
189 
190  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
191  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
192 
193  if (communicators->local_rank == 0) {
194  write_timings("FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
195  }
196 
197  }
198 
199  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
200  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
201 
202  if (communicators->work_rank == 0) {
203  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
204  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
205  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
206  }
207 
208  if(newfname) {free(newfname);newfname=NULL;}
209 
210 
211  } /* do_write */
212 
213  /* to synchronize after write */
214  barrier_after_close(communicators->work);
215 
216  if(options->do_read) {
217  /****************************** READ *****************************/
218 
219  /* reset localbuffer */
220  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
221  localbuffer[i] = ' ';
222  }
223  /* to synchronize start */
224  barrier_after_start(communicators->local);
225 
226  /* TIMING */ read_starttime = starttime = MPI_Wtime();
227  if(options->use_posix) {
228  file_mode="br,posix";
229  } else {
230  file_mode="br,ansi";
231  }
232 
233  /* TIMING */ starttime = MPI_Wtime();
234  if (options->collectiveopenforread) {
235  /* commlocal and numfiles will be read from sion file */
236  sid = sion_paropen_mpi(filename,file_mode, &options->numfiles, communicators->workread,
237  &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
238  stats[STATS_RD_NUM_FILES]=options->numfiles;
239  }
240  else {
241  /* there is some work to for multifile sion file */
242  sid = sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
243  stats[STATS_RD_NUM_FILES]=-1;
244  }
245  using_hints=sion_using_hints(sid);
246  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
247 
248  /* TIMING */ starttime = MPI_Wtime();
249  barrier_after_open(communicators->local);
250  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
251 
252  /* TIMING */ starttime = MPI_Wtime();
253  barrier_after_open(communicators->workread);
254  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
255 
256 
257  /* TIMING */ starttime = MPI_Wtime();
258  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
259  MPI_Comm_size(communicators->local, &communicators->local_size);
260  MPI_Comm_rank(communicators->local, &communicators->local_rank);
261 
262  if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
263  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
264  else ser_step=communicators->local_size;
265  ser_done=0;
266 
267  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
268 
269  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
270 
271  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
272  ser_done=1;
273 
274  /* TIMING */ starttime = MPI_Wtime();
275  stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
276  localbuffer,options,do_debug&&options->verbose,
277  &checksum_read_fp,&chunkcnt);
278  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
279  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
280 
281  }
282  /* TIMING */ starttime = MPI_Wtime();
283  if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
284  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
285 
286  /* TIMING */ starttime = MPI_Wtime();
287  barrier_after_read(communicators->local);
288  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
289 
290  }
291 
292  /* TIMING */ starttime = MPI_Wtime();
293  barrier_after_read(communicators->workread);
294  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
295 
296 
297  /* TIMING */ starttime = MPI_Wtime();
298  if (options->collectiveopenforread) {
299  sion_parclose_mpi(sid);
300  }
301  else {
302  sion_close(sid);
303  }
304  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
305 
306  /* TIMING */ starttime = MPI_Wtime();
307  barrier_after_close(communicators->local);
308  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
309 
310  /* TIMING */ starttime = MPI_Wtime();
311  barrier_after_close(communicators->workread);
312  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
313  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
314 
315 
316  /* TIMING */ starttime = MPI_Wtime();
317  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
318 
319  if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
320  fprintf(stderr, "partest filespec: ( ) using_hints = %ld\n", (long) using_hints);
321  fprintf(stderr, "partest filespec: ( ) fsblksize = %ld\n", (long) options->fsblksize);
322  }
323 
324  if (options->verbose) {
325  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
326  }
327 
328  if (do_debug) {
329  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
330  }
331 
332  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
333 
334  if (options->numfiles > 1) {
335 
336  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
337  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
338 
339  if (communicators->local_rank == 0) {
340  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
341  }
342 
343  }
344 
345  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
346  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
347 
348  /* */ DPRINTFTS(communicators->all_rank, "after red.");
349  if (communicators->workread_rank == 0) {
350  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
351  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
352  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
353  }
354 
355 #ifdef CHECKSUM
356  if(!options->suppress_checksum) {
357  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
358  fprintf(stderr, "timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
359  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
360  }
361  }
362 #endif
363 
364  } /* do_read */
365 
366  if(options->unlink_files) {
367  /* */ starttime = MPI_Wtime();
368  barrier_before_unlink(communicators->workread);
369  if (communicators->local_rank == 0) {
370  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
371  unlink(newfname);
372  }
373  barrier_after_unlink(communicators->workread);
374  /* */ unlinktime = MPI_Wtime() - starttime;
375  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
376  if (communicators->work_rank == 0) {
377  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
378  }
379  }
380 
381  if(newfname) {free(newfname);newfname=NULL;}
382 
383  return (1);
384 }
385 
386 sion_int64 partest_write_chunk_to_sionfile( int sid,
387  int rank,
388  char *localbuffer,
389  _test_options *options,
390  int do_debug,
391  double *checksum_fp,
392  int *chunkcnt) {
393  sion_int64 left;
394  sion_int64 bsumwrote;
395  size_t bytes_in_chunk;
396  size_t bwrite, bwrote;
397  int i;
398 
399  sion_int64 bufsize = (sion_int64) options->bufsize;
400  sion_int64 totalsize = (sion_int64) options->totalsize;
401  sion_int64 startoffset = options->startoffset;
402 
403  *checksum_fp=0.0;
404  left = totalsize;
405  bsumwrote = 0;
406  *chunkcnt = 0;
407  bytes_in_chunk=0;
408 
409  /* Write until total size of the data is reached */
410  while (left > 0) {
411  if(startoffset==0) bwrite = bufsize;
412  else {
413  bwrite = startoffset; startoffset=0;
414  }
415  if (bwrite > left)
416  bwrite = left;
417 
418  if (0) {
419  fprintf(stderr, "timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
420  }
421 
422  bytes_in_chunk+=bwrite;
423  if(bytes_in_chunk>options->chunksize) {
424  sion_ensure_free_space(sid, bwrite);
425  bytes_in_chunk=bwrite;
426  }
427  if(options->collectivewrite) {
428  bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
429  } else {
430  bwrote = sion_fwrite(localbuffer, 1, bwrite, sid);
431  }
432 
433 #ifdef CHECKSUM
434  if(!options->suppress_checksum) {
435  for (i = 0; i < bwrote; i++)
436  *checksum_fp += (double) localbuffer[i];
437  }
438 #endif
439  left -= bwrote;
440  bsumwrote += bwrote;
441  (*chunkcnt)++;
442 
443  if (do_debug) {
444  fprintf(stderr, "timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
445  (sion_int64) bwrote,
446  bsumwrote, bsumwrote / 1024.0 / 1024.0,
447  (sion_int64) left,
448  left / 1024.0 / 1024.0 );
449  }
450  }
451  return(bsumwrote);
452 }
453 
454 
455 sion_int64 partest_read_chunk_from_sionfile( int sid,
456  int rank,
457  char *localbuffer,
458  _test_options *options,
459  int do_debug,
460  double *checksum_read_fp,
461  int *chunkcnt) {
462 
463  sion_int64 left;
464  sion_int64 bsumread;
465  size_t bytes_in_chunk;
466  size_t btoread, bread;
467  int myfeof;
468  int i;
469 
470  sion_int64 bufsize = (sion_int64) options->bufsize;
471  sion_int64 totalsize = (sion_int64) options->totalsize;
472  sion_int64 startoffset = options->startoffset;
473 
474  *checksum_read_fp = 0;
475  left = totalsize;
476  bsumread = 0;
477  *chunkcnt = 0;
478  bytes_in_chunk = 0;
479 
480 
481  myfeof=sion_feof(sid);
482  while ((left > 0) && (!myfeof)) {
483 
484  if(startoffset==0) btoread = bufsize;
485  else {
486  btoread = startoffset; startoffset=0;
487  }
488  if (btoread > left)
489  btoread = left;
490 
491  bytes_in_chunk+=btoread;
492  if(bytes_in_chunk>options->chunksize) {
493  myfeof=sion_feof(sid);
494  }
495  if(!myfeof) {
496  if(options->collectiveread) {
497  bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
498  } else {
499  bread = sion_fread(localbuffer, 1, btoread, sid);
500  }
501 
502 #ifdef CHECKSUM
503  if(!options->suppress_checksum) {
504  for (i = 0; i < bread; i++)
505  *checksum_read_fp += (double) localbuffer[i];
506  }
507 #endif
508 
509  left -= bread;
510  bsumread += bread;
511  (*chunkcnt)++;
512 
513  if (do_debug) {
514  fprintf(stderr, "timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
515  (sion_int64) bread,
516  bsumread, bsumread / 1024.0 / 1024.0,
517  (sion_int64) left,
518  left / 1024.0 / 1024.0 );
519  }
520  }
521  }
522 
523  return(bsumread);
524 }
525 
526 
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:770
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1014
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:113
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:472
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:90
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:83
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:591
Sion Time Stamp Header.
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:239