SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
ompi_partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <mpi.h>
16 
17 #include <time.h>
18 #include <math.h>
19 
20 #include "sion.h"
21 #include "sion_debug.h"
22 #include "sion_printts.h"
23 #include "ompi_partest.h"
24 #include <omp.h>
25 
26 /****************************************************************************************************************
27  *
28  * test_paropen_multi_ompi
29  *
30  ***************************************************************************************************************/
31 
42 int test_paropen_multi_ompi(char *filename,
43  char *localbuffer,
44  _test_communicators *communicators,
45  _test_options *options
46  ) {
47 
48  double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
49  double barr1time, barr2time, barr3time;
50 
51  double t_writetime, t_readtime, t_opentime, t_closetime;
52  sion_int64 t_bsumread, t_bsumwrote;
53 
54  sion_int64 left;
55  sion_int64 bsumwrote, sumsize, bsumread;
56  double checksum_fp, checksum_read_fp;
57  int globalrank, sid, i, lstartoffset;
58  size_t bwrite, bwrote, btoread, bread;
59  int chunkcnt;
60  sion_int64 rchunksize;
61  sion_int32 rfsblksize;
62  FILE *fp;
63  int fd;
64  char cbuffer[2*MAXCHARLEN];
65  char *newfname;
66  size_t bytes_in_chunk;
67  int myfeof;
68  int ser_count,ser_step,ser_done;
69 
70  /* */ DPRINTFTS(communicators->all_rank, "start");
71 
72  /* not working task ? */
73  if (communicators->work_size == -1) {
74  return (0);
75  }
76 
77  if(0){
78 
79  MPI_Comm_size(communicators->work, &communicators->work_size);
80  MPI_Comm_rank(communicators->work, &communicators->work_rank);
81  MPI_Comm_size(communicators->workread, &communicators->workread_size);
82  MPI_Comm_rank(communicators->workread, &communicators->workread_rank);
83  fprintf(stderr, "timings[%03d,t%03d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,omp_get_thread_num(),
84  communicators->work_rank, communicators->work_size,
85  communicators->workread_rank, communicators->workread_size
86  );
87  }
88 
89  /****************************** WRITE *****************************/
90 
91  /* */ DPRINTFTS(communicators->all_rank, "[W]before open");
92  /* */ starttime = MPI_Wtime();
93  /* numfiles>=1 -> sion will split the communicator */
94  /* numfiles<=0 -> communicators->local contain correct local communicator, computed by split_communicator */
95  sid = sion_paropen_ompi(filename, "bw", &options->numfiles, communicators->work, &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
96  /* */ opentime = MPI_Wtime() - starttime;
97  /* */ starttime = MPI_Wtime();
98  barrier_after_open(communicators->work);
99  /* */ barr1time = MPI_Wtime() - starttime;
100  /* */ DPRINTFTS(communicators->all_rank, "[W]after open");
101 
102  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
103 
104  MPI_Comm_size(communicators->local, &communicators->local_size);
105  MPI_Comm_rank(communicators->local, &communicators->local_rank);
106 
107  checksum_fp = 0;
108 
109  if(options->use_posix) {
110  fd = fileno(fp);
111  }
112 
113  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
114  else ser_step=communicators->local_size;
115  ser_done=0;
116  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
117  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
118  /* fprintf(stderr, "starting write on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
119  ser_done=1;
120  left = options->totalsize;
121  bsumwrote = 0;
122  chunkcnt = 0;
123  bytes_in_chunk=0;
124  lstartoffset=options->startoffset;
125  /* Write until we reach the total size of the data */
126  while (left > 0) {
127  if(lstartoffset==0) bwrite = options->bufsize;
128  else {
129  bwrite = lstartoffset; lstartoffset=0;
130  }
131  if (bwrite > left) bwrite = left;
132 
133  #pragma omp master
134  {
135  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
136  fprintf(stderr, "timings[%03d,t%03d] write %lld bytes\n", communicators->all_rank,omp_get_thread_num(), (sion_int64) bwrite);
137  }
138  }
139 #pragma omp barrier
140 
141  bytes_in_chunk+=bwrite;
142  if(bytes_in_chunk>options->chunksize) {
143  sion_ensure_free_space(sid, bwrite);
144  bytes_in_chunk=bwrite;
145  }
146 
147  if(options->use_posix) {
148  bwrote = write(fd, localbuffer, 1*bwrite);
149  } else {
150  bwrote = fwrite(localbuffer, 1, bwrite, fp);
151  }
152 
153  #ifdef CHECKSUM
154  if(!options->suppress_checksum) {
155  checksum_fp=0.0;
156  for (i = 0; i < bwrote; i++)
157  checksum_fp += (double) localbuffer[i];
158  }
159  #endif
160 
161  left -= bwrote;
162  bsumwrote += bwrote;
163  chunkcnt++;
164 
165 
166  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
167  fprintf(stderr, "timings[%03d,t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank,omp_get_thread_num(),(sion_int64) bwrote,
168  bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
169  fprintf(stderr, "timings[%03d,t%03d] after write position in file= %lld \n", communicators->all_rank,omp_get_thread_num(), sion_get_position(sid));
170  }
171  }
172  }
173 
174  }
175  fflush(fp);
176 
177  /* */ writetime = MPI_Wtime() - starttime;
178 
179  /* */ starttime = MPI_Wtime();
180  barrier_after_write(communicators->work);
181  /* */ barr2time = MPI_Wtime() - starttime;
182 
183  /* */ starttime = MPI_Wtime();
184  sion_parclose_ompi(sid);
185  /* */ closetime = MPI_Wtime() - starttime;
186  /* */ DPRINTFTS(communicators->all_rank, "[W]before close");
187  /* */ starttime = MPI_Wtime();
188  barrier_after_close(communicators->work);
189  /* */ barr3time = MPI_Wtime() - starttime;
190  /* */ DPRINTFTS(communicators->all_rank, "[W]after close");
191 
192  if (writetime == 0) writetime = -1;
193 
194  if (options->verbose) {
195  sprintf(cbuffer,
196  "timings[%03d,t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
197  communicators->all_rank,omp_get_thread_num(), opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
198  options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
199  collective_print_gather(cbuffer, communicators->work);
200 
201  }
202 
203  if (options->numfiles >= 1) {
204  reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
205  #pragma omp master
206  {
207  sumsize = 0;
208  MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
209  if (communicators->local_rank == 0) {
210  fprintf(stderr, "partest result: local totalsize=%10.4f MB wrote %10.4f MB to %s all_rank=%d\n", options->totalsize / 1024.0 / 1024.0,
211  1.0 * sumsize / 1024.0 / 1024.0, newfname, communicators->all_rank);
212  }
213  }
214  #pragma omp barrier
215  }
216 
217  /* */ DPRINTFTS(communicators->all_rank, "before red.");
218  reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
219  reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
220  reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
221  reduce_omp(&writetime,&t_writetime,MPI_MAX,_PARTEST_DOUBLE);
222 
223  #pragma omp master
224  {
225  MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
226  MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
227  MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
228  MPI_Reduce(&t_writetime, &gwritetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
229  }
230 #pragma omp barrier
231 
232  /* */ DPRINTFTS(communicators->all_rank, "after red.");
233  #pragma omp master
234  {
235  if (communicators->work_rank == 0) {
236  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
237  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
238  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
239  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
240  }
241  if (communicators->work_rank == 0)
242  fprintf(stderr, "*********************************************************************************************\n");
243  }
244 #pragma omp barrier
245 
246  /****************************** READ *****************************/
247 
248  /* reset localbuffer */
249  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
250  localbuffer[i] = ' ';
251  }
252 
253  /* */ DPRINTFTS(communicators->all_rank, "[R]before open");
254  /* */ starttime = MPI_Wtime();
255  if (options->collectiveopenforread) {
256  /* commlocal and numfiles will be read from sion file */
257  sid = sion_paropen_ompi(filename,
258  "br", &options->numfiles, communicators->workread, &communicators->local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
259  /* local is computed again by sion_paropen_mpi if numfiles >=1 */
260  MPI_Comm_size(communicators->local, &communicators->local_size);
261  MPI_Comm_rank(communicators->local, &communicators->local_rank);
262 
263  }
264  else {
265  /* there is some work to for multifile sion file */
266  sid = sion_open_rank(filename, "br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
267  }
268  /* */ opentime = MPI_Wtime() - starttime;
269 
270  /* */ starttime = MPI_Wtime();
271  barrier_after_open(communicators->workread);
272  /* */ barr1time = MPI_Wtime() - starttime;
273  /* */ DPRINTFTS(communicators->all_rank, "[R]after open");
274 
275  if(options->use_posix) {
276  fd = fileno(fp);
277  }
278 
279  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
280  else ser_step=communicators->local_size;
281  ser_done=0;
282 
283  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
284 
285  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
286  /* fprintf(stderr, "starting read on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
287  ser_done=1;
288 
289  checksum_read_fp = 0;
290  left = options->totalsize;
291  bsumread = 0;
292  chunkcnt = 0;
293  bytes_in_chunk = 0;
294  lstartoffset=options->startoffset;
295 
296  myfeof=sion_feof(sid);
297  while ((left > 0) && (!myfeof)) {
298 
299  if(lstartoffset==0) btoread = options->bufsize;
300  else {
301  btoread = lstartoffset; lstartoffset=0;
302  }
303  if (btoread > left)
304  btoread = left;
305 
306  bytes_in_chunk+=btoread;
307  if(bytes_in_chunk>options->chunksize) {
308  myfeof=sion_feof(sid);
309  }
310  if(!myfeof) {
311  if(options->use_posix) {
312  bread = read(fd, localbuffer, 1*btoread);
313  } else {
314  bread = fread(localbuffer, 1, btoread, fp);
315  }
316 
317 
318  #ifdef CHECKSUM
319  if(!options->suppress_checksum) {
320  checksum_read_fp=0.0;
321  for (i = 0; i < bread; i++)
322  checksum_read_fp += (double) localbuffer[i];
323  }
324  #endif
325 
326  left -= bread;
327  bsumread += bread;
328  chunkcnt++;
329  #pragma omp master
330  {
331  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
332  fprintf(stderr, "timings[%03d,t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
333  communicators->all_rank,omp_get_thread_num(), (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
334  fprintf(stderr, "timings[%03d,t%03d] after read position in file= %lld restinblock=%lld\n",
335  communicators->all_rank,omp_get_thread_num(), sion_get_position(sid), sion_bytes_avail_in_block(sid));
336  }
337  }
338 #pragma omp barrier
339  }
340  }
341  }
342 
343  /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
344  /* barrier_after_read(communicators->local); */
345  }
346  fflush(fp);
347  /* */ readtime = MPI_Wtime() - starttime;
348 
349  /* */ starttime= MPI_Wtime();
350  barrier_after_read(communicators->workread);
351  /* */ barr2time = MPI_Wtime() - starttime;
352 
353  /* */ starttime = MPI_Wtime();
354  if (options->collectiveopenforread) {
355  sion_parclose_ompi(sid);
356  }
357  else {
358  sion_close(sid);
359  }
360  /* */ closetime = MPI_Wtime() - starttime;
361  /* */ DPRINTFTS(communicators->all_rank, "[R]before close");
362  barrier_after_close(communicators->workread);
363  /* */ DPRINTFTS(communicators->all_rank, "[R]after close");
364 
365  if (readtime == 0)
366  readtime = -1;
367  if (options->verbose) {
368  sprintf(cbuffer,
369  "timings[%03d,t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
370  communicators->all_rank,omp_get_thread_num(), opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
371  options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
372 
373  collective_print_gather(cbuffer, communicators->workread);
374 
375  }
376 
377 #ifdef CHECKSUM
378  if(!options->suppress_checksum) {
379  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
380  fprintf(stderr, "timings[%03d,t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,omp_get_thread_num(),
381  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
382  }
383  }
384 #endif
385 
386  if (options->numfiles >= 1) {
387  reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
388  #pragma omp master
389  {
390  MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
391  if (communicators->local_rank == 0) {
392  fprintf(stderr, "partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, newfname);
393  }
394  }
395 #pragma omp barrier
396  }
397 
398  /* */ DPRINTFTS(communicators->all_rank, "before red.");
399  reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
400  reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
401  reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
402  reduce_omp(&readtime,&t_readtime,MPI_MAX,_PARTEST_DOUBLE);
403  #pragma omp master
404  {
405  MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
406  MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
407  MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
408  MPI_Reduce(&t_readtime, &greadtime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
409  }
410 #pragma omp barrier
411  /* */ DPRINTFTS(communicators->all_rank, "after red.");
412  #pragma omp master
413  {
414  if (communicators->workread_rank == 0) {
415  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
416  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
417  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
418  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
419  }
420  }
421 #pragma omp barrier
422 
423  if(options->unlink_files) {
424  /* */ starttime = MPI_Wtime();
425  barrier_before_unlink(communicators->workread);
426  #pragma omp master
427  {
428  if (communicators->local_rank == 0) {
429  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
430  unlink(newfname);
431  }
432  }
433 #pragma omp barrier
434  barrier_after_unlink(communicators->workread);
435  /* */ unlinktime = MPI_Wtime() - starttime;
436  #pragma omp master
437  {
438  if (communicators->local_rank == 0) {
439  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
440  }
441  }
442 #pragma omp barrier
443  }
444 
445  return (1);
446 }
447 
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:770
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
Definition: sion_common.c:840
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1014
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
Definition: sion_common.c:891
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:113
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:90
Sion Time Stamp Header.