SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
partest_tasklocalfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <mpi.h>
16 #include <time.h>
17 #include <math.h>
18 
19 #include "sion.h"
20 #include "sion_printts.h"
21 #include "sion_debug.h"
22 #include "sion_file.h"
23 #include "partest.h"
24 #include "partest_util.h"
25 
26 int
27 test_single_mpi(char *filename,
28  char *localbuffer,
29  _test_communicators *communicators,
30  _test_options *options) {
31 
32  int i;
33  char fname[FNAMELEN];
34  _sion_fileptr *sion_fileptr;
35 
36  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
37  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
38  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
39 
40  sion_int64 left;
41  sion_int64 bsumwrote, bsumread;
42  size_t bwrite, bwrote, btoread, bread, chunkcnt;
43  double checksum_fp, checksum_read_fp;
44  int ser_count,ser_step,ser_done;
45  int api_to_use;
46  int do_debug;
47 
48  /* not working task ? */
49  if (communicators->work_size == -1) {
50  /* printf("wf: not working %d\n",communicators->all_rank); */
51  return (0);
52  }
53 
54  if(options->use_posix) {
55  api_to_use=SION_FILE_FLAG_POSIX;
56  } else {
57  api_to_use=SION_FILE_FLAG_ANSI;
58  }
59 
60  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
61  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
62 
63  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
64 
65  sprintf(fname, "%s.%06d", filename, communicators->work_rank);
66 
67  if(options->do_write) {
68 
69  /****************************** WRITE *****************************/
70 
71  /* to synchronize start */
72  barrier_after_start(communicators->local);
73 
74  /* TIMING */ write_starttime = starttime = MPI_Wtime();
75 
76  /* FIRST OPEN to create files */
77  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
78  if (!sion_fileptr) {
79  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
80  MPI_Abort(communicators->all, 1);
81  }
82  stats[STATS_WR_NUM_FILES]=communicators->work_size;
83  /* TIMING */ timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
84 
85  /* TIMING */ starttime = MPI_Wtime();
86  barrier_after_open(communicators->work);
87  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
88 
89  /* TIMING */ starttime = MPI_Wtime();
90  _sion_file_close(sion_fileptr);
91  /* TIMING */ timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
92 
93  /* TIMING */ starttime = MPI_Wtime();
94  barrier_after_open(communicators->work);
95  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
96 
97 
98  /* TIMING */ starttime = MPI_Wtime();
99  /* SECOND REOPEN of existing files */
100  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE,0);
101  if (!sion_fileptr) {
102  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
103  MPI_Abort(communicators->all, 1);
104  }
105  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
106 
107  /* TIMING */ starttime = MPI_Wtime();
108  barrier_after_open(communicators->work);
109  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
110 
111  /* TIMING */ starttime = MPI_Wtime();
112  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
113  else ser_step=communicators->local_size;
114  ser_done=0;
115  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
116 
117 
118  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
119 
120  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
121  ser_done=1;
122 
123  /* TIMING */ starttime = MPI_Wtime();
124  left = options->totalsize;
125  bsumwrote = 0;
126  chunkcnt = 0;
127 
128  while (left > 0) {
129  bwrite = options->bufsize;
130  if (bwrite > left)
131  bwrite = left;
132 
133  if (do_debug) {
134  fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
135  }
136 
137  bwrote = _sion_file_write(localbuffer, bwrite, sion_fileptr);
138 
139  left -= (sion_int64) bwrote;
140  bsumwrote += (sion_int64) bwrote;
141  chunkcnt++;
142 
143 #ifdef CHECKSUM
144  if(!options->suppress_checksum) {
145  checksum_fp=0.0;
146  for (i = 0; i < bwrote; i++)
147  checksum_fp += (double) localbuffer[i];
148  }
149 #endif
150 
151  if (do_debug) {
152  fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
153  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
154  }
155 
156  } /* while (left>0) */
157 
158  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
159  stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
160  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
161 
162  } /* ser */
163 
164  /* TIMING */ starttime = MPI_Wtime();
165  barrier_after_write(communicators->local);
166  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
167 
168  } /* for(ser ...) */
169 
170  /* TIMING */ starttime = MPI_Wtime();
171  barrier_after_write(communicators->work);
172  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
173 
174  /* TIMING */ starttime = MPI_Wtime();
175  _sion_file_close(sion_fileptr);
176  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
177 
178  /* TIMING */ starttime = MPI_Wtime();
179  barrier_after_close(communicators->work);
180  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
181 
182  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
183 
184 
185  if (options->verbose) {
186  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
187  }
188 
189  if (do_debug) {
190  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
191  }
192 
193  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
194  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
195 
196  if (communicators->work_rank == 0) {
197  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
198  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
199  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
200  }
201 
202  } /* do_write */
203 
204  /* to synchronize after write */
205  barrier_after_close(communicators->work);
206 
207  if(options->do_read) {
208  /****************************** READ *****************************/
209 
210  sprintf(fname, "%s.%06d", filename, communicators->workread_rank);
211  /* printf("on rank %3d READ: filename %s\n",communicators->all_rank,fname); */
212 
213 
214  /* to synchronize start */
215  barrier_after_start(communicators->local);
216 
217  /* TIMING */ read_starttime = starttime = MPI_Wtime();
218 
219  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_READ,0);
220  if (!sion_fileptr) {
221  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
222  MPI_Abort(communicators->all, 1);
223  }
224  stats[STATS_RD_NUM_FILES]=communicators->work_size;
225  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
226 
227  /* TIMING */ starttime = MPI_Wtime();
228  barrier_after_open(communicators->work);
229  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
230 
231 
232  /* TIMING */ starttime = MPI_Wtime();
233  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
234  else ser_step=communicators->local_size;
235  ser_done=0;
236  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
237 
238  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
239 
240  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
241  ser_done=1;
242 
243  /* TIMING */ starttime = MPI_Wtime();
244  left = options->totalsize;
245  bsumread = 0;
246  chunkcnt = 0;
247 
248  while (left > 0) {
249  btoread = options->bufsize;
250  if (btoread > left)
251  btoread = left;
252 
253  bread = _sion_file_read(localbuffer, btoread, sion_fileptr);
254 
255  left -= bread;
256  bsumread += bread;
257  chunkcnt++;
258 
259 #ifdef CHECKSUM
260  if(!options->suppress_checksum) {
261  checksum_read_fp=0.0;
262  for (i = 0; i < bread; i++)
263  checksum_read_fp += (double) localbuffer[i];
264  }
265 #endif
266 
267  if (do_debug) {
268  fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
269  bsumread / 1024.0 / 1024.0, (sion_int64) left);
270  }
271 
272  } /* while(left>0) */
273 
274  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
275 
276  stats[STATS_BYTES_RD_READ]+=bsumread;
277  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
278 
279  /* TIMING */ starttime = MPI_Wtime();
280  barrier_after_read(communicators->local);
281  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
282 
283  } /* if (ser...) */
284 
285  } /* for (ser ...) */
286 
287  /* TIMING */ starttime = MPI_Wtime();
288  barrier_after_read(communicators->workread);
289  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
290 
291  /* TIMING */ starttime = MPI_Wtime();
292  _sion_file_close(sion_fileptr);
293  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
294 
295  /* TIMING */ starttime = MPI_Wtime();
296  barrier_after_close(communicators->workread);
297  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
298  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
299 
300  /* TIMING */ starttime = MPI_Wtime();
301 
302  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
303 
304  if (options->verbose) {
305  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
306  }
307 
308  if (do_debug) {
309  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
310  }
311 
312  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
313  if (options->numfiles > 1) {
314 
315  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
316  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
317 
318  if (communicators->local_rank == 0) {
319  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
320  }
321 
322  }
323 
324  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
325  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
326 
327  /* */ DPRINTFTS(communicators->all_rank, "after red.");
328  if (communicators->workread_rank == 0) {
329  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
330  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
331  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
332  }
333 
334 #ifdef CHECKSUM
335  if(!options->suppress_checksum) {
336  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
337  fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
338  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
339  }
340  }
341 #endif
342 
343  }
344 
345  if(options->unlink_files) {
346  /* */ starttime = MPI_Wtime();
347  barrier_before_unlink(communicators->work);
348  unlink(fname);
349  barrier_after_unlink(communicators->work);
350  /* */ unlinktime = MPI_Wtime() - starttime;
351  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
352  if (communicators->work_rank == 0) {
353  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
354  }
355  }
356 
357  return (1);
358 }
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:148
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:41
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:23
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:169
#define SION_FILE_FLAG_READ
Definition: sion_file.h:24
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:21
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:19
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:22
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:118
Sion Time Stamp Header.