10 #define _XOPEN_SOURCE 700 26 #include "ompi_partest.h" 32 #define _PARTEST_SION_INT32 10 33 #define _PARTEST_SION_INT64 11 34 #define _PARTEST_DOUBLE 12 54 int test_paropen_omp(
char *filename,
60 double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
61 double barr1time, barr2time, barr3time;
64 sion_int64 bsumwrote, sumsize, bsumread;
65 double checksum_fp, checksum_read_fp;
66 int globalrank, sid, i, lstartoffset;
67 size_t bwrite, bwrote, btoread, bread;
69 sion_int64 rchunksize;
70 sion_int32 rfsblksize;
73 char cbuffer[2*MAXCHARLEN];
75 size_t bytes_in_chunk;
78 int thread_num = omp_get_thread_num();
82 starttime = MPI_Wtime();
83 sid = sion_paropen_omp(filename,
"bw", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
84 opentime = MPI_Wtime() - starttime;
85 starttime = MPI_Wtime();
86 barrier_after_open(communicators->
work);
87 barr1time = MPI_Wtime() - starttime;
92 if(options->use_posix) {
96 left = options->totalsize;
100 lstartoffset=options->startoffset;
103 if(lstartoffset==0) bwrite = options->bufsize;
105 bwrite = lstartoffset; lstartoffset=0;
107 if (bwrite > left) bwrite = left;
109 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
110 fprintf(stderr,
"timings[t%03d] write %lld bytes\n", thread_num, (sion_int64) bwrite);
113 bytes_in_chunk+=bwrite;
114 if(bytes_in_chunk>options->chunksize) {
116 bytes_in_chunk=bwrite;
119 if(options->use_posix) {
120 bwrote = write(fd, localbuffer, 1*bwrite);
122 bwrote = fwrite(localbuffer, 1, bwrite, fp);
126 if(!options->suppress_checksum) {
128 for (i = 0; i < bwrote; i++)
129 checksum_fp += (
double) localbuffer[i];
138 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
139 fprintf(stderr,
"timings[t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
140 thread_num,(sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
141 fprintf(stderr,
"timings[t%03d] after write position in file= %lld \n", thread_num,
sion_get_position(sid));
145 writetime = MPI_Wtime() - starttime;
147 starttime = MPI_Wtime();
148 barrier_after_write(communicators->
work);
149 barr2time = MPI_Wtime() - starttime;
151 starttime = MPI_Wtime();
152 sion_parclose_omp(sid);
153 closetime = MPI_Wtime() - starttime;
155 starttime = MPI_Wtime();
156 barrier_after_close(communicators->
work);
157 barr3time = MPI_Wtime() - starttime;
160 if (writetime == 0) writetime = -1;
162 if (options->verbose) {
164 "timings[t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
165 thread_num, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
166 options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
167 collective_print_gather(cbuffer, communicators->
work);
170 reduce_omp(&bsumwrote,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
171 reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
172 reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
173 reduce_omp(&writetime,&gwritetime,MPI_MAX,_PARTEST_DOUBLE);
177 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
178 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
179 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
180 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
181 fprintf(stderr,
"*********************************************************************************************\n");
188 for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
189 localbuffer[i] =
' ';
192 starttime = MPI_Wtime();
193 if (options->collectiveopenforread) {
195 sid = sion_paropen_omp(filename,
"br", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
200 sid =
sion_open_rank(filename,
"br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
202 opentime = MPI_Wtime() - starttime;
204 starttime = MPI_Wtime();
205 barrier_after_open(communicators->
workread);
206 barr1time = MPI_Wtime() - starttime;
208 if(options->use_posix) {
212 checksum_read_fp = 0;
213 left = options->totalsize;
217 lstartoffset=options->startoffset;
220 while ((left > 0) && (!myfeof)) {
222 if(lstartoffset==0) btoread = options->bufsize;
224 btoread = lstartoffset; lstartoffset=0;
229 bytes_in_chunk+=btoread;
230 if(bytes_in_chunk>options->chunksize) {
234 if(options->use_posix) {
235 bread = read(fd, localbuffer, 1*btoread);
237 bread = fread(localbuffer, 1, btoread, fp);
241 if(!options->suppress_checksum) {
242 checksum_read_fp=0.0;
243 for (i = 0; i < bread; i++)
244 checksum_read_fp += (
double) localbuffer[i];
253 if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
254 fprintf(stderr,
"timings[t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
255 thread_num, (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
256 fprintf(stderr,
"timings[t%03d] after read position in file= %lld restinblock=%lld\n",
265 readtime = MPI_Wtime() - starttime;
267 starttime= MPI_Wtime();
268 barrier_after_read(communicators->
workread);
269 barr2time = MPI_Wtime() - starttime;
271 starttime = MPI_Wtime();
272 if (options->collectiveopenforread) {
273 sion_parclose_omp(sid);
278 closetime = MPI_Wtime() - starttime;
280 barrier_after_close(communicators->
workread);
284 if (options->verbose) {
286 "timings[t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
287 thread_num, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
288 options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
289 collective_print_gather(cbuffer, communicators->
workread);
294 if(!options->suppress_checksum) {
295 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
296 fprintf(stderr,
"timings[t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", thread_num,
297 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
302 reduce_omp(&bsumread,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
303 reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
304 reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
305 reduce_omp(&readtime,&greadtime,MPI_MAX,_PARTEST_DOUBLE);
309 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
310 fprintf(stderr,
"TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
311 gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
312 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
316 if(options->unlink_files) {
317 starttime = MPI_Wtime();
318 barrier_before_unlink(communicators->
workread);
321 fprintf(stderr,
"partest result: unlink file %s ...\n", newfname);
326 barrier_after_unlink(communicators->
workread);
327 unlinktime = MPI_Wtime() - starttime;
330 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
int sion_close(int sid)
Close a sion file.
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.