13 #define _XOPEN_SOURCE 700
28 #include "partest_util.h"
30 sion_int64 partest_write_chunk_to_sionfile(
int sid,
38 sion_int64 partest_read_chunk_from_sionfile(
int sid,
51 int test_paropen_multi_mpi(
char *filename,
57 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
58 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
59 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
61 double checksum_fp, checksum_read_fp;
62 int globalrank, sid, i;
63 int chunkcnt, using_hints;
64 sion_int64 rchunksize;
65 sion_int32 rfsblksize;
68 int ser_count,ser_step,ser_done;
71 DPRINTFTS(communicators->all_rank,
"start");
76 if (communicators->work_size == -1) {
82 MPI_Comm_size(communicators->
work, &communicators->work_size);
83 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
84 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
85 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
86 fprintf(stderr,
"timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
87 communicators->work_rank, communicators->work_size,
88 communicators->workread_rank, communicators->workread_size
93 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
95 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
96 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
98 if(options->do_write) {
102 barrier_after_start(communicators->
local);
104 write_starttime = starttime = MPI_Wtime();
105 strcpy(file_mode,
"bw");
106 if(options->use_posix) {
107 strcat(file_mode,
",posix");
109 #if defined(_SION_SIONFWD)
110 else if (options->use_sionfwd) {
111 strcat(file_mode,
",sionfwd");
114 #ifdef _SION_IME_NATIVE
115 else if (options->use_ime_native) {
116 strcat(file_mode,
",ime");
120 strcat(file_mode,
",ansi");
122 if (options->collmsa) {
123 strcat(file_mode,
",collmsa");
126 &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
127 using_hints=sion_using_hints(sid);
128 stats[STATS_WR_NUM_FILES]=options->numfiles;
129 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
131 starttime = MPI_Wtime();
132 barrier_after_open(communicators->
local);
133 timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
135 starttime = MPI_Wtime();
136 barrier_after_open(communicators->
work);
137 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
140 starttime = MPI_Wtime();
141 MPI_Comm_size(communicators->
local, &communicators->local_size);
142 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
144 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
145 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
146 else ser_step=communicators->local_size;
148 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
150 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
151 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
154 starttime = MPI_Wtime();
155 stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
156 localbuffer,options,do_debug&&options->verbose,
157 &checksum_fp,&chunkcnt);
158 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
159 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
163 starttime = MPI_Wtime();
164 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
165 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
167 starttime = MPI_Wtime();
168 barrier_after_write(communicators->
local);
169 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
173 starttime = MPI_Wtime();
174 barrier_after_write(communicators->
work);
175 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
177 starttime = MPI_Wtime();
179 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
181 starttime = MPI_Wtime();
182 barrier_after_close(communicators->
local);
183 timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
185 starttime = MPI_Wtime();
186 barrier_after_close(communicators->
work);
187 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
188 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
190 if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
193 starttime = MPI_Wtime();
194 if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
195 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
196 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
199 if (options->verbose) {
200 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
203 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
206 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
208 if (options->numfiles > 1) {
210 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
211 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
213 if (communicators->local_rank == 0) {
214 write_timings(
"FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
219 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
220 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
222 if (communicators->work_rank == 0) {
223 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
224 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
225 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
228 if(newfname) {free(newfname);newfname=NULL;}
234 barrier_after_close(communicators->
work);
236 if(options->do_read) {
240 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
241 localbuffer[i] =
' ';
244 barrier_after_start(communicators->
local);
246 read_starttime = starttime = MPI_Wtime();
247 strcpy(file_mode,
"br");
248 if(options->use_posix) {
249 strcat(file_mode,
",posix");
251 #if defined(_SION_SIONFWD)
252 else if (options->use_sionfwd) {
253 strcat(file_mode,
",sionfwd");
256 #ifdef _SION_IME_NATIVE
257 else if (options->use_ime_native) {
258 strcat(file_mode,
",ime");
262 strcat(file_mode,
",ansi");
264 if (options->collmsa) {
265 strcat(file_mode,
",collmsa");
268 starttime = MPI_Wtime();
269 if (options->collectiveopenforread) {
272 &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
273 stats[STATS_RD_NUM_FILES]=options->numfiles;
277 sid =
sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
278 stats[STATS_RD_NUM_FILES]=-1;
280 using_hints=sion_using_hints(sid);
281 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
283 starttime = MPI_Wtime();
284 barrier_after_open(communicators->
local);
285 timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
287 starttime = MPI_Wtime();
288 barrier_after_open(communicators->
workread);
289 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
292 starttime = MPI_Wtime();
294 MPI_Comm_size(communicators->
local, &communicators->local_size);
295 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
297 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
298 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
299 else ser_step=communicators->local_size;
302 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
304 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
306 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
309 starttime = MPI_Wtime();
310 stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
311 localbuffer,options,do_debug&&options->verbose,
312 &checksum_read_fp,&chunkcnt);
313 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
314 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
317 starttime = MPI_Wtime();
318 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
319 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
321 starttime = MPI_Wtime();
322 barrier_after_read(communicators->
local);
323 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
327 starttime = MPI_Wtime();
328 barrier_after_read(communicators->
workread);
329 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
332 starttime = MPI_Wtime();
333 if (options->collectiveopenforread) {
339 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
341 starttime = MPI_Wtime();
342 barrier_after_close(communicators->
local);
343 timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
345 starttime = MPI_Wtime();
346 barrier_after_close(communicators->
workread);
347 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
348 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
351 starttime = MPI_Wtime();
352 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
354 if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
355 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
356 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
359 if (options->verbose) {
360 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
364 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
367 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
369 if (options->numfiles > 1) {
371 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
372 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
374 if (communicators->local_rank == 0) {
375 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
380 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
381 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
383 DPRINTFTS(communicators->all_rank,
"after red.");
384 if (communicators->workread_rank == 0) {
385 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
386 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
387 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
391 if(!options->suppress_checksum) {
392 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
393 fprintf(stderr,
"timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
394 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
401 if(options->unlink_files) {
402 starttime = MPI_Wtime();
403 barrier_before_unlink(communicators->
workread);
404 if (communicators->local_rank == 0) {
405 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
408 barrier_after_unlink(communicators->
workread);
409 unlinktime = MPI_Wtime() - starttime;
410 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
411 if (communicators->work_rank == 0) {
412 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
416 if(newfname) {free(newfname);newfname=NULL;}
421 sion_int64 partest_write_chunk_to_sionfile(
int sid,
429 sion_int64 bsumwrote;
430 size_t bytes_in_chunk;
431 size_t bwrite, bwrote;
434 sion_int64 bufsize = (sion_int64) options->bufsize;
435 sion_int64 totalsize = (sion_int64) options->totalsize;
436 sion_int64 startoffset = options->startoffset;
446 if(startoffset==0) bwrite = bufsize;
448 bwrite = startoffset; startoffset=0;
454 fprintf(stderr,
"timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
457 bytes_in_chunk+=bwrite;
458 if(bytes_in_chunk>options->chunksize) {
460 bytes_in_chunk=bwrite;
462 if(options->collectivewrite) {
463 bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
469 if(!options->suppress_checksum) {
470 for (i = 0; i < bwrote; i++)
471 *checksum_fp += (
double) localbuffer[i];
479 fprintf(stderr,
"timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
481 bsumwrote, bsumwrote / 1024.0 / 1024.0,
483 left / 1024.0 / 1024.0 );
490 sion_int64 partest_read_chunk_from_sionfile(
int sid,
495 double *checksum_read_fp,
500 size_t bytes_in_chunk;
501 size_t btoread, bread;
505 sion_int64 bufsize = (sion_int64) options->bufsize;
506 sion_int64 totalsize = (sion_int64) options->totalsize;
507 sion_int64 startoffset = options->startoffset;
509 *checksum_read_fp = 0;
517 while ((left > 0) && (!myfeof)) {
519 if(startoffset==0) btoread = bufsize;
521 btoread = startoffset; startoffset=0;
526 bytes_in_chunk+=btoread;
527 if(bytes_in_chunk>options->chunksize) {
531 if(options->collectiveread) {
532 bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
534 bread =
sion_fread(localbuffer, 1, btoread, sid);
538 if(!options->suppress_checksum) {
539 for (i = 0; i < bread; i++)
540 *checksum_read_fp += (
double) localbuffer[i];
549 fprintf(stderr,
"timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
551 bsumread, bsumread / 1024.0 / 1024.0,
553 left / 1024.0 / 1024.0 );
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
int sion_close(int sid)
Close a sion file.