23 #include "ompi_partest.h" 42 int test_paropen_multi_ompi(
char *filename,
48 double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
49 double barr1time, barr2time, barr3time;
51 double t_writetime, t_readtime, t_opentime, t_closetime;
52 sion_int64 t_bsumread, t_bsumwrote;
55 sion_int64 bsumwrote, sumsize, bsumread;
56 double checksum_fp, checksum_read_fp;
57 int globalrank, sid, i, lstartoffset;
58 size_t bwrite, bwrote, btoread, bread;
60 sion_int64 rchunksize;
61 sion_int32 rfsblksize;
64 char cbuffer[2*MAXCHARLEN];
66 size_t bytes_in_chunk;
68 int ser_count,ser_step,ser_done;
70 DPRINTFTS(communicators->all_rank,
"start");
73 if (communicators->work_size == -1) {
79 MPI_Comm_size(communicators->
work, &communicators->work_size);
80 MPI_Comm_rank(communicators->
work, &communicators->work_rank);
81 MPI_Comm_size(communicators->
workread, &communicators->workread_size);
82 MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
83 fprintf(stderr,
"timings[%03d,t%03d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n", communicators->all_rank,omp_get_thread_num(),
84 communicators->work_rank, communicators->work_size,
85 communicators->workread_rank, communicators->workread_size
91 DPRINTFTS(communicators->all_rank,
"[W]before open");
92 starttime = MPI_Wtime();
95 sid = sion_paropen_ompi(filename,
"bw", &options->numfiles, communicators->
work, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
96 opentime = MPI_Wtime() - starttime;
97 starttime = MPI_Wtime();
98 barrier_after_open(communicators->
work);
99 barr1time = MPI_Wtime() - starttime;
100 DPRINTFTS(communicators->all_rank,
"[W]after open");
104 MPI_Comm_size(communicators->
local, &communicators->local_size);
105 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
109 if(options->use_posix) {
113 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
114 else ser_step=communicators->local_size;
116 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
117 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
120 left = options->totalsize;
124 lstartoffset=options->startoffset;
127 if(lstartoffset==0) bwrite = options->bufsize;
129 bwrite = lstartoffset; lstartoffset=0;
131 if (bwrite > left) bwrite = left;
135 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
136 fprintf(stderr,
"timings[%03d,t%03d] write %lld bytes\n", communicators->all_rank,omp_get_thread_num(), (sion_int64) bwrite);
141 bytes_in_chunk+=bwrite;
142 if(bytes_in_chunk>options->chunksize) {
144 bytes_in_chunk=bwrite;
147 if(options->use_posix) {
148 bwrote = write(fd, localbuffer, 1*bwrite);
150 bwrote = fwrite(localbuffer, 1, bwrite, fp);
154 if(!options->suppress_checksum) {
156 for (i = 0; i < bwrote; i++)
157 checksum_fp += (
double) localbuffer[i];
166 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
167 fprintf(stderr,
"timings[%03d,t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank,omp_get_thread_num(),(sion_int64) bwrote,
168 bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
169 fprintf(stderr,
"timings[%03d,t%03d] after write position in file= %lld \n", communicators->all_rank,omp_get_thread_num(),
sion_get_position(sid));
177 writetime = MPI_Wtime() - starttime;
179 starttime = MPI_Wtime();
180 barrier_after_write(communicators->
work);
181 barr2time = MPI_Wtime() - starttime;
183 starttime = MPI_Wtime();
184 sion_parclose_ompi(sid);
185 closetime = MPI_Wtime() - starttime;
186 DPRINTFTS(communicators->all_rank,
"[W]before close");
187 starttime = MPI_Wtime();
188 barrier_after_close(communicators->
work);
189 barr3time = MPI_Wtime() - starttime;
190 DPRINTFTS(communicators->all_rank,
"[W]after close");
192 if (writetime == 0) writetime = -1;
194 if (options->verbose) {
196 "timings[%03d,t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
197 communicators->all_rank,omp_get_thread_num(), opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
198 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
199 collective_print_gather(cbuffer, communicators->
work);
203 if (options->numfiles >= 1) {
204 reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
208 MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
209 if (communicators->local_rank == 0) {
210 fprintf(stderr,
"partest result: local totalsize=%10.4f MB wrote %10.4f MB to %s all_rank=%d\n", options->totalsize / 1024.0 / 1024.0,
211 1.0 * sumsize / 1024.0 / 1024.0, newfname, communicators->all_rank);
217 DPRINTFTS(communicators->all_rank,
"before red.");
218 reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
219 reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
220 reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
221 reduce_omp(&writetime,&t_writetime,MPI_MAX,_PARTEST_DOUBLE);
225 MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
226 MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
227 MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
228 MPI_Reduce(&t_writetime, &gwritetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
232 DPRINTFTS(communicators->all_rank,
"after red.");
235 if (communicators->work_rank == 0) {
236 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
237 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
238 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
239 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
241 if (communicators->work_rank == 0)
242 fprintf(stderr,
"*********************************************************************************************\n");
249 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
250 localbuffer[i] =
' ';
253 DPRINTFTS(communicators->all_rank,
"[R]before open");
254 starttime = MPI_Wtime();
255 if (options->collectiveopenforread) {
257 sid = sion_paropen_ompi(filename,
258 "br", &options->numfiles, communicators->
workread, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
260 MPI_Comm_size(communicators->
local, &communicators->local_size);
261 MPI_Comm_rank(communicators->
local, &communicators->local_rank);
266 sid =
sion_open_rank(filename,
"br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
268 opentime = MPI_Wtime() - starttime;
270 starttime = MPI_Wtime();
271 barrier_after_open(communicators->
workread);
272 barr1time = MPI_Wtime() - starttime;
273 DPRINTFTS(communicators->all_rank,
"[R]after open");
275 if(options->use_posix) {
279 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
280 else ser_step=communicators->local_size;
283 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
285 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
289 checksum_read_fp = 0;
290 left = options->totalsize;
294 lstartoffset=options->startoffset;
297 while ((left > 0) && (!myfeof)) {
299 if(lstartoffset==0) btoread = options->bufsize;
301 btoread = lstartoffset; lstartoffset=0;
306 bytes_in_chunk+=btoread;
307 if(bytes_in_chunk>options->chunksize) {
311 if(options->use_posix) {
312 bread = read(fd, localbuffer, 1*btoread);
314 bread = fread(localbuffer, 1, btoread, fp);
319 if(!options->suppress_checksum) {
320 checksum_read_fp=0.0;
321 for (i = 0; i < bread; i++)
322 checksum_read_fp += (
double) localbuffer[i];
331 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
332 fprintf(stderr,
"timings[%03d,t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
333 communicators->all_rank,omp_get_thread_num(), (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
334 fprintf(stderr,
"timings[%03d,t%03d] after read position in file= %lld restinblock=%lld\n",
347 readtime = MPI_Wtime() - starttime;
349 starttime= MPI_Wtime();
350 barrier_after_read(communicators->
workread);
351 barr2time = MPI_Wtime() - starttime;
353 starttime = MPI_Wtime();
354 if (options->collectiveopenforread) {
355 sion_parclose_ompi(sid);
360 closetime = MPI_Wtime() - starttime;
361 DPRINTFTS(communicators->all_rank,
"[R]before close");
362 barrier_after_close(communicators->
workread);
363 DPRINTFTS(communicators->all_rank,
"[R]after close");
367 if (options->verbose) {
369 "timings[%03d,t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
370 communicators->all_rank,omp_get_thread_num(), opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
371 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
373 collective_print_gather(cbuffer, communicators->
workread);
378 if(!options->suppress_checksum) {
379 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
380 fprintf(stderr,
"timings[%03d,t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,omp_get_thread_num(),
381 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
386 if (options->numfiles >= 1) {
387 reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
390 MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
391 if (communicators->local_rank == 0) {
392 fprintf(stderr,
"partest result: read %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, newfname);
398 DPRINTFTS(communicators->all_rank,
"before red.");
399 reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
400 reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
401 reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
402 reduce_omp(&readtime,&t_readtime,MPI_MAX,_PARTEST_DOUBLE);
405 MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
406 MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
407 MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
408 MPI_Reduce(&t_readtime, &greadtime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
411 DPRINTFTS(communicators->all_rank,
"after red.");
414 if (communicators->workread_rank == 0) {
415 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
416 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
417 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
418 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
423 if(options->unlink_files) {
424 starttime = MPI_Wtime();
425 barrier_before_unlink(communicators->
workread);
428 if (communicators->local_rank == 0) {
429 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
434 barrier_after_unlink(communicators->
workread);
435 unlinktime = MPI_Wtime() - starttime;
438 if (communicators->local_rank == 0) {
439 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
int sion_close(int sid)
Close a sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.