23 #include "partest_util.h" 25 sion_int64 partest_write_chunk_to_sionfile(
int sid,
33 sion_int64 partest_read_chunk_from_sionfile(
int sid,
46 int test_paropen_multi_mpi(
char *filename,
52 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
53 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
54 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
56 double checksum_fp, checksum_read_fp;
57 int globalrank, sid, i;
58 int chunkcnt, using_hints;
59 sion_int64 rchunksize;
60 sion_int32 rfsblksize;
63 int ser_count,ser_step,ser_done;
66 DPRINTFTS(communicators->all_rank,
"start");
71 if (communicators->work_size == -1) {
77 MPI_Comm_size(communicators->
work, &communicators->work_size);
78 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
79 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
80 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
81 fprintf(stderr,
"timings[%06d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,
82 communicators->work_rank, communicators->work_size,
83 communicators->workread_rank, communicators->workread_size
88 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
90 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
91 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
93 if(options->do_write) {
97 barrier_after_start(communicators->
local);
99 write_starttime = starttime = MPI_Wtime();
100 if(options->use_posix) {
101 file_mode=
"bw,posix";
106 &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
107 using_hints=sion_using_hints(sid);
108 stats[STATS_WR_NUM_FILES]=options->numfiles;
109 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
111 starttime = MPI_Wtime();
112 barrier_after_open(communicators->
local);
113 timings[TIMINGS_WR_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
115 starttime = MPI_Wtime();
116 barrier_after_open(communicators->
work);
117 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
120 starttime = MPI_Wtime();
121 MPI_Comm_size(communicators->
local, &communicators->local_size);
122 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
124 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
125 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
126 else ser_step=communicators->local_size;
128 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
130 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
131 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
134 starttime = MPI_Wtime();
135 stats[STATS_BYTES_WR_WROTE]+=partest_write_chunk_to_sionfile(sid,communicators->all_rank,
136 localbuffer,options,do_debug&&options->verbose,
137 &checksum_fp,&chunkcnt);
138 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
139 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
143 starttime = MPI_Wtime();
144 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
145 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
147 starttime = MPI_Wtime();
148 barrier_after_write(communicators->
local);
149 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
153 starttime = MPI_Wtime();
154 barrier_after_write(communicators->
work);
155 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
157 starttime = MPI_Wtime();
159 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
161 starttime = MPI_Wtime();
162 barrier_after_close(communicators->
local);
163 timings[TIMINGS_WR_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
165 starttime = MPI_Wtime();
166 barrier_after_close(communicators->
work);
167 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
168 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
170 starttime = MPI_Wtime();
172 if (timings[TIMINGS_WR_TOTAL] == 0) timings[TIMINGS_WR_TOTAL] = -1;
175 starttime = MPI_Wtime();
176 if ( (communicators->work_size>0) && (communicators->work_rank==0) ) {
177 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
178 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
181 if (options->verbose) {
182 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
185 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
188 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
190 if (options->numfiles > 1) {
192 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
193 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
195 if (communicators->local_rank == 0) {
196 write_timings(
"FILE",TIMINGS_METHOD_WRITE,ftimings,fstats,communicators,options,0);
201 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
202 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
204 if (communicators->work_rank == 0) {
205 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
206 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
207 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
210 if(newfname) {free(newfname);newfname=NULL;}
216 barrier_after_close(communicators->
work);
218 if(options->do_read) {
222 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
223 localbuffer[i] =
' ';
226 barrier_after_start(communicators->
local);
228 read_starttime = starttime = MPI_Wtime();
229 if(options->use_posix) {
230 file_mode=
"br,posix";
235 starttime = MPI_Wtime();
236 if (options->collectiveopenforread) {
239 &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, NULL, &newfname);
240 stats[STATS_RD_NUM_FILES]=options->numfiles;
244 sid =
sion_open_rank(filename, file_mode, &rchunksize, &rfsblksize, &communicators->workread_rank, NULL);
245 stats[STATS_RD_NUM_FILES]=-1;
247 using_hints=sion_using_hints(sid);
248 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
250 starttime = MPI_Wtime();
251 barrier_after_open(communicators->
local);
252 timings[TIMINGS_RD_OPEN_BARR_FILE] += MPI_Wtime()-starttime;
254 starttime = MPI_Wtime();
255 barrier_after_open(communicators->
workread);
256 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
259 starttime = MPI_Wtime();
261 MPI_Comm_size(communicators->
local, &communicators->local_size);
262 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
264 if(options->serialize_blocknum==-2) sion_startof_transaction_mpi(sid);
265 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
266 else ser_step=communicators->local_size;
269 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
271 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
273 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
276 starttime = MPI_Wtime();
277 stats[STATS_BYTES_RD_READ]+=partest_read_chunk_from_sionfile(sid,communicators->all_rank,
278 localbuffer,options,do_debug&&options->verbose,
279 &checksum_read_fp,&chunkcnt);
280 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
281 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
284 starttime = MPI_Wtime();
285 if(options->serialize_blocknum==-2) sion_endof_transaction_mpi(sid);
286 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
288 starttime = MPI_Wtime();
289 barrier_after_read(communicators->
local);
290 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
294 starttime = MPI_Wtime();
295 barrier_after_read(communicators->
workread);
296 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
299 starttime = MPI_Wtime();
300 if (options->collectiveopenforread) {
306 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
308 starttime = MPI_Wtime();
309 barrier_after_close(communicators->
local);
310 timings[TIMINGS_RD_CLOSE_BARR_FILE] = MPI_Wtime()-starttime;
312 starttime = MPI_Wtime();
313 barrier_after_close(communicators->
workread);
314 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
315 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
318 starttime = MPI_Wtime();
319 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
321 if ( (communicators->work_size>0) && (communicators->workread_rank==0) ) {
322 fprintf(stderr,
"partest filespec: ( ) using_hints = %ld\n", (
long) using_hints);
323 fprintf(stderr,
"partest filespec: ( ) fsblksize = %ld\n", (
long) options->fsblksize);
326 if (options->verbose) {
327 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
331 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
334 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
336 if (options->numfiles > 1) {
338 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
339 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
341 if (communicators->local_rank == 0) {
342 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
347 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
348 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
350 DPRINTFTS(communicators->all_rank,
"after red.");
351 if (communicators->workread_rank == 0) {
352 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
353 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
354 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
358 if(!options->suppress_checksum) {
359 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
360 fprintf(stderr,
"timings[%06d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,
361 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
368 if(options->unlink_files) {
369 starttime = MPI_Wtime();
370 barrier_before_unlink(communicators->
workread);
371 if (communicators->local_rank == 0) {
372 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
375 barrier_after_unlink(communicators->
workread);
376 unlinktime = MPI_Wtime() - starttime;
377 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
378 if (communicators->work_rank == 0) {
379 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, newfname);
383 if(newfname) {free(newfname);newfname=NULL;}
388 sion_int64 partest_write_chunk_to_sionfile(
int sid,
396 sion_int64 bsumwrote;
397 size_t bytes_in_chunk;
398 size_t bwrite, bwrote;
401 sion_int64 bufsize = (sion_int64) options->bufsize;
402 sion_int64 totalsize = (sion_int64) options->totalsize;
403 sion_int64 startoffset = options->startoffset;
413 if(startoffset==0) bwrite = bufsize;
415 bwrite = startoffset; startoffset=0;
421 fprintf(stderr,
"timings[%06d] write %lld bytes\n", rank, (sion_int64) bwrite);
424 bytes_in_chunk+=bwrite;
425 if(bytes_in_chunk>options->chunksize) {
427 bytes_in_chunk=bwrite;
429 if(options->collectivewrite) {
430 bwrote = sion_coll_fwrite_mpi(localbuffer, 1, bwrite, sid);
436 if(!options->suppress_checksum) {
437 for (i = 0; i < bwrote; i++)
438 *checksum_fp += (
double) localbuffer[i];
446 fprintf(stderr,
"timings[%06d] wrote %10lld bytes total: wrote %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
448 bsumwrote, bsumwrote / 1024.0 / 1024.0,
450 left / 1024.0 / 1024.0 );
457 sion_int64 partest_read_chunk_from_sionfile(
int sid,
462 double *checksum_read_fp,
467 size_t bytes_in_chunk;
468 size_t btoread, bread;
472 sion_int64 bufsize = (sion_int64) options->bufsize;
473 sion_int64 totalsize = (sion_int64) options->totalsize;
474 sion_int64 startoffset = options->startoffset;
476 *checksum_read_fp = 0;
484 while ((left > 0) && (!myfeof)) {
486 if(startoffset==0) btoread = bufsize;
488 btoread = startoffset; startoffset=0;
493 bytes_in_chunk+=btoread;
494 if(bytes_in_chunk>options->chunksize) {
498 if(options->collectiveread) {
499 bread = sion_coll_fread_mpi(localbuffer, 1, btoread, sid);
501 bread =
sion_fread(localbuffer, 1, btoread, sid);
505 if(!options->suppress_checksum) {
506 for (i = 0; i < bread; i++)
507 *checksum_read_fp += (
double) localbuffer[i];
516 fprintf(stderr,
"timings[%06d] read %10lld bytes total: read %14lld bytes (%10.4f MB) left %14lld bytes (%10.4f MB)\n", rank,
518 bsumread, bsumread / 1024.0 / 1024.0,
520 left / 1024.0 / 1024.0 );
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
int sion_close(int sid)
Close a sion file.
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
int sion_parclose_mpi(int sid)
Close a sion file using MPI.