10 #define _XOPEN_SOURCE 700 25 #include "ompi_partest.h" 44 int test_paropen_multi_ompi(
char *filename,
50 double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
51 double barr1time, barr2time, barr3time;
53 double t_writetime, t_readtime, t_opentime, t_closetime;
54 sion_int64 t_bsumread, t_bsumwrote;
57 sion_int64 bsumwrote, sumsize, bsumread;
58 double checksum_fp, checksum_read_fp;
59 int globalrank, sid, i, lstartoffset;
60 size_t bwrite, bwrote, btoread, bread;
62 sion_int64 rchunksize;
63 sion_int32 rfsblksize;
66 char cbuffer[2*MAXCHARLEN];
68 size_t bytes_in_chunk;
70 int ser_count,ser_step,ser_done;
72 DPRINTFTS(communicators->all_rank,
"start");
75 if (communicators->work_size == -1) {
81 MPI_Comm_size(communicators->
work, &communicators->work_size);
82 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
83 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
84 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
85 fprintf(stderr,
"timings[%03d,t%03d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,omp_get_thread_num(),
86 communicators->work_rank, communicators->work_size,
87 communicators->workread_rank, communicators->workread_size
93 DPRINTFTS(communicators->all_rank,
"[W]before open");
94 starttime = MPI_Wtime();
97 sid = sion_paropen_ompi(filename,
"bw", &options->numfiles, communicators->
work, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
98 opentime = MPI_Wtime() - starttime;
99 starttime = MPI_Wtime();
100 barrier_after_open(communicators->
work);
101 barr1time = MPI_Wtime() - starttime;
102 DPRINTFTS(communicators->all_rank,
"[W]after open");
106 MPI_Comm_size(communicators->
local, &communicators->local_size);
107 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
111 if(options->use_posix) {
115 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
116 else ser_step=communicators->local_size;
118 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
119 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
122 left = options->totalsize;
126 lstartoffset=options->startoffset;
129 if(lstartoffset==0) bwrite = options->bufsize;
131 bwrite = lstartoffset; lstartoffset=0;
133 if (bwrite > left) bwrite = left;
137 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
138 fprintf(stderr,
"timings[%03d,t%03d] write %lld bytes\n", communicators->all_rank,omp_get_thread_num(), (sion_int64) bwrite);
143 bytes_in_chunk+=bwrite;
144 if(bytes_in_chunk>options->chunksize) {
146 bytes_in_chunk=bwrite;
149 if(options->use_posix) {
150 bwrote = write(fd, localbuffer, 1*bwrite);
152 bwrote = fwrite(localbuffer, 1, bwrite, fp);
156 if(!options->suppress_checksum) {
158 for (i = 0; i < bwrote; i++)
159 checksum_fp += (
double) localbuffer[i];
168 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
169 fprintf(stderr,
"timings[%03d,t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank,omp_get_thread_num(),(sion_int64) bwrote,
170 bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
171 fprintf(stderr,
"timings[%03d,t%03d] after write position in file= %lld \n", communicators->all_rank,omp_get_thread_num(),
sion_get_position(sid));
179 writetime = MPI_Wtime() - starttime;
181 starttime = MPI_Wtime();
182 barrier_after_write(communicators->
work);
183 barr2time = MPI_Wtime() - starttime;
185 starttime = MPI_Wtime();
186 sion_parclose_ompi(sid);
187 closetime = MPI_Wtime() - starttime;
188 DPRINTFTS(communicators->all_rank,
"[W]before close");
189 starttime = MPI_Wtime();
190 barrier_after_close(communicators->
work);
191 barr3time = MPI_Wtime() - starttime;
192 DPRINTFTS(communicators->all_rank,
"[W]after close");
194 if (writetime == 0) writetime = -1;
196 if (options->verbose) {
198 "timings[%03d,t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
199 communicators->all_rank,omp_get_thread_num(), opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
200 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
201 collective_print_gather(cbuffer, communicators->
work);
205 if (options->numfiles >= 1) {
206 reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
210 MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
211 if (communicators->local_rank == 0) {
212 fprintf(stderr,
"partest result: local totalsize=%10.4f MB wrote %10.4f MB to %s all_rank=%d\n", options->totalsize / 1024.0 / 1024.0,
213 1.0 * sumsize / 1024.0 / 1024.0, newfname, communicators->all_rank);
219 DPRINTFTS(communicators->all_rank,
"before red.");
220 reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
221 reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
222 reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
223 reduce_omp(&writetime,&t_writetime,MPI_MAX,_PARTEST_DOUBLE);
227 MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
228 MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
229 MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
230 MPI_Reduce(&t_writetime, &gwritetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
234 DPRINTFTS(communicators->all_rank,
"after red.");
237 if (communicators->work_rank == 0) {
238 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
239 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
240 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
241 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
243 if (communicators->work_rank == 0)
244 fprintf(stderr,
"*********************************************************************************************\n");
251 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
252 localbuffer[i] =
' ';
255 DPRINTFTS(communicators->all_rank,
"[R]before open");
256 starttime = MPI_Wtime();
257 if (options->collectiveopenforread) {
259 sid = sion_paropen_ompi(filename,
260 "br", &options->numfiles, communicators->
workread, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
262 MPI_Comm_size(communicators->
local, &communicators->local_size);
263 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
268 sid =
sion_open_rank(filename,
"br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
270 opentime = MPI_Wtime() - starttime;
272 starttime = MPI_Wtime();
273 barrier_after_open(communicators->
workread);
274 barr1time = MPI_Wtime() - starttime;
275 DPRINTFTS(communicators->all_rank,
"[R]after open");
277 if(options->use_posix) {
281 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
282 else ser_step=communicators->local_size;
285 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
287 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
291 checksum_read_fp = 0;
292 left = options->totalsize;
296 lstartoffset=options->startoffset;
299 while ((left > 0) && (!myfeof)) {
301 if(lstartoffset==0) btoread = options->bufsize;
303 btoread = lstartoffset; lstartoffset=0;
308 bytes_in_chunk+=btoread;
309 if(bytes_in_chunk>options->chunksize) {
313 if(options->use_posix) {
314 bread = read(fd, localbuffer, 1*btoread);
316 bread = fread(localbuffer, 1, btoread, fp);
321 if(!options->suppress_checksum) {
322 checksum_read_fp=0.0;
323 for (i = 0; i < bread; i++)
324 checksum_read_fp += (
double) localbuffer[i];
333 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
334 fprintf(stderr,
"timings[%03d,t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
335 communicators->all_rank,omp_get_thread_num(), (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
336 fprintf(stderr,
"timings[%03d,t%03d] after read position in file= %lld restinblock=%lld\n",
349 readtime = MPI_Wtime() - starttime;
351 starttime= MPI_Wtime();
352 barrier_after_read(communicators->
workread);
353 barr2time = MPI_Wtime() - starttime;
355 starttime = MPI_Wtime();
356 if (options->collectiveopenforread) {
357 sion_parclose_ompi(sid);
362 closetime = MPI_Wtime() - starttime;
363 DPRINTFTS(communicators->all_rank,
"[R]before close");
364 barrier_after_close(communicators->
workread);
365 DPRINTFTS(communicators->all_rank,
"[R]after close");
369 if (options->verbose) {
371 "timings[%03d,t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
372 communicators->all_rank,omp_get_thread_num(), opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
373 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
375 collective_print_gather(cbuffer, communicators->
workread);
380 if(!options->suppress_checksum) {
381 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
382 fprintf(stderr,
"timings[%03d,t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,omp_get_thread_num(),
383 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
388 if (options->numfiles >= 1) {
389 reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
392 MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
393 if (communicators->local_rank == 0) {
394 fprintf(stderr,
"partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, newfname);
400 DPRINTFTS(communicators->all_rank,
"before red.");
401 reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
402 reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
403 reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
404 reduce_omp(&readtime,&t_readtime,MPI_MAX,_PARTEST_DOUBLE);
407 MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
408 MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
409 MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
410 MPI_Reduce(&t_readtime, &greadtime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
413 DPRINTFTS(communicators->all_rank,
"after red.");
416 if (communicators->workread_rank == 0) {
417 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
418 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
419 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
420 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
425 if(options->unlink_files) {
426 starttime = MPI_Wtime();
427 barrier_before_unlink(communicators->
workread);
430 if (communicators->local_rank == 0) {
431 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
436 barrier_after_unlink(communicators->
workread);
437 unlinktime = MPI_Wtime() - starttime;
440 if (communicators->local_rank == 0) {
441 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
int sion_close(int sid)
Close a sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.