10 #define _XOPEN_SOURCE 700 25 #include "partest_util.h" 27 int test_mpiio_multi_mpi(
char *filename,
38 double gstarttime, starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
39 double barr1time, barr2time, barr3time;
42 sion_int64 bsumwrote, sumsize, bsumread;
43 char cbuffer[MAXCHARLEN];
44 int bwrite, bwrote, btoread, bread, chunkcnt = 0;
45 double checksum_fp = 0, checksum_read_fp = 0;
53 MPI_Info info = MPI_INFO_NULL;
55 int ser_count,ser_step,ser_done;
59 if (communicators->work_size == -1) {
63 if (options->mpiio_lb >= 0) {
69 if (info_created == 0) {
70 MPI_Info_create(&info);
74 if (options->mpiio_lb == 0) {
75 if (communicators->work_rank == 0)
76 fprintf(stderr,
"partest[%03d] set IBM_largeblock_io to false\n", communicators->work_rank);
77 MPI_Info_set(info,
"IBM_largeblock_io",
"false");
80 if (communicators->work_rank == 0)
81 fprintf(stderr,
"partest[%03d] set IBM_largeblock_io to true\n", communicators->work_rank);
82 MPI_Info_set(info,
"IBM_largeblock_io",
"true");
86 if (options->mpiio_bs >= 0) {
88 if (info_created == 0) {
89 MPI_Info_create(&info);
92 sprintf(option,
"%dKB", options->mpiio_bs);
93 if (communicators->work_rank == 0)
94 fprintf(stderr,
"partest[%03d] set IBM_io_buffer_size to %s\n", communicators->work_rank, option);
95 MPI_Info_set(info,
"IBM_io_buffer_size", option);
98 if (options->mpiio_sa >= 0) {
99 if (info_created == 0) {
100 MPI_Info_create(&info);
103 if (options->mpiio_sa == 0) {
104 if (communicators->work_rank == 0)
105 fprintf(stderr,
"partest[%03d] set IBM_sparse_access to false\n", communicators->work_rank);
106 MPI_Info_set(info,
"IBM_sparse_access",
"false");
109 if (communicators->work_rank == 0)
110 fprintf(stderr,
"partest[%03d] set IBM_sparse_access to true\n", communicators->work_rank);
111 MPI_Info_set(info,
"IBM_sparse_access",
"true");
115 sprintf(file,
"%s.%06d", filename, communicators->file_number);
117 DPRINTFTS(communicators->all_rank,
"start");
121 DPRINTFTS(communicators->all_rank,
"[W]before open");
122 starttime = MPI_Wtime();
123 ioflags = MPI_MODE_CREATE | MPI_MODE_WRONLY;
124 if ((err = MPI_File_open(communicators->
local, file, ioflags, info, &fh)) != MPI_SUCCESS) {
126 MPI_Error_string(err, msg, &lg);
/* Report the failed write-phase open; the call name in the message now matches the read-phase diagnostic. */
127 fprintf(stderr,
"[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
128 MPI_Abort(communicators->
all, 1);
131 opentime = MPI_Wtime() - starttime;
133 offset = communicators->local_rank * options->totalsize;
142 starttime = MPI_Wtime();
143 barrier_after_open(communicators->
work);
144 barr1time = MPI_Wtime() - starttime;
145 DPRINTFTS(communicators->all_rank,
"[W]after open");
146 gstarttime = starttime = MPI_Wtime();
148 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
149 else ser_step=communicators->local_size;
151 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
152 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
157 if ((err = MPI_File_seek(fh, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
159 MPI_Error_string(err, msg, &lg);
160 fprintf(stderr,
"[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
161 MPI_Abort(communicators->
all, 1);
166 left = options->totalsize;
172 bwrite = (int) options->bufsize;
176 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
177 fprintf(stderr,
"timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
180 if ((err = MPI_File_write(fh, ptr, bwrite, MPI_CHAR, &status)) != MPI_SUCCESS) {
182 MPI_Error_string(err, msg, &lg);
183 fprintf(stderr,
"[%04d]: MPI_File_write [%s] : impossible to write in %s\n", communicators->all_rank, msg, file);
184 MPI_Abort(communicators->
all, 1);
188 MPI_Get_count(&status, MPI_CHAR, &bwrote);
189 left -= (sion_int64) bwrote;
190 bsumwrote += (sion_int64) bwrote;
195 for (i = 0; i < bwrote; i++)
196 checksum_fp += (
double) localbuffer[i];
199 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
200 fprintf(stderr,
"timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
201 communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
/* Debug output only: query the current file position after the write and print it. */
202 MPI_File_get_position(fh, &offset);
/* Cast to sion_int64 like the other %lld arguments in this routine, so the
   conversion does not depend on how the MPI library defines its offset type. */
203 fprintf(stderr,
"timings[%03d] after write position in file= %lld \n", communicators->all_rank, (sion_int64) offset);
208 barrier_after_write(communicators->
local);
213 writetime = MPI_Wtime() - starttime;
214 starttime = MPI_Wtime();
215 barrier_after_write(communicators->
work);
216 barr2time = MPI_Wtime() - starttime;
217 gwritetime = MPI_Wtime() - gstarttime;
218 starttime = MPI_Wtime();
220 closetime = MPI_Wtime() - starttime;
221 DPRINTFTS(communicators->all_rank,
"[W]before close");
222 starttime = MPI_Wtime();
223 barrier_after_close(communicators->
work);
224 barr3time = MPI_Wtime() - starttime;
225 DPRINTFTS(communicators->all_rank,
"[W]after close");
227 if (options->verbose) {
229 "timings[%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
230 communicators->all_rank, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
231 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
232 collective_print_gather(cbuffer, communicators->
work);
235 if (options->numfiles >= 1) {
236 MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
237 if (communicators->local_rank == 0) {
238 fprintf(stderr,
"partest result: wrote %10.4f MB to %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
242 DPRINTFTS(communicators->all_rank,
"before red.");
243 MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
244 MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
245 MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
247 DPRINTFTS(communicators->all_rank,
"after red.");
248 if (communicators->work_rank == 0) {
249 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
250 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write+barrier=%10.6fs bw=%10.4f MB/s to %d files\n",
251 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
252 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
255 if (communicators->work_rank == 0)
256 fprintf(stderr,
"*********************************************************************************************\n");
260 DPRINTFTS(communicators->all_rank,
"[R]before open");
261 starttime = MPI_Wtime();
264 ioflags = MPI_MODE_RDONLY;
266 if ((err = MPI_File_open(communicators->
local, file, ioflags, info, &fhin)) != MPI_SUCCESS) {
268 MPI_Error_string(err, msg, &lg);
269 fprintf(stderr,
"[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
270 MPI_Abort(communicators->
all, 1);
273 opentime = MPI_Wtime() - starttime;
275 starttime = MPI_Wtime();
276 barrier_after_open(communicators->
work);
277 barr1time = MPI_Wtime() - starttime;
278 DPRINTFTS(communicators->all_rank,
"[R]after open");
279 gstarttime = starttime = MPI_Wtime();
281 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
282 else ser_step=communicators->local_size;
284 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
286 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
291 offset = communicators->local_rank * options->totalsize;
292 if ((err = MPI_File_seek(fhin, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
294 MPI_Error_string(err, msg, &lg);
295 fprintf(stderr,
"[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
296 MPI_Abort(communicators->
all, 1);
301 left = options->totalsize;
306 btoread = options->bufsize;
310 if ((err = MPI_File_read(fhin, ptr, btoread, MPI_CHAR, &status)) != MPI_SUCCESS) {
312 MPI_Error_string(err, msg, &lg);
313 fprintf(stderr,
"[%04d]: MPI_File_read [%s]: impossible to read in %s\n", communicators->all_rank, msg, file);
314 MPI_Abort(communicators->
all, 1);
318 MPI_Get_count(&status, MPI_CHAR, &bread);
324 checksum_read_fp=0.0;
325 for (i = 0; i < bread; i++)
326 checksum_read_fp += (
double) localbuffer[i];
329 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
330 fprintf(stderr,
"timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
331 bsumread / 1024.0 / 1024.0, (sion_int64) left);
336 barrier_after_read(communicators->
local);
341 readtime = MPI_Wtime() - starttime;
342 starttime = MPI_Wtime();
343 barrier_after_read(communicators->
work);
344 barr2time = MPI_Wtime() - starttime;
345 greadtime = MPI_Wtime() - gstarttime;
347 starttime = MPI_Wtime();
348 MPI_File_close(&fhin);
349 closetime = MPI_Wtime() - starttime;
350 barrier_after_close(communicators->
work);
352 if (options->verbose) {
354 "timings[%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d (check %d)\n",
355 communicators->all_rank, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
356 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
358 collective_print_gather(cbuffer, communicators->
work);
363 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
364 fprintf(stderr,
"timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
365 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
369 if (options->numfiles >= 1) {
370 MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
371 if (communicators->local_rank == 0) {
372 fprintf(stderr,
"partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
376 DPRINTFTS(communicators->all_rank,
"before red.");
377 MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
378 MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
379 MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
380 DPRINTFTS(communicators->all_rank,
"after red.");
381 if (communicators->work_rank == 0) {
382 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
383 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read+barrier=%10.6fs br=%10.4f MB/s from %d files\n",
384 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
385 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
389 if(options->unlink_files) {
390 starttime = MPI_Wtime();
391 barrier_before_unlink(communicators->
work);
392 if (communicators->local_rank == 0) {
393 fprintf(stderr,
"partest result: unlink file %s ...\n", file);
396 barrier_after_unlink(communicators->
work);
397 unlinktime = MPI_Wtime() - starttime;
398 if (communicators->local_rank == 0) {
399 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, file);
404 MPI_Info_free(&info);