23 #include "partest_util.h" 25 sion_int64 partest_write_chunk_to_sionfile(
int sid,
33 sion_int64 partest_read_chunk_from_sionfile(
int sid,
46 int test_paropen_multi_mpi(
char *filename,
52 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
53 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
54 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
56 double checksum_fp, checksum_read_fp;
57 int globalrank, sid, i;
58 int chunkcnt, using_hints;
59 sion_int64 rchunksize;
60 sion_int32 rfsblksize;
63 int ser_count,ser_step,ser_done;
66 DPRINTFTS(communicators->all_rank,
"start");
71 if (communicators->work_size == -1) {
77 MPI_Comm_size(communicators->
work, &communicators->work_size);
78 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
79 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
80 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
81 fprintf(stderr,
"timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
82 communicators->work_rank, communicators->work_size,
83 communicators->workread_rank, communicators->workread_size
88 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
90 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
91 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
93 if(options->do_write) {
97 barrier_after_start(communicators->
local);
99 write_starttime = starttime = MPI_Wtime();
100 if(options->use_posix) {
101 file_mode=
"bw,posix";
106 &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
107 using_hints=sion_using_hints(sid);
108 stats[STATS_WR_NUM_FILES]=options->numfiles;
109 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
111 starttime = MPI_Wtime();
112 barrier_after_open(communicators->
local);
113 timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
115 starttime = MPI_Wtime();
116 barrier_after_open(communicators->
work);
117 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
120 starttime = MPI_Wtime();
121 MPI_Comm_size(communicators->
local, &communicators->local_size);
122 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
124 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
125 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
126 else ser_step=communicators->local_size;
128 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
130 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
131 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
134 starttime = MPI_Wtime();
135 stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
136 localbuffer,options,do_debug&&options->verbose,
137 &checksum_fp,&chunkcnt);
138 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
139 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
143 starttime = MPI_Wtime();
144 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
145 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
147 starttime = MPI_Wtime();
148 barrier_after_write(communicators->
local);
149 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
153 starttime = MPI_Wtime();
154 barrier_after_write(communicators->
work);
155 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
157 starttime = MPI_Wtime();
159 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
161 starttime = MPI_Wtime();
162 barrier_after_close(communicators->
local);
163 timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
165 starttime = MPI_Wtime();
166 barrier_after_close(communicators->
work);
167 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
168 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
170 if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
173 starttime = MPI_Wtime();
174 if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
175 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
176 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
179 if (options->verbose) {
180 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
183 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
186 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
188 if (options->numfiles > 1) {
190 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
191 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
193 if (communicators->local_rank == 0) {
194 write_timings(
"FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
199 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
200 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
202 if (communicators->work_rank == 0) {
203 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
204 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
205 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
208 if(newfname) {free(newfname);newfname=NULL;}
214 barrier_after_close(communicators->
work);
216 if(options->do_read) {
220 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
221 localbuffer[i] =
' ';
224 barrier_after_start(communicators->
local);
226 read_starttime = starttime = MPI_Wtime();
227 if(options->use_posix) {
228 file_mode=
"br,posix";
233 starttime = MPI_Wtime();
234 if (options->collectiveopenforread) {
237 &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
238 stats[STATS_RD_NUM_FILES]=options->numfiles;
242 sid =
sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
243 stats[STATS_RD_NUM_FILES]=-1;
245 using_hints=sion_using_hints(sid);
246 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
248 starttime = MPI_Wtime();
249 barrier_after_open(communicators->
local);
250 timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
252 starttime = MPI_Wtime();
253 barrier_after_open(communicators->
workread);
254 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
257 starttime = MPI_Wtime();
259 MPI_Comm_size(communicators->
local, &communicators->local_size);
260 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
262 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
263 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
264 else ser_step=communicators->local_size;
267 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
269 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
271 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
274 starttime = MPI_Wtime();
275 stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
276 localbuffer,options,do_debug&&options->verbose,
277 &checksum_read_fp,&chunkcnt);
278 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
279 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
282 starttime = MPI_Wtime();
283 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
284 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
286 starttime = MPI_Wtime();
287 barrier_after_read(communicators->
local);
288 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
292 starttime = MPI_Wtime();
293 barrier_after_read(communicators->
workread);
294 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
297 starttime = MPI_Wtime();
298 if (options->collectiveopenforread) {
304 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
306 starttime = MPI_Wtime();
307 barrier_after_close(communicators->
local);
308 timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
310 starttime = MPI_Wtime();
311 barrier_after_close(communicators->
workread);
312 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
313 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
316 starttime = MPI_Wtime();
317 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
319 if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
320 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
321 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
324 if (options->verbose) {
325 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
329 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
332 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
334 if (options->numfiles > 1) {
336 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
337 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
339 if (communicators->local_rank == 0) {
340 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
345 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
346 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
348 DPRINTFTS(communicators->all_rank,
"after red.");
349 if (communicators->workread_rank == 0) {
350 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
351 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
352 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
356 if(!options->suppress_checksum) {
357 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
358 fprintf(stderr,
"timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
359 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
366 if(options->unlink_files) {
367 starttime = MPI_Wtime();
368 barrier_before_unlink(communicators->
workread);
369 if (communicators->local_rank == 0) {
370 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
373 barrier_after_unlink(communicators->
workread);
374 unlinktime = MPI_Wtime() - starttime;
375 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
376 if (communicators->work_rank == 0) {
377 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
381 if(newfname) {free(newfname);newfname=NULL;}
386 sion_int64 partest_write_chunk_to_sionfile(
int sid,
394 sion_int64 bsumwrote;
395 size_t bytes_in_chunk;
396 size_t bwrite, bwrote;
399 sion_int64 bufsize = (sion_int64) options->bufsize;
400 sion_int64 totalsize = (sion_int64) options->totalsize;
401 sion_int64 startoffset = options->startoffset;
411 if(startoffset==0) bwrite = bufsize;
413 bwrite = startoffset; startoffset=0;
419 fprintf(stderr,
"timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
422 bytes_in_chunk+=bwrite;
423 if(bytes_in_chunk>options->chunksize) {
425 bytes_in_chunk=bwrite;
427 if(options->collectivewrite) {
428 bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
434 if(!options->suppress_checksum) {
435 for (i = 0; i < bwrote; i++)
436 *checksum_fp += (
double) localbuffer[i];
444 fprintf(stderr,
"timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
446 bsumwrote, bsumwrote / 1024.0 / 1024.0,
448 left / 1024.0 / 1024.0 );
455 sion_int64 partest_read_chunk_from_sionfile(
int sid,
460 double *checksum_read_fp,
465 size_t bytes_in_chunk;
466 size_t btoread, bread;
470 sion_int64 bufsize = (sion_int64) options->bufsize;
471 sion_int64 totalsize = (sion_int64) options->totalsize;
472 sion_int64 startoffset = options->startoffset;
474 *checksum_read_fp = 0;
482 while ((left > 0) && (!myfeof)) {
484 if(startoffset==0) btoread = bufsize;
486 btoread = startoffset; startoffset=0;
491 bytes_in_chunk+=btoread;
492 if(bytes_in_chunk>options->chunksize) {
496 if(options->collectiveread) {
497 bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
499 bread =
sion_fread(localbuffer, 1, btoread, sid);
503 if(!options->suppress_checksum) {
504 for (i = 0; i < bread; i++)
505 *checksum_read_fp += (
double) localbuffer[i];
514 fprintf(stderr,
"timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
516 bsumread, bsumread / 1024.0 / 1024.0,
518 left / 1024.0 / 1024.0 );
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
int sion_close(int sid)
Close a sion file.
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
int sion_parclose_mpi(int sid)
Close a sion file using MPI.