10 #define _XOPEN_SOURCE 700
25 #include "partest_util.h"
/* Forward declaration of the chunk writer; returns sion_int64 and takes the
 * SION file id as its first parameter.
 * NOTE(review): the remaining parameters are on lines not shown in this listing. */
27 sion_int64 partest_write_chunk_to_sionfile(
int sid,
/* Forward declaration of the chunk reader; returns sion_int64 and takes the
 * SION file id as its first parameter.
 * NOTE(review): the remaining parameters are on lines not shown in this listing. */
35 sion_int64 partest_read_chunk_from_sionfile(
int sid,
/*
 * test_paropen_multi_mpi - timed write and/or read test of a SION file from
 * all MPI tasks, with optional checksum comparison and unlink timing.
 *
 * NOTE(review): this listing is not contiguous (the embedded source line
 * numbers skip). Braces, else-branches, the remaining parameters, the
 * open/close calls and the return statement are not visible here. The
 * comments below describe only what the visible lines do; anything else is
 * marked as an assumption.
 *
 * Visible behaviour:
 *  - options->do_write: builds a "bw" mode string, times open / write /
 *    close and the barriers after each of them into timings[], and counts
 *    bytes and chunks written into stats[].
 *  - options->do_read: same sequence with a "br" mode string, using the
 *    workread communicator for the global barriers and reductions.
 *  - Results are reported through write_timings(); timings are reduced with
 *    MPI_MAX and stats with MPI_SUM onto rank 0.
 *  - Unless options->suppress_checksum is set, the write and read checksums
 *    are compared with a tolerance of 1e-5.
 *  - options->unlink_files: times the barrier-to-barrier unlink section.
 *
 * filename: passed to sion_open_rank() on the non-collective read path.
 * Returns int; the returned value is not visible in this listing.
 */
48 int test_paropen_multi_mpi(
char *filename,
54 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
/* timings/stats hold this task's values; the f* arrays receive the reduction
 * over communicators->local and the g* arrays the reduction over
 * communicators->work (write) or communicators->workread (read). */
55 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
56 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
58 double checksum_fp, checksum_read_fp;
59 int globalrank, sid, i;
60 int chunkcnt, using_hints;
61 sion_int64 rchunksize;
62 sion_int32 rfsblksize;
65 int ser_count,ser_step,ser_done;
68 DPRINTFTS(communicators->all_rank,
"start");
73 if (communicators->work_size == -1) {
/* Fill in size and rank for the work and workread communicators. */
79 MPI_Comm_size(communicators->
work, &communicators->work_size);
80 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
81 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
82 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
83 fprintf(stderr,
"timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
84 communicators->work_rank, communicators->work_size,
85 communicators->workread_rank, communicators->workread_size
/* Debug output is enabled on rank 0 when options->debug is set, or on the
 * last rank (all_rank+1 == all_size) when options->Debug is set. */
90 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
/* Zero every timing and statistics accumulator before either phase runs. */
92 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
93 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
/* ---------------------------- write phase ---------------------------- */
95 if(options->do_write) {
99 barrier_after_start(communicators->
local);
/* write_starttime anchors TIMINGS_WR_TOTAL; starttime is restarted before
 * each individual sub-phase. */
101 write_starttime = starttime = MPI_Wtime();
/* Build the open-mode string: "bw", then ",posix" when options->use_posix is
 * set; ",ansi" is appended on a branch whose condition line is not shown
 * (presumably the else); ",collmsa" when options->collmsa is set.
 * NOTE(review): strcpy/strcat are unchecked and the capacity of file_mode is
 * not visible here; the longest result is "bw,posix,collmsa" (16 chars plus
 * the terminator) -- confirm the buffer is at least that large. */
102 strcpy(file_mode,
"bw");
103 if(options->use_posix) {
104 strcat(file_mode,
",posix");
106 strcat(file_mode,
",ansi");
108 if (options->collmsa) {
109 strcat(file_mode,
",collmsa");
/* Trailing arguments of the open call; its first line is not shown. */
112 &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
113 using_hints=sion_using_hints(sid);
114 stats[STATS_WR_NUM_FILES]=options->numfiles;
115 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
/* Time the barrier over the per-file communicator, then over all writers. */
117 starttime = MPI_Wtime();
118 barrier_after_open(communicators->
local);
119 timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
121 starttime = MPI_Wtime();
122 barrier_after_open(communicators->
work);
123 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
126 starttime = MPI_Wtime();
127 MPI_Comm_size(communicators->
local, &communicators->local_size);
128 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
/* serialize_blocknum == -2 brackets the writes in a SION transaction.
 * serialize_blocknum > 0 sets the loop step to that value; otherwise the
 * step is local_size, so the loop below runs a single pass. */
130 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
131 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
132 else ser_step=communicators->local_size;
134 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
136 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
/* NOTE(review): ser_done is read here, but neither its initialisation nor
 * its update is on a visible line -- confirm it is set before this loop. */
137 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
140 starttime = MPI_Wtime();
141 stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
142 localbuffer,options,do_debug&&options->verbose,
143 &checksum_fp,&chunkcnt);
144 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
145 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
149 starttime = MPI_Wtime();
150 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
151 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
153 starttime = MPI_Wtime();
154 barrier_after_write(communicators->
local);
155 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
159 starttime = MPI_Wtime();
160 barrier_after_write(communicators->
work);
161 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Close timing window; the close call itself is on a line not shown. */
163 starttime = MPI_Wtime();
165 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
167 starttime = MPI_Wtime();
168 barrier_after_close(communicators->
local);
169 timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
171 starttime = MPI_Wtime();
172 barrier_after_close(communicators->
work);
173 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
174 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
/* Replace an exactly-zero total with -1 (presumably so later reporting does
 * not divide by zero -- TODO confirm against write_timings). */
176 if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
/* Everything from here to the TIMINGS_MSGS update is reporting overhead. */
179 starttime = MPI_Wtime();
180 if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
181 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
182 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
/* Per-task report: last argument 1 when options->verbose is set; the call
 * with last argument 0 sits on a branch whose condition is not shown. */
185 if (options->verbose) {
186 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
189 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
192 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
/* With more than one physical file, also report per file: maximum of the
 * timings and sum of the stats over communicators->local, printed by its
 * rank 0. */
194 if (options->numfiles > 1) {
196 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
197 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
199 if (communicators->local_rank == 0) {
200 write_timings(
"FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
/* Global report over all writers, printed by rank 0 of communicators->work. */
205 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
206 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
208 if (communicators->work_rank == 0) {
209 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
210 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
211 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
/* Release the file name handed back through &newfname by the open call. */
214 if(newfname) {free(newfname);newfname=NULL;}
220 barrier_after_close(communicators->
work);
/* ----------------------------- read phase ---------------------------- */
222 if(options->do_read) {
/* Overwrite the first min(totalsize, bufsize) bytes of localbuffer with
 * blanks before reading. */
226 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
227 localbuffer[i] =
' ';
230 barrier_after_start(communicators->
local);
232 read_starttime = starttime = MPI_Wtime();
/* Same mode-string construction as in the write phase, starting from "br". */
233 strcpy(file_mode,
"br");
234 if(options->use_posix) {
235 strcat(file_mode,
",posix");
237 strcat(file_mode,
",ansi");
239 if (options->collmsa) {
240 strcat(file_mode,
",collmsa");
243 starttime = MPI_Wtime();
/* Two open paths: a collective open (call line not shown) that records
 * numfiles, or a per-task sion_open_rank() that records -1 as file count. */
244 if (options->collectiveopenforread) {
/* Trailing arguments of the collective open call; its first line is not shown. */
247 &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
248 stats[STATS_RD_NUM_FILES]=options->numfiles;
252 sid =
sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
253 stats[STATS_RD_NUM_FILES]=-1;
255 using_hints=sion_using_hints(sid);
256 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
258 starttime = MPI_Wtime();
259 barrier_after_open(communicators->
local);
260 timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
262 starttime = MPI_Wtime();
263 barrier_after_open(communicators->
workread);
264 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
267 starttime = MPI_Wtime();
269 MPI_Comm_size(communicators->
local, &communicators->local_size);
270 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
/* Same serialisation set-up as in the write phase. */
272 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
273 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
274 else ser_step=communicators->local_size;
277 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
279 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
/* NOTE(review): as in the write loop, no visible line sets ser_done before
 * it is read here -- confirm. */
281 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
284 starttime = MPI_Wtime();
285 stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
286 localbuffer,options,do_debug&&options->verbose,
287 &checksum_read_fp,&chunkcnt);
288 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
289 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
292 starttime = MPI_Wtime();
293 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
294 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
296 starttime = MPI_Wtime();
297 barrier_after_read(communicators->
local);
298 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
302 starttime = MPI_Wtime();
303 barrier_after_read(communicators->
workread);
304 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
/* Close timing window; the close calls for both open paths are not shown. */
307 starttime = MPI_Wtime();
308 if (options->collectiveopenforread) {
314 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
316 starttime = MPI_Wtime();
317 barrier_after_close(communicators->
local);
318 timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
320 starttime = MPI_Wtime();
321 barrier_after_close(communicators->
workread);
322 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
323 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
326 starttime = MPI_Wtime();
327 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
/* NOTE(review): this tests work_size together with workread_rank, whereas
 * the write phase pairs work_size with work_rank -- confirm that work_size
 * (not workread_size) is intended here. */
329 if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
330 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
331 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
334 if (options->verbose) {
335 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
339 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
342 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
/* Per-file report, as in the write phase. */
344 if (options->numfiles > 1) {
346 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
347 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
349 if (communicators->local_rank == 0) {
350 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
/* Global report over all readers, printed by rank 0 of workread. */
355 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
356 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
358 DPRINTFTS(communicators->all_rank,
"after red.");
359 if (communicators->workread_rank == 0) {
360 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
361 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
362 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
/* Compare the checksum accumulated while writing with the one accumulated
 * while reading; differences above 1e-5 are reported on stderr.
 * NOTE(review): checksum_fp is declared without an initialiser and the only
 * visible assignment is through &checksum_fp in the write phase. If do_read
 * can run without do_write, this reads an indeterminate value -- confirm.
 * NOTE(review): the message prints local_rank under the "timings[%06d]"
 * prefix, while the entry message prints all_rank -- confirm which rank is
 * meant. */
366 if(!options->suppress_checksum) {
367 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
368 fprintf(stderr,
"timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
369 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
/* ------------------------------ unlink ------------------------------- */
376 if(options->unlink_files) {
377 starttime = MPI_Wtime();
378 barrier_before_unlink(communicators->
workread);
/* NOTE(review): newfname was reset to NULL at the end of the write phase and
 * the only visible later assignment is via &newfname in the collective read
 * open. On the sion_open_rank() path it may still be NULL when passed to
 * "%s" here and below -- confirm. The unlink call itself is not shown. */
379 if (communicators->local_rank == 0) {
380 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
383 barrier_after_unlink(communicators->
workread);
384 unlinktime = MPI_Wtime() - starttime;
/* NOTE(review): the barriers above use workread but this reduction and the
 * rank test below use work -- confirm both communicators contain the same
 * set of tasks at this point, otherwise the collective may not match. */
385 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
386 if (communicators->work_rank == 0) {
387 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
/* Release the file name handed back by the read-phase open, if any. */
391 if(newfname) {free(newfname);newfname=NULL;}
/*
 * partest_write_chunk_to_sionfile - write data from localbuffer to the SION
 * file sid and accumulate a byte-sum checksum of what was written.
 *
 * NOTE(review): this listing is not contiguous. The remaining parameters,
 * the loop header, the non-collective write call, the updates of left and
 * bsumwrote, and the return statement are on lines not shown.
 *
 * Visible behaviour:
 *  - bufsize, totalsize and startoffset are taken from options.
 *  - Each request is bufsize bytes, except that a non-zero startoffset is
 *    used as the size of one request and then cleared.
 *  - bytes_in_chunk tracks bytes issued into the current chunk and is
 *    restarted at bwrite once it exceeds options->chunksize.
 *  - With options->collectivewrite the data goes through
 *    sion_coll_fwrite_mpi().
 *  - Unless options->suppress_checksum is set, every byte actually written
 *    is added to *checksum_fp.
 *
 * Returns sion_int64; the caller adds the result to its bytes-written
 * statistic.
 */
396 sion_int64 partest_write_chunk_to_sionfile(
int sid,
404 sion_int64 bsumwrote;
405 size_t bytes_in_chunk;
406 size_t bwrite, bwrote;
409 sion_int64 bufsize = (sion_int64) options->bufsize;
410 sion_int64 totalsize = (sion_int64) options->totalsize;
411 sion_int64 startoffset = options->startoffset;
/* Request size: a full buffer, or -- on the branch whose condition line is
 * not shown, presumably the else -- startoffset bytes once, after which
 * startoffset is cleared. */
421 if(startoffset==0) bwrite = bufsize;
423 bwrite = startoffset; startoffset=0;
/* NOTE(review): %lld is paired with a sion_int64 argument; this is only
 * correct if sion_int64 is long long on the target -- confirm the typedef. */
429 fprintf(stderr,
"timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
/* Restart the per-chunk byte count once this request no longer fits into
 * options->chunksize. */
432 bytes_in_chunk+=bwrite;
433 if(bytes_in_chunk>options->chunksize) {
435 bytes_in_chunk=bwrite;
437 if(options->collectivewrite) {
438 bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
/* Checksum covers the bytes reported as written (bwrote), not the bytes
 * requested (bwrite).
 * NOTE(review): no visible line resets *checksum_fp before accumulation;
 * confirm it is zeroed on a line not shown. */
444 if(!options->suppress_checksum) {
445 for (i = 0; i < bwrote; i++)
446 *checksum_fp += (
double) localbuffer[i];
454 fprintf(stderr,
"timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
456 bsumwrote, bsumwrote / 1024.0 / 1024.0,
458 left / 1024.0 / 1024.0 );
/*
 * partest_read_chunk_from_sionfile - read data from the SION file sid into
 * localbuffer and accumulate a byte-sum checksum of what was read.
 *
 * NOTE(review): this listing is not contiguous. The remaining parameters,
 * the updates of left, myfeof and bsumread, and the return statement are on
 * lines not shown.
 *
 * Visible behaviour:
 *  - *checksum_read_fp is reset to 0 before the loop.
 *  - The loop runs while bytes are left and end-of-file was not flagged.
 *  - Each request is bufsize bytes, except that a non-zero startoffset is
 *    used as the size of one request and then cleared.
 *  - Reads use sion_coll_fread_mpi() when options->collectiveread is set;
 *    the sion_fread() call sits on a branch whose condition is not shown.
 *  - Unless options->suppress_checksum is set, every byte actually read is
 *    added to *checksum_read_fp.
 *
 * Returns sion_int64; the caller adds the result to its bytes-read
 * statistic.
 */
465 sion_int64 partest_read_chunk_from_sionfile(
int sid,
470 double *checksum_read_fp,
475 size_t bytes_in_chunk;
476 size_t btoread, bread;
480 sion_int64 bufsize = (sion_int64) options->bufsize;
481 sion_int64 totalsize = (sion_int64) options->totalsize;
482 sion_int64 startoffset = options->startoffset;
/* Start from a clean checksum on every call. */
484 *checksum_read_fp = 0;
492 while ((left > 0) && (!myfeof)) {
/* Request size: a full buffer, or -- on the branch whose condition line is
 * not shown, presumably the else -- startoffset bytes once, after which
 * startoffset is cleared. */
494 if(startoffset==0) btoread = bufsize;
496 btoread = startoffset; startoffset=0;
/* Per-chunk byte accounting; the body of this test is not shown. */
501 bytes_in_chunk+=btoread;
502 if(bytes_in_chunk>options->chunksize) {
506 if(options->collectiveread) {
507 bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
509 bread =
sion_fread(localbuffer, 1, btoread, sid);
/* Checksum covers the bytes reported as read (bread), not the bytes
 * requested (btoread). */
513 if(!options->suppress_checksum) {
514 for (i = 0; i < bread; i++)
515 *checksum_read_fp += (
double) localbuffer[i];
/* NOTE(review): %lld conversions here are fed sion_int64 values (bsumread);
 * correct only if sion_int64 is long long on the target -- confirm. */
524 fprintf(stderr,
"timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
526 bsumread, bsumread / 1024.0 / 1024.0,
528 left / 1024.0 / 1024.0 );