SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
partest_tasklocalfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h>
16 #include <unistd.h>
17 #include <mpi.h>
18 #include <time.h>
19 #include <math.h>
20 
21 #include "sion.h"
22 #include "sion_printts.h"
23 #include "sion_debug.h"
24 #include "sion_file.h"
25 #include "partest.h"
26 #include "partest_util.h"
27 
28 int
29 test_single_mpi(char *filename,
30  char *localbuffer,
31  _test_communicators *communicators,
32  _test_options *options) {
33 
34  int i;
35  char fname[FNAMELEN];
36  _sion_fileptr *sion_fileptr;
37 
38  double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
39  double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
40  sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
41 
42  sion_int64 left;
43  sion_int64 bsumwrote, bsumread;
44  size_t bwrite, bwrote, btoread, bread, chunkcnt;
45  double checksum_fp, checksum_read_fp;
46  int ser_count,ser_step,ser_done;
47  int api_to_use;
48  int do_debug;
49 
50  /* not working task ? */
51  if (communicators->work_size == -1) {
52  /* printf("wf: not working %d\n",communicators->all_rank); */
53  return (0);
54  }
55 
56  if(options->use_posix) {
57  api_to_use=SION_FILE_FLAG_POSIX;
58  } else {
59  api_to_use=SION_FILE_FLAG_ANSI;
60  }
61 
62  for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
63  for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
64 
65  do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
66 
67  sprintf(fname, "%s.%06d", filename, communicators->work_rank);
68 
69  if(options->do_write) {
70 
71  /****************************** WRITE *****************************/
72 
73  /* to synchronize start */
74  barrier_after_start(communicators->local);
75 
76  /* TIMING */ write_starttime = starttime = MPI_Wtime();
77 
78  /* FIRST OPEN to create files */
79  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
80  if (!sion_fileptr) {
81  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
82  MPI_Abort(communicators->all, 1);
83  }
84  stats[STATS_WR_NUM_FILES]=communicators->work_size;
85  /* TIMING */ timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
86 
87  /* TIMING */ starttime = MPI_Wtime();
88  barrier_after_open(communicators->work);
89  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
90 
91  /* TIMING */ starttime = MPI_Wtime();
92  _sion_file_close(sion_fileptr);
93  /* TIMING */ timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
94 
95  /* TIMING */ starttime = MPI_Wtime();
96  barrier_after_open(communicators->work);
97  /* TIMING */ timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
98 
99 
100  /* TIMING */ starttime = MPI_Wtime();
101  /* SECOND REOPEN of existing files */
102  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_WRITE,0);
103  if (!sion_fileptr) {
104  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
105  MPI_Abort(communicators->all, 1);
106  }
107  /* TIMING */ timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
108 
109  /* TIMING */ starttime = MPI_Wtime();
110  barrier_after_open(communicators->work);
111  /* TIMING */ timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
112 
113  /* TIMING */ starttime = MPI_Wtime();
114  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
115  else ser_step=communicators->local_size;
116  ser_done=0;
117  /* TIMING */ timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
118 
119 
120  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
121 
122  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
123  ser_done=1;
124 
125  /* TIMING */ starttime = MPI_Wtime();
126  left = options->totalsize;
127  bsumwrote = 0;
128  chunkcnt = 0;
129 
130  while (left > 0) {
131  bwrite = options->bufsize;
132  if (bwrite > left)
133  bwrite = left;
134 
135  if (do_debug) {
136  fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
137  }
138 
139  bwrote = _sion_file_write(localbuffer, bwrite, sion_fileptr);
140 
141  left -= (sion_int64) bwrote;
142  bsumwrote += (sion_int64) bwrote;
143  chunkcnt++;
144 
145 #ifdef CHECKSUM
146  if(!options->suppress_checksum) {
147  checksum_fp=0.0;
148  for (i = 0; i < bwrote; i++)
149  checksum_fp += (double) localbuffer[i];
150  }
151 #endif
152 
153  if (do_debug) {
154  fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
155  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
156  }
157 
158  } /* while (left>0) */
159 
160  /* TIMING */ timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
161  stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
162  stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
163 
164  } /* ser */
165 
166  /* TIMING */ starttime = MPI_Wtime();
167  barrier_after_write(communicators->local);
168  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
169 
170  } /* for(ser ...) */
171 
172  /* TIMING */ starttime = MPI_Wtime();
173  barrier_after_write(communicators->work);
174  /* TIMING */ timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
175 
176  /* TIMING */ starttime = MPI_Wtime();
177  _sion_file_close(sion_fileptr);
178  /* TIMING */ timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
179 
180  /* TIMING */ starttime = MPI_Wtime();
181  barrier_after_close(communicators->work);
182  /* TIMING */ timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
183 
184  /* TIMING */ timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
185 
186 
187  if (options->verbose) {
188  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
189  }
190 
191  if (do_debug) {
192  write_timings("TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
193  }
194 
195  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
196  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
197 
198  if (communicators->work_rank == 0) {
199  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
200  write_timings("TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
201  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
202  }
203 
204  } /* do_write */
205 
206  /* to synchronize after write */
207  barrier_after_close(communicators->work);
208 
209  if(options->do_read) {
210  /****************************** READ *****************************/
211 
212  sprintf(fname, "%s.%06d", filename, communicators->workread_rank);
213  /* printf("on rank %3d READ: filename %s\n",communicators->all_rank,fname); */
214 
215 
216  /* to synchronize start */
217  barrier_after_start(communicators->local);
218 
219  /* TIMING */ read_starttime = starttime = MPI_Wtime();
220 
221  sion_fileptr = _sion_file_open(fname,api_to_use|SION_FILE_FLAG_READ,0);
222  if (!sion_fileptr) {
223  fprintf(stderr, "cannot open %s for writing, aborting ...\n", fname);
224  MPI_Abort(communicators->all, 1);
225  }
226  stats[STATS_RD_NUM_FILES]=communicators->work_size;
227  /* TIMING */ timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
228 
229  /* TIMING */ starttime = MPI_Wtime();
230  barrier_after_open(communicators->work);
231  /* TIMING */ timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
232 
233 
234  /* TIMING */ starttime = MPI_Wtime();
235  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
236  else ser_step=communicators->local_size;
237  ser_done=0;
238  /* TIMING */ timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
239 
240  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
241 
242  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
243  ser_done=1;
244 
245  /* TIMING */ starttime = MPI_Wtime();
246  left = options->totalsize;
247  bsumread = 0;
248  chunkcnt = 0;
249 
250  while (left > 0) {
251  btoread = options->bufsize;
252  if (btoread > left)
253  btoread = left;
254 
255  bread = _sion_file_read(localbuffer, btoread, sion_fileptr);
256 
257  left -= bread;
258  bsumread += bread;
259  chunkcnt++;
260 
261 #ifdef CHECKSUM
262  if(!options->suppress_checksum) {
263  checksum_read_fp=0.0;
264  for (i = 0; i < bread; i++)
265  checksum_read_fp += (double) localbuffer[i];
266  }
267 #endif
268 
269  if (do_debug) {
270  fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
271  bsumread / 1024.0 / 1024.0, (sion_int64) left);
272  }
273 
274  } /* while(left>0) */
275 
276  /* TIMING */ timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
277 
278  stats[STATS_BYTES_RD_READ]+=bsumread;
279  stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
280 
281  /* TIMING */ starttime = MPI_Wtime();
282  barrier_after_read(communicators->local);
283  /* TIMING */ timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
284 
285  } /* if (ser...) */
286 
287  } /* for (ser ...) */
288 
289  /* TIMING */ starttime = MPI_Wtime();
290  barrier_after_read(communicators->workread);
291  /* TIMING */ timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
292 
293  /* TIMING */ starttime = MPI_Wtime();
294  _sion_file_close(sion_fileptr);
295  /* TIMING */ timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
296 
297  /* TIMING */ starttime = MPI_Wtime();
298  barrier_after_close(communicators->workread);
299  /* TIMING */ timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
300  /* TIMING */ timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
301 
302  /* TIMING */ starttime = MPI_Wtime();
303 
304  if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
305 
306  if (options->verbose) {
307  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
308  }
309 
310  if (do_debug) {
311  write_timings("TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
312  }
313 
314  /* TIMING */ timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
315  if (options->numfiles > 1) {
316 
317  MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->local);
318  MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
319 
320  if (communicators->local_rank == 0) {
321  write_timings("FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
322  }
323 
324  }
325 
326  MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->workread);
327  MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->workread);
328 
329  /* */ DPRINTFTS(communicators->all_rank, "after red.");
330  if (communicators->workread_rank == 0) {
331  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
332  write_timings("TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
333  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
334  }
335 
336 #ifdef CHECKSUM
337  if(!options->suppress_checksum) {
338  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
339  fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
340  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
341  }
342  }
343 #endif
344 
345  }
346 
347  if(options->unlink_files) {
348  /* */ starttime = MPI_Wtime();
349  barrier_before_unlink(communicators->work);
350  unlink(fname);
351  barrier_after_unlink(communicators->work);
352  /* */ unlinktime = MPI_Wtime() - starttime;
353  MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
354  if (communicators->work_rank == 0) {
355  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
356  }
357  }
358 
359  return (1);
360 }
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:141
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:42
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:26
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:166
#define SION_FILE_FLAG_READ
Definition: sion_file.h:27
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:24
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:22
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:25
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:109
Sion Time Stamp Header.