24 #include "ompi_partest.h" 30 #define _PARTEST_SION_INT32 10 31 #define _PARTEST_SION_INT64 11 32 #define _PARTEST_DOUBLE 12 52 int test_paropen_omp(
char *filename,
58 double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
59 double barr1time, barr2time, barr3time;
62 sion_int64 bsumwrote, sumsize, bsumread;
63 double checksum_fp, checksum_read_fp;
64 int globalrank, sid, i, lstartoffset;
65 size_t bwrite, bwrote, btoread, bread;
67 sion_int64 rchunksize;
68 sion_int32 rfsblksize;
71 char cbuffer[2*MAXCHARLEN];
73 size_t bytes_in_chunk;
76 int thread_num = omp_get_thread_num();
80 starttime = MPI_Wtime();
81 sid = sion_paropen_omp(filename,
"bw", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
82 opentime = MPI_Wtime() - starttime;
83 starttime = MPI_Wtime();
84 barrier_after_open(communicators->
work);
85 barr1time = MPI_Wtime() - starttime;
90 if(options->use_posix) {
94 left = options->totalsize;
98 lstartoffset=options->startoffset;
101 if(lstartoffset==0) bwrite = options->bufsize;
103 bwrite = lstartoffset; lstartoffset=0;
105 if (bwrite > left) bwrite = left;
107 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
108 fprintf(stderr,
"timings[t%03d] write %lld bytes\n", thread_num, (sion_int64) bwrite);
111 bytes_in_chunk+=bwrite;
112 if(bytes_in_chunk>options->chunksize) {
114 bytes_in_chunk=bwrite;
117 if(options->use_posix) {
118 bwrote = write(fd, localbuffer, 1*bwrite);
120 bwrote = fwrite(localbuffer, 1, bwrite, fp);
124 if(!options->suppress_checksum) {
126 for (i = 0; i < bwrote; i++)
127 checksum_fp += (
double) localbuffer[i];
136 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
137 fprintf(stderr,
"timings[t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
138 thread_num,(sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
139 fprintf(stderr,
"timings[t%03d] after write position in file= %lld \n", thread_num,
sion_get_position(sid));
143 writetime = MPI_Wtime() - starttime;
145 starttime = MPI_Wtime();
146 barrier_after_write(communicators->
work);
147 barr2time = MPI_Wtime() - starttime;
149 starttime = MPI_Wtime();
150 sion_parclose_omp(sid);
151 closetime = MPI_Wtime() - starttime;
153 starttime = MPI_Wtime();
154 barrier_after_close(communicators->
work);
155 barr3time = MPI_Wtime() - starttime;
158 if (writetime == 0) writetime = -1;
160 if (options->verbose) {
162 "timings[t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
163 thread_num, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
164 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
165 collective_print_gather(cbuffer, communicators->
work);
168 reduce_omp(&bsumwrote,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
169 reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
170 reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
171 reduce_omp(&writetime,&gwritetime,MPI_MAX,_PARTEST_DOUBLE);
175 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
176 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
177 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
178 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
179 fprintf(stderr,
"*********************************************************************************************\n");
186 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
187 localbuffer[i] =
' ';
190 starttime = MPI_Wtime();
191 if (options->collectiveopenforread) {
193 sid = sion_paropen_omp(filename,
"br", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
198 sid =
sion_open_rank(filename,
"br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
200 opentime = MPI_Wtime() - starttime;
202 starttime = MPI_Wtime();
203 barrier_after_open(communicators->
workread);
204 barr1time = MPI_Wtime() - starttime;
206 if(options->use_posix) {
210 checksum_read_fp = 0;
211 left = options->totalsize;
215 lstartoffset=options->startoffset;
218 while ((left > 0) && (!myfeof)) {
220 if(lstartoffset==0) btoread = options->bufsize;
222 btoread = lstartoffset; lstartoffset=0;
227 bytes_in_chunk+=btoread;
228 if(bytes_in_chunk>options->chunksize) {
232 if(options->use_posix) {
233 bread = read(fd, localbuffer, 1*btoread);
235 bread = fread(localbuffer, 1, btoread, fp);
239 if(!options->suppress_checksum) {
240 checksum_read_fp=0.0;
241 for (i = 0; i < bread; i++)
242 checksum_read_fp += (
double) localbuffer[i];
251 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
252 fprintf(stderr,
"timings[t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
253 thread_num, (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
254 fprintf(stderr,
"timings[t%03d] after read position in file= %lld restinblock=%lld\n",
263 readtime = MPI_Wtime() - starttime;
265 starttime= MPI_Wtime();
266 barrier_after_read(communicators->
workread);
267 barr2time = MPI_Wtime() - starttime;
269 starttime = MPI_Wtime();
270 if (options->collectiveopenforread) {
271 sion_parclose_omp(sid);
276 closetime = MPI_Wtime() - starttime;
278 barrier_after_close(communicators->
workread);
282 if (options->verbose) {
284 "timings[t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
285 thread_num, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
286 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
287 collective_print_gather(cbuffer, communicators->
workread);
292 if(!options->suppress_checksum) {
293 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
294 fprintf(stderr,
"timings[t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", thread_num,
295 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
300 reduce_omp(&bsumread,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
301 reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
302 reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
303 reduce_omp(&readtime,&greadtime,MPI_MAX,_PARTEST_DOUBLE);
307 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
308 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
309 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
310 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
314 if(options->unlink_files) {
315 starttime = MPI_Wtime();
316 barrier_before_unlink(communicators->
workread);
319 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
324 barrier_after_unlink(communicators->
workread);
325 unlinktime = MPI_Wtime() - starttime;
328 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
int sion_close(int sid)
Close a sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.