SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
partest_mpiio.c
/****************************************************************************
**  SIONLIB     http://www.fz-juelich.de/jsc/sionlib                       **
*****************************************************************************
**  Copyright (c) 2008-2016                                                **
**  Forschungszentrum Juelich, Juelich Supercomputing Centre               **
**                                                                         **
**  See the file COPYRIGHT in the package base directory for details       **
****************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <mpi.h>
#include <time.h>
#include <math.h>

#include "sion.h"
#include "sion_debug.h"
#include "sion_printts.h"
#include "partest.h"
#include "partest_util.h"

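/*
 * test_mpiio_multi_mpi: plain MPI-IO variant of the partest benchmark.
 * Each "local" communicator opens one shared file; every task writes its
 * own contiguous region of options->totalsize bytes (starting at
 * local_rank * totalsize) in chunks of options->bufsize, then reads the
 * region back and, if compiled with -DCHECKSUM, compares checksums.
 * Timings and byte counts are reduced over the "work" communicator.
 */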
int test_mpiio_multi_mpi(char *filename,
                         char *localbuffer,
                         _test_communicators *communicators,
                         _test_options *options
                         ) {

  char *ptr;
  int err;
  int i, lg;
  char file[FNAMELEN];

  double gstarttime, starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
  double barr1time, barr2time, barr3time;

  sion_int64 left;
  sion_int64 bsumwrote, sumsize, bsumread;
  char cbuffer[MAXCHARLEN];
  int bwrite, bwrote, btoread, bread, chunkcnt = 0;
  double checksum_fp = 0, checksum_read_fp = 0;

  int ioflags = 0;
  char msg[2048];
  char option[1024];
  MPI_File fh, fhin;
  MPI_Status status;
  MPI_Offset offset;
  MPI_Info info = MPI_INFO_NULL;
  int info_created = 0;
  int ser_count, ser_step, ser_done;


  /* not working task ? */
  if (communicators->work_size == -1) {
    return (0);
  }

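  /*
   * The following three blocks translate the partest options mpiio_lb,
   * mpiio_bs and mpiio_sa into IBM-specific MPI-IO hints
   * (IBM_largeblock_io, IBM_io_buffer_size, IBM_sparse_access).  They are
   * collected in an MPI_Info object that is passed to MPI_File_open below;
   * MPI implementations that do not know a hint are free to ignore it.
   */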
  if (options->mpiio_lb >= 0) {
    /*
     * This hint can be used only if all tasks are being used for I/O:
     * either the MP_IONODEFILE environment variable is not set,
     * or it specifies a file that lists all nodes on which the application is running.
     */
    if (info_created == 0) {
      MPI_Info_create(&info);
      info_created = 1;
    }

    if (options->mpiio_lb == 0) {
      if (communicators->work_rank == 0)
        fprintf(stderr, "partest[%03d] set IBM_largeblock_io to false\n", communicators->work_rank);
      MPI_Info_set(info, "IBM_largeblock_io", "false");
    }
    else {
      if (communicators->work_rank == 0)
        fprintf(stderr, "partest[%03d] set IBM_largeblock_io to true\n", communicators->work_rank);
      MPI_Info_set(info, "IBM_largeblock_io", "true");
    }
  }

  if (options->mpiio_bs >= 0) {

    if (info_created == 0) {
      MPI_Info_create(&info);
      info_created = 1;
    }
    sprintf(option, "%dKB", options->mpiio_bs);
    if (communicators->work_rank == 0)
      fprintf(stderr, "partest[%03d] set IBM_io_buffer_size to %s\n", communicators->work_rank, option);
    MPI_Info_set(info, "IBM_io_buffer_size", option);
  }

  if (options->mpiio_sa >= 0) {
    if (info_created == 0) {
      MPI_Info_create(&info);
      info_created = 1;
    }
    if (options->mpiio_sa == 0) {
      if (communicators->work_rank == 0)
        fprintf(stderr, "partest[%03d] set IBM_sparse_access to false\n", communicators->work_rank);
      MPI_Info_set(info, "IBM_sparse_access", "false");
    }
    else {
      if (communicators->work_rank == 0)
        fprintf(stderr, "partest[%03d] set IBM_sparse_access to true\n", communicators->work_rank);
      MPI_Info_set(info, "IBM_sparse_access", "true");
    }
  }

  sprintf(file, "%s.%06d", filename, communicators->file_number);

  /* */ DPRINTFTS(communicators->all_rank, "start");

  /****************************** WRITE *****************************/

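  /*
   * WRITE phase: each local communicator opens its own shared file with
   * MPI_MODE_CREATE | MPI_MODE_WRONLY.  Every task seeks to its private
   * offset local_rank * totalsize and writes totalsize bytes in chunks of
   * at most bufsize bytes; byte counts and timings are accumulated for the
   * statistics printed further down.
   */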
  /* */ DPRINTFTS(communicators->all_rank, "[W]before open");
  /* */ starttime = MPI_Wtime();
  ioflags = MPI_MODE_CREATE | MPI_MODE_WRONLY;
  if ((err = MPI_File_open(communicators->local, file, ioflags, info, &fh)) != MPI_SUCCESS) {
    lg = sizeof(msg);
    MPI_Error_string(err, msg, &lg);
    fprintf(stderr, "[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
    MPI_Abort(communicators->all, 1);
    exit(1);
  }
  /* */ opentime = MPI_Wtime() - starttime;

  offset = communicators->local_rank * options->totalsize;

  /*
  MPI_Datatype buffer;
  MPI_Type_contiguous(bufsize,MPI_CHAR,&buffer);
  MPI_Type_commit(&buffer);
  MPI_File_set_view(fh, offset,MPI_CHAR, buffer, "native", MPI_INFO_NULL);
  */

  /* */ starttime = MPI_Wtime();
  barrier_after_open(communicators->work);
  /* */ barr1time = MPI_Wtime() - starttime;
  /* */ DPRINTFTS(communicators->all_rank, "[W]after open");
  /* */ gstarttime = starttime = MPI_Wtime();

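  /*
   * Optional serialization of the writers: if serialize_blocknum > 0, only
   * ser_step tasks of each local communicator are active per iteration of
   * the ser_count loop while the others wait in barrier_after_write();
   * otherwise ser_step equals local_size and all tasks write concurrently.
   */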
  if (options->serialize_blocknum > 0) ser_step = options->serialize_blocknum;
  else ser_step = communicators->local_size;
  ser_done = 0;
  for (ser_count = 0; ser_count < communicators->local_size; ser_count += ser_step) {
    if ((!ser_done) && communicators->local_rank < (ser_count + ser_step)) {
      /* fprintf(stderr, "starting write on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
      ser_done = 1;

      /* here we hope each node has the same memory size ... */
      if ((err = MPI_File_seek(fh, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
        lg = sizeof(msg);
        MPI_Error_string(err, msg, &lg);
        fprintf(stderr, "[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
        MPI_Abort(communicators->all, 1);
        exit(1);
      }

      ptr = localbuffer;
      left = options->totalsize;
      bsumwrote = 0;
      chunkcnt = 0;
      bwrote = 0;

      while (left > 0) {
        bwrite = (int) options->bufsize;
        if (bwrite > left)
          bwrite = (int) left;

        if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
          fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
        }

        if ((err = MPI_File_write(fh, ptr, bwrite, MPI_CHAR, &status)) != MPI_SUCCESS) {
          lg = sizeof(msg);
          MPI_Error_string(err, msg, &lg);
          fprintf(stderr, "[%04d]: MPI_File_write [%s] : impossible to write in %s\n", communicators->all_rank, msg, file);
          MPI_Abort(communicators->all, 1);
          exit(1);
        }

        MPI_Get_count(&status, MPI_CHAR, &bwrote);
        left -= (sion_int64) bwrote;
        bsumwrote += (sion_int64) bwrote;
        chunkcnt++;

#ifdef CHECKSUM
        checksum_fp = 0.0;
        for (i = 0; i < bwrote; i++)
          checksum_fp += (double) localbuffer[i];
#endif

        if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
          fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
                  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
          MPI_File_get_position(fh, &offset);
          fprintf(stderr, "timings[%03d] after write position in file= %lld \n", communicators->all_rank, offset);

        }
      }
    }
    barrier_after_write(communicators->local);
    /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
    /* barrier_after_write(communicators->local); */
  }

  /* */ writetime = MPI_Wtime() - starttime;
  /* */ starttime = MPI_Wtime();
  barrier_after_write(communicators->work);
  /* */ barr2time = MPI_Wtime() - starttime;
  /* */ gwritetime = MPI_Wtime() - gstarttime;
  /* */ starttime = MPI_Wtime();
  MPI_File_close(&fh);
  /* */ closetime = MPI_Wtime() - starttime;
  /* */ DPRINTFTS(communicators->all_rank, "[W]before close");
  /* */ starttime = MPI_Wtime();
  barrier_after_close(communicators->work);
  /* */ barr3time = MPI_Wtime() - starttime;
  /* */ DPRINTFTS(communicators->all_rank, "[W]after close");

  if (options->verbose) {
    sprintf(cbuffer,
            "timings[%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
            communicators->all_rank, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
            options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
    collective_print_gather(cbuffer, communicators->work);
  }

  if (options->numfiles >= 1) {
    MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
    if (communicators->local_rank == 0) {
      fprintf(stderr, "partest result: wrote %10.4f MB to %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
    }
  }

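  /*
   * Global write statistics: the per-task byte counts are summed and the
   * open/close times are reduced with MPI_MAX over the work communicator;
   * rank 0 prints the aggregated TOTAL line.
   */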
  /* */ DPRINTFTS(communicators->all_rank, "before red.");
  MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
  MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
  MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);

  /* */ DPRINTFTS(communicators->all_rank, "after red.");
  if (communicators->work_rank == 0) {
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
    fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write+barrier=%10.6fs bw=%10.4f MB/s to %d files\n",
            gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
  }

  if (communicators->work_rank == 0)
    fprintf(stderr, "*********************************************************************************************\n");

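  /*
   * READ phase: mirrors the write phase.  The file is reopened read-only,
   * every task seeks back to its own region and reads it in bufsize-sized
   * chunks; if compiled with -DCHECKSUM, the checksum is recomputed for
   * comparison with the one obtained while writing.
   */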
  /****************************** READ *****************************/

  /* */ DPRINTFTS(communicators->all_rank, "[R]before open");
  /* */ starttime = MPI_Wtime();


  ioflags = MPI_MODE_RDONLY;

  if ((err = MPI_File_open(communicators->local, file, ioflags, info, &fhin)) != MPI_SUCCESS) {
    lg = sizeof(msg);
    MPI_Error_string(err, msg, &lg);
    fprintf(stderr, "[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
    MPI_Abort(communicators->all, 1);
    exit(1);
  }
  /* */ opentime = MPI_Wtime() - starttime;

  /* */ starttime = MPI_Wtime();
  barrier_after_open(communicators->work);
  /* */ barr1time = MPI_Wtime() - starttime;
  /* */ DPRINTFTS(communicators->all_rank, "[R]after open");
  /* */ gstarttime = starttime = MPI_Wtime();

  if (options->serialize_blocknum > 0) ser_step = options->serialize_blocknum;
  else ser_step = communicators->local_size;
  ser_done = 0;
  for (ser_count = 0; ser_count < communicators->local_size; ser_count += ser_step) {

    if ((!ser_done) && communicators->local_rank < (ser_count + ser_step)) {
      /* fprintf(stderr, "starting read on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
      ser_done = 1;

      /* here we hope each node has the same memory size ... */
      offset = communicators->local_rank * options->totalsize;
      if ((err = MPI_File_seek(fhin, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
        lg = sizeof(msg);
        MPI_Error_string(err, msg, &lg);
        fprintf(stderr, "[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
        MPI_Abort(communicators->all, 1);
        exit(1);
      }

      ptr = localbuffer;
      left = options->totalsize;
      bsumread = 0;
      chunkcnt = 0;

      while (left > 0) {
        btoread = options->bufsize;
        if (btoread > left)
          btoread = left;

        if ((err = MPI_File_read(fhin, ptr, btoread, MPI_CHAR, &status)) != MPI_SUCCESS) {
          lg = sizeof(msg);
          MPI_Error_string(err, msg, &lg);
          fprintf(stderr, "[%04d]: MPI_File_read [%s]: impossible to read in %s\n", communicators->all_rank, msg, file);
          MPI_Abort(communicators->all, 1);
          exit(1);
        }

        MPI_Get_count(&status, MPI_CHAR, &bread);
        left -= bread;
        bsumread += bread;
        chunkcnt++;

#ifdef CHECKSUM
        checksum_read_fp = 0.0;
        for (i = 0; i < bread; i++)
          checksum_read_fp += (double) localbuffer[i];
#endif

        if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
          fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
                  bsumread / 1024.0 / 1024.0, (sion_int64) left);
        }
      }

    }
    barrier_after_read(communicators->local);
    /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
    /* barrier_after_read(communicators->local); */
  }

  /* */ readtime = MPI_Wtime() - starttime;
  /* */ starttime = MPI_Wtime();
  barrier_after_read(communicators->work);
  /* */ barr2time = MPI_Wtime() - starttime;
  /* */ greadtime = MPI_Wtime() - gstarttime;

  /* */ starttime = MPI_Wtime();
  MPI_File_close(&fhin);
  /* */ closetime = MPI_Wtime() - starttime;
  barrier_after_close(communicators->work);

  if (options->verbose) {
    sprintf(cbuffer,
            "timings[%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d (check %d)\n",
            communicators->all_rank, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
            options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));

    collective_print_gather(cbuffer, communicators->work);

  }

#ifdef CHECKSUM
  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
    fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
            checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
  }
#endif

  if (options->numfiles >= 1) {
    MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
    if (communicators->local_rank == 0) {
      fprintf(stderr, "partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
    }
  }

  /* */ DPRINTFTS(communicators->all_rank, "before red.");
  MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
  MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
  MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
  /* */ DPRINTFTS(communicators->all_rank, "after red.");
  if (communicators->work_rank == 0) {
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
    fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read+barrier=%10.6fs br=%10.4f MB/s from %d files\n",
            gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
  }


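  /*
   * Optional cleanup: when unlink_files is set, rank 0 of each local
   * communicator removes its file; the unlink is bracketed by barriers on
   * the work communicator and its duration is reported as ultime.
   */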
  if (options->unlink_files) {
    /* */ starttime = MPI_Wtime();
    barrier_before_unlink(communicators->work);
    if (communicators->local_rank == 0) {
      fprintf(stderr, "partest result: unlink file %s ...\n", file);
      unlink(file);
    }
    barrier_after_unlink(communicators->work);
    /* */ unlinktime = MPI_Wtime() - starttime;
    if (communicators->local_rank == 0) {
      fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, file);
    }
  }

  if (info_created)
    MPI_Info_free(&info);

  return (1);
}