SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
partest_tasklocalfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <mpi.h>
16 #include <time.h>
17 #include <math.h>
18 
19 #include "sion.h"
20 #include "sion_printts.h"
21 #include "sion_debug.h"
22 #include "sion_file.h"
23 #include "partest.h"
24 #include "partest_util.h"
25 
26 int
27 test_single_mpi(char *filename,
28  char *localbuffer,
29  _test_communicators *communicators,
30  _test_options *options) {
31 
32  int i;
33  char fname[FNAMELEN];
34  _sion_fileptr *sion_fileptr;
35 
36  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
37  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
38  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
39 
40  sion_int64 left;
41  sion_int64 bsumwrote, bsumread;
42  size_t bwrite, bwrote, btoread, bread, chunkcnt;
43  double checksum_fp, checksum_read_fp;
44  int ser_count,ser_step,ser_done;
45  int api_to_use;
46  int do_debug;
47 
48  /* not working task ? */
49  if (communicators->work_size == -1) {
50  /* printf("wf: not working %d\n",communicators->all_rank); */
51  return (0);
52  }
53 
54  if(options->use_posix) {
55  api_to_use=SION_FILE_FLAG_POSIX;
56  } else {
57  api_to_use=SION_FILE_FLAG_ANSI;
58  }
59 
60  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
61  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
62 
63  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
64 
65  sprintf(fname, "%s.%06d", filename, communicators->work_rank);
66 
67  if(options->do_write) {
68 
69  /****************************** WRITE *****************************/
70 
71  /* to synchronize start */
72  barrier_after_start(communicators->local);
73 
74  /* TIMING */ write_starttime = starttime = MPI_Wtime();
75 
76  /* FIRST OPEN to create files */
77  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
78  if (!sion_fileptr) {
79  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
80  MPI_Abort(communicators->all, 1);
81  }
82  stats[STATS_WR_NUM_FILES]=communicators->work_size;
83  /* TIMING */ timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
84 
85  /* TIMING */ starttime = MPI_Wtime();
86  barrier_after_open(communicators->work);
87  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
88 
89  /* TIMING */ starttime = MPI_Wtime();
90  _sion_file_close(sion_fileptr);
91  /* TIMING */ timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
92 
93  /* TIMING */ starttime = MPI_Wtime();
94  barrier_after_open(communicators->work);
95  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
96 
97 
98  /* TIMING */ starttime = MPI_Wtime();
99  /* SECOND REOPEN of existing files */
100  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE,0);
101  if (!sion_fileptr) {
102  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
103  MPI_Abort(communicators->all, 1);
104  }
105  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
106 
107  /* TIMING */ starttime = MPI_Wtime();
108  barrier_after_open(communicators->work);
109  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
110 
111  /* TIMING */ starttime = MPI_Wtime();
112  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
113  else ser_step=communicators->local_size;
114  ser_done=0;
115  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
116 
117 
118  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
119 
120  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
121  ser_done=1;
122 
123  /* TIMING */ starttime = MPI_Wtime();
124  left = options->totalsize;
125  bsumwrote = 0;
126  chunkcnt = 0;
127  bwrote = 0;
128 
129  while (left > 0) {
130  bwrite = options->bufsize;
131  if (bwrite > left)
132  bwrite = left;
133 
134  if (do_debug) {
135  fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
136  }
137 
138  bwrote = _sion_file_write(localbuffer, bwrite, sion_fileptr);
139 
140  left -= (sion_int64) bwrote;
141  bsumwrote += (sion_int64) bwrote;
142  chunkcnt++;
143 
144 #ifdef CHECKSUM
145  if(!options->suppress_checksum) {
146  checksum_fp=0.0;
147  for (i = 0; i < bwrote; i++)
148  checksum_fp += (double) localbuffer[i];
149  }
150 #endif
151 
152  if (do_debug) {
153  fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
154  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
155  }
156 
157  } /* while (left>0) */
158 
159  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
160  stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
161  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
162 
163  } /* ser */
164 
165  /* TIMING */ starttime = MPI_Wtime();
166  barrier_after_write(communicators->local);
167  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
168 
169  } /* for(ser ...) */
170 
171  /* TIMING */ starttime = MPI_Wtime();
172  barrier_after_write(communicators->work);
173  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
174 
175  /* TIMING */ starttime = MPI_Wtime();
176  _sion_file_close(sion_fileptr);
177  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
178 
179  /* TIMING */ starttime = MPI_Wtime();
180  barrier_after_close(communicators->work);
181  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
182 
183  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
184 
185  /* TIMING */ starttime = MPI_Wtime();
186 
187 
188  if (options->verbose) {
189  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
190  }
191 
192  if (do_debug) {
193  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
194  }
195 
196  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
197  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
198 
199  if (communicators->work_rank == 0) {
200  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
201  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
202  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
203  }
204 
205  } /* do_write */
206 
207  /* to synchronize after write */
208  barrier_after_close(communicators->work);
209 
210  if(options->do_read) {
211  /****************************** READ *****************************/
212 
213  sprintf(fname, "%s.%06d", filename, communicators->workread_rank);
214  /* printf("on rank %3d READ: filename %s\n",communicators->all_rank,fname); */
215 
216 
217  /* to synchronize start */
218  barrier_after_start(communicators->local);
219 
220  /* TIMING */ read_starttime = starttime = MPI_Wtime();
221 
222  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_READ,0);
223  if (!sion_fileptr) {
224  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
225  MPI_Abort(communicators->all, 1);
226  }
227  stats[STATS_RD_NUM_FILES]=communicators->work_size;
228  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
229 
230  /* TIMING */ starttime = MPI_Wtime();
231  barrier_after_open(communicators->work);
232  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
233 
234 
235  /* TIMING */ starttime = MPI_Wtime();
236  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
237  else ser_step=communicators->local_size;
238  ser_done=0;
239  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
240 
241  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
242 
243  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
244  ser_done=1;
245 
246  /* TIMING */ starttime = MPI_Wtime();
247  left = options->totalsize;
248  bsumread = 0;
249  chunkcnt = 0;
250 
251  while (left > 0) {
252  btoread = options->bufsize;
253  if (btoread > left)
254  btoread = left;
255 
256  bread = _sion_file_read(localbuffer, btoread, sion_fileptr);
257 
258  left -= bread;
259  bsumread += bread;
260  chunkcnt++;
261 
262 #ifdef CHECKSUM
263  if(!options->suppress_checksum) {
264  checksum_read_fp=0.0;
265  for (i = 0; i < bread; i++)
266  checksum_read_fp += (double) localbuffer[i];
267  }
268 #endif
269 
270  if (do_debug) {
271  fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
272  bsumread / 1024.0 / 1024.0, (sion_int64) left);
273  }
274 
275  } /* while(left>0) */
276 
277  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
278 
279  stats[STATS_BYTES_RD_READ]+=bsumread;
280  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
281 
282  /* TIMING */ starttime = MPI_Wtime();
283  barrier_after_read(communicators->local);
284  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
285 
286  } /* if (ser...) */
287 
288  } /* for (ser ...) */
289 
290  /* TIMING */ starttime = MPI_Wtime();
291  barrier_after_read(communicators->workread);
292  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
293 
294  /* TIMING */ starttime = MPI_Wtime();
295  _sion_file_close(sion_fileptr);
296  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
297 
298  /* TIMING */ starttime = MPI_Wtime();
299  barrier_after_close(communicators->workread);
300  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
301  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
302 
303  /* TIMING */ starttime = MPI_Wtime();
304 
305  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
306 
307  if (options->verbose) {
308  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
309  }
310 
311  if (do_debug) {
312  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
313  }
314 
315  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
316  if (options->numfiles > 1) {
317 
318  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
319  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
320 
321  if (communicators->local_rank == 0) {
322  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
323  }
324 
325  }
326 
327  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
328  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
329 
330  /* */ DPRINTFTS(communicators->all_rank, "after red.");
331  if (communicators->workread_rank == 0) {
332  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
333  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
334  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
335  }
336 
337 #ifdef CHECKSUM
338  if(!options->suppress_checksum) {
339  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
340  fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
341  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
342  }
343  }
344 #endif
345 
346  }
347 
348  if(options->unlink_files) {
349  /* */ starttime = MPI_Wtime();
350  barrier_before_unlink(communicators->work);
351  unlink(fname);
352  barrier_after_unlink(communicators->work);
353  /* */ unlinktime = MPI_Wtime() - starttime;
354  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
355  if (communicators->work_rank == 0) {
356  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
357  }
358  }
359 
360  return (1);
361 }
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:147
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Open a file for reading or writing; the file is created when SION_FILE_FLAG_CREATE is passed.
Definition: sion_file.c:40
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:23
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:168
#define SION_FILE_FLAG_READ
Definition: sion_file.h:24
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:21
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:19
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:22
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close the file and destroy the fileptr structure.
Definition: sion_file.c:117
Sion Time Stamp Header.