SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
omp_partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <mpi.h>
16 
17 #include <time.h>
18 #include <math.h>
19 
20 
21 #include "sion.h"
22 #include "sion_debug.h"
23 #include "sion_printts.h"
24 #include "ompi_partest.h"
25 
26 #include <omp.h>
27 
28 
29 
30 #define _PARTEST_SION_INT32 10
31 #define _PARTEST_SION_INT64 11
32 #define _PARTEST_DOUBLE 12
33 
34 
35 
36 /****************************************************************************************************************
37  *
38  * test_paropen_multi_omp
39  *
40  ***************************************************************************************************************/
41 
52 int test_paropen_omp(char *filename,
53  char *localbuffer,
54  _test_communicators *communicators,
55  _test_options *options
56  ) {
57 
58  double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
59  double barr1time, barr2time, barr3time;
60 
61  sion_int64 left;
62  sion_int64 bsumwrote, sumsize, bsumread;
63  double checksum_fp, checksum_read_fp;
64  int globalrank, sid, i, lstartoffset;
65  size_t bwrite, bwrote, btoread, bread;
66  int chunkcnt;
67  sion_int64 rchunksize;
68  sion_int32 rfsblksize;
69  FILE *fp;
70  int fd;
71  char cbuffer[2*MAXCHARLEN];
72  char *newfname;
73  size_t bytes_in_chunk;
74  int myfeof;
75 
76  int thread_num = omp_get_thread_num();
77 
78  /****************************** WRITE *****************************/
79 
80  /* */ starttime = MPI_Wtime();
81  sid = sion_paropen_omp(filename, "bw", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
82  /* */ opentime = MPI_Wtime() - starttime;
83  /* */ starttime = MPI_Wtime();
84  barrier_after_open(communicators->work);
85  /* */ barr1time = MPI_Wtime() - starttime;
86 
87 
88  checksum_fp = 0;
89 
90  if(options->use_posix) {
91  fd = fileno(fp);
92  }
93 
94  left = options->totalsize;
95  bsumwrote = 0;
96  chunkcnt = 0;
97  bytes_in_chunk=0;
98  lstartoffset=options->startoffset;
99  /* Write until we reach the total size of the data */
100  while (left > 0) {
101  if(lstartoffset==0) bwrite = options->bufsize;
102  else {
103  bwrite = lstartoffset; lstartoffset=0;
104  }
105  if (bwrite > left) bwrite = left;
106 
107  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
108  fprintf(stderr, "timings[t%03d] write %lld bytes\n", thread_num, (sion_int64) bwrite);
109  }
110 
111  bytes_in_chunk+=bwrite;
112  if(bytes_in_chunk>options->chunksize) {
113  sion_ensure_free_space(sid, bwrite);
114  bytes_in_chunk=bwrite;
115  }
116 
117  if(options->use_posix) {
118  bwrote = write(fd, localbuffer, 1*bwrite);
119  } else {
120  bwrote = fwrite(localbuffer, 1, bwrite, fp);
121  }
122 
123  #ifdef CHECKSUM
124  if(!options->suppress_checksum) {
125  checksum_fp=0.0;
126  for (i = 0; i < bwrote; i++)
127  checksum_fp += (double) localbuffer[i];
128  }
129  #endif
130 
131  left -= bwrote;
132  bsumwrote += bwrote;
133  chunkcnt++;
134 
135 
136  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
137  fprintf(stderr, "timings[t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
138  thread_num,(sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
139  fprintf(stderr, "timings[t%03d] after write position in file= %lld \n", thread_num, sion_get_position(sid));
140  }
141  }
142  fflush(fp);
143  /* */ writetime = MPI_Wtime() - starttime;
144 
145  /* */ starttime = MPI_Wtime();
146  barrier_after_write(communicators->work);
147  /* */ barr2time = MPI_Wtime() - starttime;
148 
149  /* */ starttime = MPI_Wtime();
150  sion_parclose_omp(sid);
151  /* */ closetime = MPI_Wtime() - starttime;
152 
153  /* */ starttime = MPI_Wtime();
154  barrier_after_close(communicators->work);
155  /* */ barr3time = MPI_Wtime() - starttime;
156 
157 
158  if (writetime == 0) writetime = -1;
159 
160  if (options->verbose) {
161  sprintf(cbuffer,
162  "timings[t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
163  thread_num, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
164  options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
165  collective_print_gather(cbuffer, communicators->work);
166  }
167 
168  reduce_omp(&bsumwrote,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
169  reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
170  reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
171  reduce_omp(&writetime,&gwritetime,MPI_MAX,_PARTEST_DOUBLE);
172 
173  #pragma omp master
174  {
175  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
176  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
177  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
178  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
179  fprintf(stderr, "*********************************************************************************************\n");
180  }
181 #pragma omp barrier
182 
183  /****************************** READ *****************************/
184 
185  /* reset localbuffer */
186  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
187  localbuffer[i] = ' ';
188  }
189 
190  /* */ starttime = MPI_Wtime();
191  if (options->collectiveopenforread) {
192 
193  sid = sion_paropen_omp(filename,"br", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
194 
195  }
196  else {
197  /* there is some work to for multifile sion file */
198  sid = sion_open_rank(filename, "br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
199  }
200  /* */ opentime = MPI_Wtime() - starttime;
201 
202  /* */ starttime = MPI_Wtime();
203  barrier_after_open(communicators->workread);
204  /* */ barr1time = MPI_Wtime() - starttime;
205 
206  if(options->use_posix) {
207  fd = fileno(fp);
208  }
209 
210  checksum_read_fp = 0;
211  left = options->totalsize;
212  bsumread = 0;
213  chunkcnt = 0;
214  bytes_in_chunk = 0;
215  lstartoffset=options->startoffset;
216 
217  myfeof=sion_feof(sid);
218  while ((left > 0) && (!myfeof)) {
219 
220  if(lstartoffset==0) btoread = options->bufsize;
221  else {
222  btoread = lstartoffset; lstartoffset=0;
223  }
224  if (btoread > left)
225  btoread = left;
226 
227  bytes_in_chunk+=btoread;
228  if(bytes_in_chunk>options->chunksize) {
229  myfeof=sion_feof(sid);
230  }
231  if(!myfeof) {
232  if(options->use_posix) {
233  bread = read(fd, localbuffer, 1*btoread);
234  } else {
235  bread = fread(localbuffer, 1, btoread, fp);
236  }
237 
238  #ifdef CHECKSUM
239  if(!options->suppress_checksum) {
240  checksum_read_fp=0.0;
241  for (i = 0; i < bread; i++)
242  checksum_read_fp += (double) localbuffer[i];
243  }
244  #endif
245 
246  left -= bread;
247  bsumread += bread;
248  chunkcnt++;
249  #pragma omp master
250  {
251  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
252  fprintf(stderr, "timings[t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
253  thread_num, (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
254  fprintf(stderr, "timings[t%03d] after read position in file= %lld restinblock=%lld\n",
255  thread_num, sion_get_position(sid), sion_bytes_avail_in_block(sid));
256  }
257  }
258 #pragma omp barrier
259  }
260  }
261  fflush(fp);
262 
263  /* */ readtime = MPI_Wtime() - starttime;
264 
265  /* */ starttime= MPI_Wtime();
266  barrier_after_read(communicators->workread);
267  /* */ barr2time = MPI_Wtime() - starttime;
268 
269  /* */ starttime = MPI_Wtime();
270  if (options->collectiveopenforread) {
271  sion_parclose_omp(sid);
272  }
273  else {
274  sion_close(sid);
275  }
276  /* */ closetime = MPI_Wtime() - starttime;
277 
278  barrier_after_close(communicators->workread);
279 
280  if (readtime == 0)
281  readtime = -1;
282  if (options->verbose) {
283  sprintf(cbuffer,
284  "timings[t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
285  thread_num, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
286  options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
287  collective_print_gather(cbuffer, communicators->workread);
288 
289  }
290 
291 #ifdef CHECKSUM
292  if(!options->suppress_checksum) {
293  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
294  fprintf(stderr, "timings[t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", thread_num,
295  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
296  }
297  }
298 #endif
299 
300  reduce_omp(&bsumread,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
301  reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
302  reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
303  reduce_omp(&readtime,&greadtime,MPI_MAX,_PARTEST_DOUBLE);
304 
305  #pragma omp master
306  {
307  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
308  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
309  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
310  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
311  }
312 #pragma omp barrier
313 
314  if(options->unlink_files) {
315  /* */ starttime = MPI_Wtime();
316  barrier_before_unlink(communicators->workread);
317  #pragma omp master
318  {
319  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
320  unlink(newfname);
321  }
322 #pragma omp barrier
323 
324  barrier_after_unlink(communicators->workread);
325  /* */ unlinktime = MPI_Wtime() - starttime;
326  #pragma omp master
327  {
328  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
329  }
330 #pragma omp barrier
331  }
332 
333  return (1);
334 }
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:770
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
Definition: sion_common.c:840
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
Definition: sion_common.c:1014
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
Definition: sion_common.c:891
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:113
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:90
Sion Time Stamp Header.