23 #include "ompi_partest.h"    42 int test_paropen_multi_ompi(
char *filename,
    48   double    starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
    49   double    barr1time, barr2time, barr3time;
    51   double t_writetime, t_readtime, t_opentime, t_closetime;
    52   sion_int64 t_bsumread, t_bsumwrote;
    55   sion_int64 bsumwrote, sumsize, bsumread;
    56   double    checksum_fp, checksum_read_fp;
    57   int       globalrank, sid, i, lstartoffset;
    58   size_t    bwrite, bwrote, btoread, bread;
    60   sion_int64 rchunksize;
    61   sion_int32 rfsblksize;
    64   char      cbuffer[2*MAXCHARLEN];
    66   size_t    bytes_in_chunk; 
    68   int       ser_count,ser_step,ser_done;
    70    DPRINTFTS(communicators->all_rank, 
"start");
    73   if (communicators->work_size == -1) {
    79     MPI_Comm_size(communicators->
work, &communicators->work_size);
    80     MPI_Comm_rank(communicators->
work, &communicators->work_rank);
    81     MPI_Comm_size(communicators->
workread, &communicators->workread_size);
    82     MPI_Comm_rank(communicators->
workread, &communicators->workread_rank);
    83     fprintf(stderr, 
"timings[%03d,t%03d] entering test_paropen_multi_mpi work=%d of %d workread=%d of %d\n",  communicators->all_rank,omp_get_thread_num(),
    84         communicators->work_rank, communicators->work_size, 
    85         communicators->workread_rank, communicators->workread_size 
    91    DPRINTFTS(communicators->all_rank, 
"[W]before open");
    92    starttime = MPI_Wtime();
    95   sid = sion_paropen_ompi(filename, 
"bw", &options->numfiles, communicators->
work, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
    96    opentime = MPI_Wtime() - starttime;
    97    starttime = MPI_Wtime();
    98   barrier_after_open(communicators->
work);
    99    barr1time = MPI_Wtime() - starttime;
   100    DPRINTFTS(communicators->all_rank, 
"[W]after open");
   104       MPI_Comm_size(communicators->
local, &communicators->local_size);
   105       MPI_Comm_rank(communicators->
local, &communicators->local_rank);
   109   if(options->use_posix) {
   113   if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
   114   else                     ser_step=communicators->local_size;
   116   for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
   117     if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
   120       left = options->totalsize;
   124       lstartoffset=options->startoffset;
   127             if(lstartoffset==0) bwrite = options->bufsize;
   129               bwrite = lstartoffset; lstartoffset=0;
   131             if (bwrite > left) bwrite = left;
   135                 if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
   136                   fprintf(stderr, 
"timings[%03d,t%03d] write %lld bytes\n", communicators->all_rank,omp_get_thread_num(), (sion_int64) bwrite);
   141             bytes_in_chunk+=bwrite;
   142             if(bytes_in_chunk>options->chunksize) {
   144               bytes_in_chunk=bwrite;
   147             if(options->use_posix) {
   148               bwrote = write(fd, localbuffer, 1*bwrite);
   150               bwrote = fwrite(localbuffer, 1, bwrite, fp);
   154                 if(!options->suppress_checksum) {
   156                   for (i = 0; i < bwrote; i++)
   157                     checksum_fp += (
double) localbuffer[i];
   166             if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
   167               fprintf(stderr, 
"timings[%03d,t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank,omp_get_thread_num(),(sion_int64) bwrote,
   168                   bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
   169               fprintf(stderr, 
"timings[%03d,t%03d] after write   position in file= %lld \n", communicators->all_rank,omp_get_thread_num(), 
sion_get_position(sid));
   177    writetime = MPI_Wtime() - starttime;
   179    starttime = MPI_Wtime();
   180   barrier_after_write(communicators->
work);
   181    barr2time = MPI_Wtime() - starttime;
   183    starttime = MPI_Wtime();
   184   sion_parclose_ompi(sid);
   185    closetime = MPI_Wtime() - starttime;
   186    DPRINTFTS(communicators->all_rank, 
"[W]before close");
   187    starttime = MPI_Wtime();
   188   barrier_after_close(communicators->
work);
   189    barr3time = MPI_Wtime() - starttime;
   190    DPRINTFTS(communicators->all_rank, 
"[W]after close");
   192   if (writetime == 0) writetime = -1;
   194   if (options->verbose) {
   196             "timings[%03d,t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
   197             communicators->all_rank,omp_get_thread_num(), opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
   198             options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
   199         collective_print_gather(cbuffer, communicators->
work);
   203   if (options->numfiles >= 1) {
   204       reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
   208               MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
   209               if (communicators->local_rank == 0) {
   210                 fprintf(stderr, 
"partest result: local totalsize=%10.4f MB  wrote %10.4f MB to %s all_rank=%d\n", options->totalsize / 1024.0 / 1024.0,
   211                         1.0 * sumsize / 1024.0 / 1024.0, newfname, communicators->all_rank);
   217    DPRINTFTS(communicators->all_rank, 
"before red.");
   218   reduce_omp(&bsumwrote,&t_bsumwrote,MPI_SUM,_PARTEST_SION_INT64);
   219   reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
   220   reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
   221   reduce_omp(&writetime,&t_writetime,MPI_MAX,_PARTEST_DOUBLE);
   225       MPI_Reduce(&t_bsumwrote, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
   226       MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
   227       MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
   228       MPI_Reduce(&t_writetime, &gwritetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
   232    DPRINTFTS(communicators->all_rank, 
"after red.");
   235       if (communicators->work_rank == 0) {
   236         fprintf(stderr, 
"------------------------------------------------------------------------------------------\n");
   237         fprintf(stderr, 
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
   238                 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
   239         fprintf(stderr, 
"------------------------------------------------------------------------------------------\n");
   241       if (communicators->work_rank == 0)
   242           fprintf(stderr, 
"*********************************************************************************************\n");
   249   for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
   250     localbuffer[i] = 
' ';
   253    DPRINTFTS(communicators->all_rank, 
"[R]before open");
   254    starttime = MPI_Wtime();
   255   if (options->collectiveopenforread) {
   257     sid = sion_paropen_ompi(filename,
   258                            "br", &options->numfiles, communicators->
workread, &communicators->
local, &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
   260     MPI_Comm_size(communicators->
local, &communicators->local_size);
   261     MPI_Comm_rank(communicators->
local, &communicators->local_rank);
   266     sid = 
sion_open_rank(filename, 
"br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
   268    opentime = MPI_Wtime() - starttime;
   270    starttime = MPI_Wtime();
   271   barrier_after_open(communicators->
workread);
   272    barr1time = MPI_Wtime() - starttime;
   273    DPRINTFTS(communicators->all_rank, 
"[R]after open");
   275   if(options->use_posix) {
   279   if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
   280   else                     ser_step=communicators->local_size;
   283   for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
   285     if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
   289       checksum_read_fp = 0;
   290       left = options->totalsize;
   294       lstartoffset=options->startoffset;
   297       while ((left > 0) && (!myfeof)) {
   299     if(lstartoffset==0) btoread = options->bufsize;
   301       btoread = lstartoffset; lstartoffset=0;
   306     bytes_in_chunk+=btoread;
   307     if(bytes_in_chunk>options->chunksize) {
   311           if(options->use_posix) {
   312             bread = read(fd, localbuffer, 1*btoread);
   314             bread = fread(localbuffer, 1, btoread, fp);
   319           if(!options->suppress_checksum) {
   320             checksum_read_fp=0.0;
   321             for (i = 0; i < bread; i++)
   322               checksum_read_fp += (
double) localbuffer[i];
   331           if (((options->debug && communicators->all_rank == 0)) || ((options->Debug && communicators->all_rank == communicators->all_size))) {
   332             fprintf(stderr, 
"timings[%03d,t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
   333                 communicators->all_rank,omp_get_thread_num(), (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
   334             fprintf(stderr, 
"timings[%03d,t%03d] after read   position in file= %lld restinblock=%lld\n",
   347    readtime = MPI_Wtime() - starttime;
   349    starttime= MPI_Wtime();
   350   barrier_after_read(communicators->
workread);
   351    barr2time = MPI_Wtime() - starttime;
   353    starttime = MPI_Wtime();
   354   if (options->collectiveopenforread) {
   355     sion_parclose_ompi(sid);
   360    closetime = MPI_Wtime() - starttime;
   361    DPRINTFTS(communicators->all_rank, 
"[R]before close");
   362   barrier_after_close(communicators->
workread);
   363    DPRINTFTS(communicators->all_rank, 
"[R]after close");
   367   if (options->verbose) {
   369             "timings[%03d,t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
   370             communicators->all_rank,omp_get_thread_num(), opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
   371             options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
   373     collective_print_gather(cbuffer, communicators->
workread);
   378   if(!options->suppress_checksum) {
   379     if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
   380       fprintf(stderr, 
"timings[%03d,t%03d] ERROR in double checksum  %14.10f==%14.10f, diff=%14.10f\n", communicators->local_rank,omp_get_thread_num(),
   381           checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
   386   if (options->numfiles >= 1) {
   387       reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
   390             MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
   391         if (communicators->local_rank == 0) {
   392           fprintf(stderr, 
"partest result: read  %10.4f MB from %s\n", 1.0 * sumsize / 1024.0 / 1024.0, newfname);
   398    DPRINTFTS(communicators->all_rank, 
"before red.");
   399       reduce_omp(&bsumread,&t_bsumread,MPI_SUM,_PARTEST_SION_INT64);
   400       reduce_omp(&opentime,&t_opentime,MPI_MAX,_PARTEST_DOUBLE);
   401       reduce_omp(&closetime,&t_closetime,MPI_MAX,_PARTEST_DOUBLE);
   402       reduce_omp(&readtime,&t_readtime,MPI_MAX,_PARTEST_DOUBLE);
   405           MPI_Reduce(&t_bsumread, &sumsize, 1, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
   406           MPI_Reduce(&t_opentime, &gopentime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
   407           MPI_Reduce(&t_closetime, &gclosetime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
   408           MPI_Reduce(&t_readtime, &greadtime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
   411    DPRINTFTS(communicators->all_rank, 
"after red.");
   414           if (communicators->workread_rank == 0) {
   415             fprintf(stderr, 
"------------------------------------------------------------------------------------------\n");
   416             fprintf(stderr, 
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
   417                     gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
   418             fprintf(stderr, 
"------------------------------------------------------------------------------------------\n");
   423   if(options->unlink_files) {
   424      starttime = MPI_Wtime();
   425     barrier_before_unlink(communicators->
workread);
   428         if (communicators->local_rank == 0) {
   429           fprintf(stderr, 
"partest result: unlink file %s ...\n", newfname);
   434     barrier_after_unlink(communicators->
workread);
   435      unlinktime = MPI_Wtime() - starttime;
   438         if (communicators->local_rank == 0) {
   439           fprintf(stderr, 
"partest result:  ultime=%10.6fs unlink %s\n", unlinktime, newfname);
 int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
 
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
 
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
 
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
 
int sion_close(int sid)
Close a sion file.
 
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.