SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
partest_mpiio.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h>
16 #include <unistd.h>
17 #include <mpi.h>
18 #include <time.h>
19 #include <math.h>
20 
21 #include "sion.h"
22 #include "sion_debug.h"
23 #include "sion_printts.h"
24 #include "partest.h"
25 #include "partest_util.h"
26 
27 int test_mpiio_multi_mpi(char *filename,
28  char *localbuffer,
29  _test_communicators *communicators,
30  _test_options *options
31  ) {
32 
33  char *ptr;
34  int err;
35  int i, lg;
36  char file[FNAMELEN];
37 
38  double gstarttime, starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
39  double barr1time, barr2time, barr3time;
40 
41  sion_int64 left;
42  sion_int64 bsumwrote, sumsize, bsumread;
43  char cbuffer[MAXCHARLEN];
44  int bwrite, bwrote, btoread, bread, chunkcnt = 0;
45  double checksum_fp = 0, checksum_read_fp = 0;
46 
47  int ioflags = 0;
48  char msg[2048];
49  char option[1024];
50  MPI_File fh, fhin;
51  MPI_Status status;
52  MPI_Offset offset;
53  MPI_Info info = MPI_INFO_NULL;
54  int info_created = 0;
55  int ser_count,ser_step,ser_done;
56 
57 
58  /* not working task ? */
59  if (communicators->work_size == -1) {
60  return (0);
61  }
62 
63  if (options->mpiio_lb >= 0) {
64  /*
65  * This hint can be used only if all tasks are being used for I/O:
66  * either the MP_IONODEFILE environment variable is not set,
67  * or it specifies a file that lists all nodes on which the application is running.
68  */
69  if (info_created == 0) {
70  MPI_Info_create(&info);
71  info_created = 1;
72  }
73 
74  if (options->mpiio_lb == 0) {
75  if (communicators->work_rank == 0)
76  fprintf(stderr, "partest[%03d] set IBM_largeblock_io to false\n", communicators->work_rank);
77  MPI_Info_set(info, "IBM_largeblock_io", "false");
78  }
79  else {
80  if (communicators->work_rank == 0)
81  fprintf(stderr, "partest[%03d] set IBM_largeblock_io to true\n", communicators->work_rank);
82  MPI_Info_set(info, "IBM_largeblock_io", "true");
83  }
84  }
85 
86  if (options->mpiio_bs >= 0) {
87 
88  if (info_created == 0) {
89  MPI_Info_create(&info);
90  info_created = 1;
91  }
92  sprintf(option, "%dKB", options->mpiio_bs);
93  if (communicators->work_rank == 0)
94  fprintf(stderr, "partest[%03d] set IBM_io_buffer_size to %s\n", communicators->work_rank, option);
95  MPI_Info_set(info, "IBM_io_buffer_size", option);
96  }
97 
98  if (options->mpiio_sa >= 0) {
99  if (info_created == 0) {
100  MPI_Info_create(&info);
101  info_created = 1;
102  }
103  if (options->mpiio_sa == 0) {
104  if (communicators->work_rank == 0)
105  fprintf(stderr, "partest[%03d] set IBM_sparse_access to false\n", communicators->work_rank);
106  MPI_Info_set(info, "IBM_sparse_access", "false");
107  }
108  else {
109  if (communicators->work_rank == 0)
110  fprintf(stderr, "partest[%03d] set IBM_sparse_access to true\n", communicators->work_rank);
111  MPI_Info_set(info, "IBM_sparse_access", "true");
112  }
113  }
114 
115  sprintf(file, "%s.%06d", filename, communicators->file_number);
116 
117  /* */ DPRINTFTS(communicators->all_rank, "start");
118 
119  /****************************** WRITE *****************************/
120 
121  /* */ DPRINTFTS(communicators->all_rank, "[W]before open");
122  /* */ starttime = MPI_Wtime();
123  ioflags = MPI_MODE_CREATE | MPI_MODE_WRONLY;
124  if ((err = MPI_File_open(communicators->local, file, ioflags, info, &fh)) != MPI_SUCCESS) {
125  lg = sizeof(msg);
126  MPI_Error_string(err, msg, &lg);
127  fprintf(stderr, "[%04d]: MPI_File5A_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
128  MPI_Abort(communicators->all, 1);
129  exit(1);
130  }
131  /* */ opentime = MPI_Wtime() - starttime;
132 
133  offset = communicators->local_rank * options->totalsize;
134 
135  /*
136  MPI_Datatype buffer;
137  MPI_Type_contiguous(bufsize,MPI_CHAR,&buffer);
138  MPI_Type_commit(&buffer);
139  MPI_File_set_view(fh, offset,MPI_CHAR, buffer, "native", MPI_INFO_NULL);
140  */
141 
142  /* */ starttime = MPI_Wtime();
143  barrier_after_open(communicators->work);
144  /* */ barr1time = MPI_Wtime() - starttime;
145  /* */ DPRINTFTS(communicators->all_rank, "[W]after open");
146  /* */ gstarttime = starttime = MPI_Wtime();
147 
148  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
149  else ser_step=communicators->local_size;
150  ser_done=0;
151  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
152  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
153  /* fprintf(stderr, "starting write on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
154  ser_done=1;
155 
156  /* here we hope each node has the same memory size ... */
157  if ((err = MPI_File_seek(fh, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
158  lg = sizeof(msg);
159  MPI_Error_string(err, msg, &lg);
160  fprintf(stderr, "[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
161  MPI_Abort(communicators->all, 1);
162  exit(1);
163  }
164 
165  ptr = localbuffer;
166  left = options->totalsize;
167  bsumwrote = 0;
168  chunkcnt = 0;
169  bwrote = 0;
170 
171  while (left > 0) {
172  bwrite = (int) options->bufsize;
173  if (bwrite > left)
174  bwrite = (int) left;
175 
176  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
177  fprintf(stderr, "timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
178  }
179 
180  if ((err = MPI_File_write(fh, ptr, bwrite, MPI_CHAR, &status)) != MPI_SUCCESS) {
181  lg = sizeof(msg);
182  MPI_Error_string(err, msg, &lg);
183  fprintf(stderr, "[%04d]: MPI_File_write [%s] : impossible to write in %s\n", communicators->all_rank, msg, file);
184  MPI_Abort(communicators->all, 1);
185  exit(1);
186  }
187 
188  MPI_Get_count(&status, MPI_CHAR, &bwrote);
189  left -= (sion_int64) bwrote;
190  bsumwrote += (sion_int64) bwrote;
191  chunkcnt++;
192 
193 #ifdef CHECKSUM
194  checksum_fp=0.0;
195  for (i = 0; i < bwrote; i++)
196  checksum_fp += (double) localbuffer[i];
197 #endif
198 
199  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
200  fprintf(stderr, "timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
201  communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
202  MPI_File_get_position(fh, &offset);
203  fprintf(stderr, "timings[%03d] after write position in file= %lld \n", communicators->all_rank, offset);
204 
205  }
206  }
207  }
208  barrier_after_write(communicators->local);
209  /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
210  /* barrier_after_write(communicators->local); */
211  }
212 
213  /* */ writetime = MPI_Wtime() - starttime;
214  /* */ starttime = MPI_Wtime();
215  barrier_after_write(communicators->work);
216  /* */ barr2time = MPI_Wtime() - starttime;
217  /* */ gwritetime = MPI_Wtime() - gstarttime;
218  /* */ starttime = MPI_Wtime();
219  MPI_File_close(&fh);
220  /* */ closetime = MPI_Wtime() - starttime;
221  /* */ DPRINTFTS(communicators->all_rank, "[W]before close");
222  /* */ starttime = MPI_Wtime();
223  barrier_after_close(communicators->work);
224  /* */ barr3time = MPI_Wtime() - starttime;
225  /* */ DPRINTFTS(communicators->all_rank, "[W]after close");
226 
227  if (options->verbose) {
228  sprintf(cbuffer,
229  "timings[%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
230  communicators->all_rank, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
231  options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
232  collective_print_gather(cbuffer, communicators->work);
233  }
234 
235  if (options->numfiles >= 1) {
236  MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
237  if (communicators->local_rank == 0) {
238  fprintf(stderr, "partest result: wrote %10.4f MB to %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
239  }
240  }
241 
242  /* */ DPRINTFTS(communicators->all_rank, "before red.");
243  MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
244  MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
245  MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
246 
247  /* */ DPRINTFTS(communicators->all_rank, "after red.");
248  if (communicators->work_rank == 0) {
249  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
250  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write+barrier=%10.6fs bw=%10.4f MB/s to %d files\n",
251  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
252  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
253  }
254 
255  if (communicators->work_rank == 0)
256  fprintf(stderr, "*********************************************************************************************\n");
257 
258  /****************************** READ *****************************/
259 
260  /* */ DPRINTFTS(communicators->all_rank, "[R]before open");
261  /* */ starttime = MPI_Wtime();
262 
263 
264  ioflags = MPI_MODE_RDONLY;
265 
266  if ((err = MPI_File_open(communicators->local, file, ioflags, info, &fhin)) != MPI_SUCCESS) {
267  lg = sizeof(msg);
268  MPI_Error_string(err, msg, &lg);
269  fprintf(stderr, "[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
270  MPI_Abort(communicators->all, 1);
271  exit(1);
272  }
273  /* */ opentime = MPI_Wtime() - starttime;
274 
275  /* */ starttime = MPI_Wtime();
276  barrier_after_open(communicators->work);
277  /* */ barr1time = MPI_Wtime() - starttime;
278  /* */ DPRINTFTS(communicators->all_rank, "[R]after open");
279  /* */ gstarttime = starttime = MPI_Wtime();
280 
281  if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
282  else ser_step=communicators->local_size;
283  ser_done=0;
284  for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
285 
286  if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
287  /* fprintf(stderr, "starting read on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
288  ser_done=1;
289 
290  /* here we hope each node has the same memory size ... */
291  offset = communicators->local_rank * options->totalsize;
292  if ((err = MPI_File_seek(fhin, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
293  lg = sizeof(msg);
294  MPI_Error_string(err, msg, &lg);
295  fprintf(stderr, "[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
296  MPI_Abort(communicators->all, 1);
297  exit(1);
298  }
299 
300  ptr = localbuffer;
301  left = options->totalsize;
302  bsumread = 0;
303  chunkcnt = 0;
304 
305  while (left > 0) {
306  btoread = options->bufsize;
307  if (btoread > left)
308  btoread = left;
309 
310  if ((err = MPI_File_read(fhin, ptr, btoread, MPI_CHAR, &status)) != MPI_SUCCESS) {
311  lg = sizeof(msg);
312  MPI_Error_string(err, msg, &lg);
313  fprintf(stderr, "[%04d]: MPI_File_read [%s]: impossible to read in %s\n", communicators->all_rank, msg, file);
314  MPI_Abort(communicators->all, 1);
315  exit(1);
316  }
317 
318  MPI_Get_count(&status, MPI_CHAR, &bread);
319  left -= bread;
320  bsumread += bread;
321  chunkcnt++;
322 
323 #ifdef CHECKSUM
324  checksum_read_fp=0.0;
325  for (i = 0; i < bread; i++)
326  checksum_read_fp += (double) localbuffer[i];
327 #endif
328 
329  if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
330  fprintf(stderr, "timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
331  bsumread / 1024.0 / 1024.0, (sion_int64) left);
332  }
333  }
334 
335  }
336  barrier_after_read(communicators->local);
337  /* fprintf(stderr, "after barrier on local comm on task %d ser_count=%d ser_step=%d ser_done=%d\n", communicators->local_rank,ser_count,ser_step,ser_done); */
338  /* barrier_after_read(communicators->local); */
339  }
340 
341  /* */ readtime = MPI_Wtime() - starttime;
342  /* */ starttime = MPI_Wtime();
343  barrier_after_read(communicators->work);
344  /* */ barr2time = MPI_Wtime() - starttime;
345  /* */ greadtime = MPI_Wtime() - gstarttime;
346 
347  /* */ starttime = MPI_Wtime();
348  MPI_File_close(&fhin);
349  /* */ closetime = MPI_Wtime() - starttime;
350  barrier_after_close(communicators->work);
351 
352  if (options->verbose) {
353  sprintf(cbuffer,
354  "timings[%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d (check %d)\n",
355  communicators->all_rank, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
356  options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
357 
358  collective_print_gather(cbuffer, communicators->work);
359 
360  }
361 
362 #ifdef CHECKSUM
363  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
364  fprintf(stderr, "timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
365  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
366  }
367 #endif
368 
369  if (options->numfiles >= 1) {
370  MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->local);
371  if (communicators->local_rank == 0) {
372  fprintf(stderr, "partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
373  }
374  }
375 
376  /* */ DPRINTFTS(communicators->all_rank, "before red.");
377  MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->work);
378  MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
379  MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->work);
380  /* */ DPRINTFTS(communicators->all_rank, "after red.");
381  if (communicators->work_rank == 0) {
382  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
383  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read+barrier=%10.6fs br=%10.4f MB/s from %d files\n",
384  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
385  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
386  }
387 
388 
389  if(options->unlink_files) {
390  /* */ starttime = MPI_Wtime();
391  barrier_before_unlink(communicators->work);
392  if (communicators->local_rank == 0) {
393  fprintf(stderr, "partest result: unlink file %s ...\n", file);
394  unlink(file);
395  }
396  barrier_after_unlink(communicators->work);
397  /* */ unlinktime = MPI_Wtime() - starttime;
398  if (communicators->local_rank == 0) {
399  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, file);
400  }
401  }
402 
403  if (info_created)
404  MPI_Info_free(&info);
405 
406  return (1);
407 }
Sion Time Stamp Header.