SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
partest_tasklocalfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** Copyright (c) 2019 **
8 ** DataDirect Networks **
9 ** **
10 ** See the file COPYRIGHT in the package base directory for details **
11 ****************************************************************************/
12 
13 #define _XOPEN_SOURCE 700
14 
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <ctype.h>
19 #include <unistd.h>
20 #include <mpi.h>
21 #include <time.h>
22 #include <math.h>
23 
24 #include "sion.h"
25 #include "sion_printts.h"
26 #include "sion_debug.h"
27 #include "sion_file.h"
28 #include "partest.h"
29 #include "partest_util.h"
30 
31 int
32 test_single_mpi(char *filename,
33  char *localbuffer,
34  _test_communicators *communicators,
35  _test_options *options) {
36 
37  int i;
38  char fname[FNAMELEN];
39  _sion_fileptr *sion_fileptr;
40 
41  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
42  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
43  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
44 
45  sion_int64 left;
46  sion_int64 bsumwrote, bsumread;
47  size_t bwrite, bwrote, btoread, bread, chunkcnt;
48  double checksum_fp, checksum_read_fp;
49  int ser_count,ser_step,ser_done;
50  int api_to_use;
51  int do_debug;
52 
53  /* not working task ? */
54  if (communicators->work_size == -1) {
55  /* printf("wf: not working %d\n",communicators->all_rank); */
56  return (0);
57  }
58 
59  if(options->use_posix) {
60  api_to_use=SION_FILE_FLAG_POSIX;
61  }
62 #if defined(_SION_SIONFWD)
63  else if (options->use_sionfwd) {
64  api_to_use = SION_FILE_FLAG_SIONFWD;
65  }
66 #endif
67 #ifdef _SION_IME_NATIVE
68  else if (options->use_ime_native) {
69  api_to_use = SION_FILE_FLAG_IME_NATIVE;
70  }
71 #endif
72  else {
73  api_to_use=SION_FILE_FLAG_ANSI;
74  }
75 
76  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
77  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
78 
79  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
80 
81  sprintf(fname, "%s.%06d", filename, communicators->work_rank);
82 
83  if(options->do_write) {
84 
85  /****************************** WRITE *****************************/
86 
87  /* to synchronize start */
88  barrier_after_start(communicators->local);
89 
90  /* TIMING */ write_starttime = starttime = MPI_Wtime();
91 
92  /* FIRST OPEN to create files */
93  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
94  if (!sion_fileptr) {
95  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
96  MPI_Abort(communicators->all, 1);
97  }
98  stats[STATS_WR_NUM_FILES]=communicators->work_size;
99  /* TIMING */ timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
100 
101  /* TIMING */ starttime = MPI_Wtime();
102  barrier_after_open(communicators->work);
103  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
104 
105  /* TIMING */ starttime = MPI_Wtime();
106  _sion_file_close(sion_fileptr);
107  /* TIMING */ timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
108 
109  /* TIMING */ starttime = MPI_Wtime();
110  barrier_after_open(communicators->work);
111  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
112 
113 
114  /* TIMING */ starttime = MPI_Wtime();
115  /* SECOND REOPEN of existing files */
116  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE,0);
117  if (!sion_fileptr) {
118  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
119  MPI_Abort(communicators->all, 1);
120  }
121  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
122 
123  /* TIMING */ starttime = MPI_Wtime();
124  barrier_after_open(communicators->work);
125  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
126 
127  /* TIMING */ starttime = MPI_Wtime();
128  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
129  else ser_step=communicators->local_size;
130  ser_done=0;
131  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
132 
133 
134  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
135 
136  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
137  ser_done=1;
138 
139  /* TIMING */ starttime = MPI_Wtime();
140  left = options->totalsize;
141  bsumwrote = 0;
142  chunkcnt = 0;
143 
144  while (left > 0) {
145  bwrite = options->bufsize;
146  if (bwrite > left)
147  bwrite = left;
148 
149  if (do_debug) {
150  fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
151  }
152 
153  bwrote = _sion_file_write(localbuffer, bwrite, sion_fileptr);
154 
155  left -= (sion_int64) bwrote;
156  bsumwrote += (sion_int64) bwrote;
157  chunkcnt++;
158 
159 #ifdef CHECKSUM
160  if(!options->suppress_checksum) {
161  checksum_fp=0.0;
162  for (i = 0; i < bwrote; i++)
163  checksum_fp += (double) localbuffer[i];
164  }
165 #endif
166 
167  if (do_debug) {
168  fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
169  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
170  }
171 
172  } /* while (left>0) */
173 
174  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
175  stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
176  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
177 
178  } /* ser */
179 
180  /* TIMING */ starttime = MPI_Wtime();
181  barrier_after_write(communicators->local);
182  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
183 
184  } /* for(ser ...) */
185 
186  /* TIMING */ starttime = MPI_Wtime();
187  barrier_after_write(communicators->work);
188  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
189 
190  /* TIMING */ starttime = MPI_Wtime();
191  _sion_file_close(sion_fileptr);
192  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
193 
194  /* TIMING */ starttime = MPI_Wtime();
195  barrier_after_close(communicators->work);
196  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
197 
198  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
199 
200 
201  if (options->verbose) {
202  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
203  }
204 
205  if (do_debug) {
206  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
207  }
208 
209  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
210  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
211 
212  if (communicators->work_rank == 0) {
213  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
214  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
215  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
216  }
217 
218  } /* do_write */
219 
220  /* to synchronize after write */
221  barrier_after_close(communicators->work);
222 
223  if(options->do_read) {
224  /****************************** READ *****************************/
225 
226  sprintf(fname, "%s.%06d", filename, communicators->workread_rank);
227  /* printf("on rank %3d READ: filename %s\n",communicators->all_rank,fname); */
228 
229 
230  /* to synchronize start */
231  barrier_after_start(communicators->local);
232 
233  /* TIMING */ read_starttime = starttime = MPI_Wtime();
234 
235  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_READ,0);
236  if (!sion_fileptr) {
237  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
238  MPI_Abort(communicators->all, 1);
239  }
240  stats[STATS_RD_NUM_FILES]=communicators->work_size;
241  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
242 
243  /* TIMING */ starttime = MPI_Wtime();
244  barrier_after_open(communicators->work);
245  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
246 
247 
248  /* TIMING */ starttime = MPI_Wtime();
249  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
250  else ser_step=communicators->local_size;
251  ser_done=0;
252  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
253 
254  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
255 
256  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
257  ser_done=1;
258 
259  /* TIMING */ starttime = MPI_Wtime();
260  left = options->totalsize;
261  bsumread = 0;
262  chunkcnt = 0;
263 
264  while (left > 0) {
265  btoread = options->bufsize;
266  if (btoread > left)
267  btoread = left;
268 
269  bread = _sion_file_read(localbuffer, btoread, sion_fileptr);
270 
271  left -= bread;
272  bsumread += bread;
273  chunkcnt++;
274 
275 #ifdef CHECKSUM
276  if(!options->suppress_checksum) {
277  checksum_read_fp=0.0;
278  for (i = 0; i < bread; i++)
279  checksum_read_fp += (double) localbuffer[i];
280  }
281 #endif
282 
283  if (do_debug) {
284  fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
285  bsumread / 1024.0 / 1024.0, (sion_int64) left);
286  }
287 
288  } /* while(left>0) */
289 
290  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
291 
292  stats[STATS_BYTES_RD_READ]+=bsumread;
293  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
294 
295  /* TIMING */ starttime = MPI_Wtime();
296  barrier_after_read(communicators->local);
297  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
298 
299  } /* if (ser...) */
300 
301  } /* for (ser ...) */
302 
303  /* TIMING */ starttime = MPI_Wtime();
304  barrier_after_read(communicators->workread);
305  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
306 
307  /* TIMING */ starttime = MPI_Wtime();
308  _sion_file_close(sion_fileptr);
309  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
310 
311  /* TIMING */ starttime = MPI_Wtime();
312  barrier_after_close(communicators->workread);
313  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
314  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
315 
316  /* TIMING */ starttime = MPI_Wtime();
317 
318  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
319 
320  if (options->verbose) {
321  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
322  }
323 
324  if (do_debug) {
325  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
326  }
327 
328  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
329  if (options->numfiles > 1) {
330 
331  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
332  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
333 
334  if (communicators->local_rank == 0) {
335  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
336  }
337 
338  }
339 
340  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
341  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
342 
343  /* */ DPRINTFTS(communicators->all_rank, "after red.");
344  if (communicators->workread_rank == 0) {
345  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
346  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
347  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
348  }
349 
350 #ifdef CHECKSUM
351  if(!options->suppress_checksum) {
352  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
353  fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
354  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
355  }
356  }
357 #endif
358 
359  }
360 
361  if(options->unlink_files) {
362  /* */ starttime = MPI_Wtime();
363  barrier_before_unlink(communicators->work);
364  unlink(fname);
365  barrier_after_unlink(communicators->work);
366  /* */ unlinktime = MPI_Wtime() - starttime;
367  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
368  if (communicators->work_rank == 0) {
369  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
370  }
371  }
372 
373  return (1);
374 }
int _sion_file_close(_sion_fileptr *sion_fileptr)
Closes the file and destroys the fileptr structure.
Definition: sion_file.c:178
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:221
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Open a file; the access mode (create, write, or read) and the I/O backend are selected by flags.
Definition: sion_file.c:53
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:257
#define SION_FILE_FLAG_READ
Definition: sion_file.h:30
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:28
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:25
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:29
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:27
Sion Time Stamp Header.