23 #include "partest_util.h" 25 int test_mpiio_multi_mpi(
char *filename,
36 double gstarttime, starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
37 double barr1time, barr2time, barr3time;
40 sion_int64 bsumwrote, sumsize, bsumread;
41 char cbuffer[MAXCHARLEN];
42 int bwrite, bwrote, btoread, bread, chunkcnt;
43 double checksum_fp, checksum_read_fp;
51 MPI_Info info = MPI_INFO_NULL;
53 int ser_count,ser_step,ser_done;
57 if (communicators->work_size == -1) {
61 if (options->mpiio_lb >= 0) {
67 if (info_created == 0) {
68 MPI_Info_create(&info);
72 if (options->mpiio_lb == 0) {
73 if (communicators->work_rank == 0)
74 fprintf(stderr,
"partest[%03d] set IBM_largeblock_io to false\n", communicators->work_rank);
75 MPI_Info_set(info,
"IBM_largeblock_io",
"false");
78 if (communicators->work_rank == 0)
79 fprintf(stderr,
"partest[%03d] set IBM_largeblock_io to true\n", communicators->work_rank);
80 MPI_Info_set(info,
"IBM_largeblock_io",
"true");
84 if (options->mpiio_bs >= 0) {
86 if (info_created == 0) {
87 MPI_Info_create(&info);
90 sprintf(option,
"%dKB", options->mpiio_bs);
91 if (communicators->work_rank == 0)
92 fprintf(stderr,
"partest[%03d] set IBM_io_buffer_size to %s\n", communicators->work_rank, option);
93 MPI_Info_set(info,
"IBM_io_buffer_size", option);
96 if (options->mpiio_sa >= 0) {
97 if (info_created == 0) {
98 MPI_Info_create(&info);
101 if (options->mpiio_sa == 0) {
102 if (communicators->work_rank == 0)
103 fprintf(stderr,
"partest[%03d] set IBM_sparse_access to false\n", communicators->work_rank);
104 MPI_Info_set(info,
"IBM_sparse_access",
"false");
107 if (communicators->work_rank == 0)
108 fprintf(stderr,
"partest[%03d] set IBM_sparse_access to true\n", communicators->work_rank);
109 MPI_Info_set(info,
"IBM_sparse_access",
"true");
113 sprintf(file,
"%s.%06d", filename, communicators->file_number);
115 DPRINTFTS(communicators->all_rank,
"start");
119 DPRINTFTS(communicators->all_rank,
"[W]before open");
120 starttime = MPI_Wtime();
121 ioflags = MPI_MODE_CREATE | MPI_MODE_WRONLY;
122 if ((err = MPI_File_open(communicators->
local, file, ioflags, info, &fh)) != MPI_SUCCESS) {
124 MPI_Error_string(err, msg, &lg);
125 fprintf(stderr,
"[%04d]: MPI_File5A_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
126 MPI_Abort(communicators->
all, 1);
129 opentime = MPI_Wtime() - starttime;
131 offset = communicators->local_rank * options->totalsize;
140 starttime = MPI_Wtime();
141 barrier_after_open(communicators->
work);
142 barr1time = MPI_Wtime() - starttime;
143 DPRINTFTS(communicators->all_rank,
"[W]after open");
144 gstarttime = starttime = MPI_Wtime();
146 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
147 else ser_step=communicators->local_size;
149 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
150 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
155 if ((err = MPI_File_seek(fh, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
157 MPI_Error_string(err, msg, &lg);
158 fprintf(stderr,
"[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
159 MPI_Abort(communicators->
all, 1);
164 left = options->totalsize;
170 bwrite = (int) options->bufsize;
174 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
175 fprintf(stderr,
"timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
178 if ((err = MPI_File_write(fh, ptr, bwrite, MPI_CHAR, &status)) != MPI_SUCCESS) {
180 MPI_Error_string(err, msg, &lg);
181 fprintf(stderr,
"[%04d]: MPI_File_write [%s] : impossible to write in %s\n", communicators->all_rank, msg, file);
182 MPI_Abort(communicators->
all, 1);
186 MPI_Get_count(&status, MPI_CHAR, &bwrote);
187 left -= (sion_int64) bwrote;
188 bsumwrote += (sion_int64) bwrote;
193 for (i = 0; i < bwrote; i++)
194 checksum_fp += (
double) localbuffer[i];
197 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
198 fprintf(stderr,
"timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
199 communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
200 MPI_File_get_position(fh, &offset);
201 fprintf(stderr,
"timings[%03d] after write position in file= %lld \n", communicators->all_rank, offset);
206 barrier_after_write(communicators->
local);
211 writetime = MPI_Wtime() - starttime;
212 starttime = MPI_Wtime();
213 barrier_after_write(communicators->
work);
214 barr2time = MPI_Wtime() - starttime;
215 gwritetime = MPI_Wtime() - gstarttime;
216 starttime = MPI_Wtime();
218 closetime = MPI_Wtime() - starttime;
219 DPRINTFTS(communicators->all_rank,
"[W]before close");
220 starttime = MPI_Wtime();
221 barrier_after_close(communicators->
work);
222 barr3time = MPI_Wtime() - starttime;
223 DPRINTFTS(communicators->all_rank,
"[W]after close");
225 if (options->verbose) {
227 "timings[%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
228 communicators->all_rank, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
229 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
230 collective_print_gather(cbuffer, communicators->
work);
233 if (options->numfiles >= 1) {
234 MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
235 if (communicators->local_rank == 0) {
236 fprintf(stderr,
"partest result: wrote %10.4f MB to %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
240 DPRINTFTS(communicators->all_rank,
"before red.");
241 MPI_Reduce(&bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
242 MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
243 MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
245 DPRINTFTS(communicators->all_rank,
"after red.");
246 if (communicators->work_rank == 0) {
247 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
248 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write+barrier=%10.6fs bw=%10.4f MB/s to %d files\n",
249 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
250 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
253 if (communicators->work_rank == 0)
254 fprintf(stderr,
"*********************************************************************************************\n");
258 DPRINTFTS(communicators->all_rank,
"[R]before open");
259 starttime = MPI_Wtime();
262 ioflags = MPI_MODE_RDONLY;
264 if ((err = MPI_File_open(communicators->
local, file, ioflags, info, &fhin)) != MPI_SUCCESS) {
266 MPI_Error_string(err, msg, &lg);
267 fprintf(stderr,
"[%04d]: MPI_File_open [%s]: impossible to open %s\n", communicators->all_rank, msg, file);
268 MPI_Abort(communicators->
all, 1);
271 opentime = MPI_Wtime() - starttime;
273 starttime = MPI_Wtime();
274 barrier_after_open(communicators->
work);
275 barr1time = MPI_Wtime() - starttime;
276 DPRINTFTS(communicators->all_rank,
"[R]after open");
277 gstarttime = starttime = MPI_Wtime();
279 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
280 else ser_step=communicators->local_size;
282 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
284 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
289 offset = communicators->local_rank * options->totalsize;
290 if ((err = MPI_File_seek(fhin, offset, MPI_SEEK_SET)) != MPI_SUCCESS) {
292 MPI_Error_string(err, msg, &lg);
293 fprintf(stderr,
"[%04d]: MPI_File_seek [%s] : cannot seek in %s\n", communicators->all_rank, msg, file);
294 MPI_Abort(communicators->
all, 1);
299 left = options->totalsize;
304 btoread = options->bufsize;
308 if ((err = MPI_File_read(fhin, ptr, btoread, MPI_CHAR, &status)) != MPI_SUCCESS) {
310 MPI_Error_string(err, msg, &lg);
311 fprintf(stderr,
"[%04d]: MPI_File_read [%s]: impossible to read in %s\n", communicators->all_rank, msg, file);
312 MPI_Abort(communicators->
all, 1);
316 MPI_Get_count(&status, MPI_CHAR, &bread);
322 checksum_read_fp=0.0;
323 for (i = 0; i < bread; i++)
324 checksum_read_fp += (
double) localbuffer[i];
327 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
328 fprintf(stderr,
"timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
329 bsumread / 1024.0 / 1024.0, (sion_int64) left);
334 barrier_after_read(communicators->
local);
339 readtime = MPI_Wtime() - starttime;
340 starttime = MPI_Wtime();
341 barrier_after_read(communicators->
work);
342 barr2time = MPI_Wtime() - starttime;
343 greadtime = MPI_Wtime() - gstarttime;
345 starttime = MPI_Wtime();
346 MPI_File_close(&fhin);
347 closetime = MPI_Wtime() - starttime;
348 barrier_after_close(communicators->
work);
350 if (options->verbose) {
352 "timings[%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d (check %d)\n",
353 communicators->all_rank, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
354 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
356 collective_print_gather(cbuffer, communicators->
work);
361 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
362 fprintf(stderr,
"timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
363 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
367 if (options->numfiles >= 1) {
368 MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
369 if (communicators->local_rank == 0) {
370 fprintf(stderr,
"partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, file);
374 DPRINTFTS(communicators->all_rank,
"before red.");
375 MPI_Reduce(&bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
376 MPI_Reduce(&opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
377 MPI_Reduce(&closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
378 DPRINTFTS(communicators->all_rank,
"after red.");
379 if (communicators->work_rank == 0) {
380 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
381 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read+barrier=%10.6fs br=%10.4f MB/s from %d files\n",
382 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
383 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
387 if(options->unlink_files) {
388 starttime = MPI_Wtime();
389 barrier_before_unlink(communicators->
work);
390 if (communicators->local_rank == 0) {
391 fprintf(stderr,
"partest result: unlink file %s ...\n", file);
394 barrier_after_unlink(communicators->
work);
395 unlinktime = MPI_Wtime() - starttime;
396 if (communicators->local_rank == 0) {
397 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, file);
402 MPI_Info_free(&info);