SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
omp_partest_sionfile.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <ctype.h>
16 #include <unistd.h>
17 #include <mpi.h>
18 
19 #include <time.h>
20 #include <math.h>
21 
22 
23 #include "sion.h"
24 #include "sion_debug.h"
25 #include "sion_printts.h"
26 #include "ompi_partest.h"
27 
28 #include <omp.h>
29 
30 
31 
/* Element-type tags. In this file _PARTEST_SION_INT64 and _PARTEST_DOUBLE are
 * passed as the last argument of reduce_omp() alongside sion_int64 and double
 * operands respectively; _PARTEST_SION_INT32 is not referenced in the visible
 * part of the file (presumably the tag for sion_int32 operands -- confirm
 * against reduce_omp()). */
32 #define _PARTEST_SION_INT32 10
33 #define _PARTEST_SION_INT64 11
34 #define _PARTEST_DOUBLE 12
35 
36 
37 
38 /****************************************************************************************************************
39  *
40  * test_paropen_omp
41  *
42  ***************************************************************************************************************/
43 
54 int test_paropen_omp(char *filename,
55  char *localbuffer,
56  _test_communicators *communicators,
57  _test_options *options
58  ) {
59 
60  double starttime, gopentime, opentime, unlinktime, gwritetime, writetime, gclosetime, closetime, readtime, greadtime;
61  double barr1time, barr2time, barr3time;
62 
63  sion_int64 left;
64  sion_int64 bsumwrote, sumsize, bsumread;
65  double checksum_fp, checksum_read_fp;
66  int globalrank, sid, i, lstartoffset;
67  size_t bwrite, bwrote, btoread, bread;
68  int chunkcnt;
69  sion_int64 rchunksize;
70  sion_int32 rfsblksize;
71  FILE *fp;
72  int fd;
73  char cbuffer[2*MAXCHARLEN];
74  char *newfname;
75  size_t bytes_in_chunk;
76  int myfeof;
77 
78  int thread_num = omp_get_thread_num();
79 
80  /****************************** WRITE *****************************/
81 
82  /* */ starttime = MPI_Wtime();
83  sid = sion_paropen_omp(filename, "bw", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
84  /* */ opentime = MPI_Wtime() - starttime;
85  /* */ starttime = MPI_Wtime();
86  barrier_after_open(communicators->work);
87  /* */ barr1time = MPI_Wtime() - starttime;
88 
89 
90  checksum_fp = 0;
91 
92  if(options->use_posix) {
93  fd = fileno(fp);
94  }
95 
96  left = options->totalsize;
97  bsumwrote = 0;
98  chunkcnt = 0;
99  bytes_in_chunk=0;
100  lstartoffset=options->startoffset;
101  /* Write until we reach the total size of the data */
102  while (left > 0) {
103  if(lstartoffset==0) bwrite = options->bufsize;
104  else {
105  bwrite = lstartoffset; lstartoffset=0;
106  }
107  if (bwrite > left) bwrite = left;
108 
109  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
110  fprintf(stderr, "timings[t%03d] write %lld bytes\n", thread_num, (sion_int64) bwrite);
111  }
112 
113  bytes_in_chunk+=bwrite;
114  if(bytes_in_chunk>options->chunksize) {
115  sion_ensure_free_space(sid, bwrite);
116  bytes_in_chunk=bwrite;
117  }
118 
119  if(options->use_posix) {
120  bwrote = write(fd, localbuffer, 1*bwrite);
121  } else {
122  bwrote = fwrite(localbuffer, 1, bwrite, fp);
123  }
124 
125  #ifdef CHECKSUM
126  if(!options->suppress_checksum) {
127  checksum_fp=0.0;
128  for (i = 0; i < bwrote; i++)
129  checksum_fp += (double) localbuffer[i];
130  }
131  #endif
132 
133  left -= bwrote;
134  bsumwrote += bwrote;
135  chunkcnt++;
136 
137 
138  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
139  fprintf(stderr, "timings[t%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
140  thread_num,(sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
141  fprintf(stderr, "timings[t%03d] after write position in file= %lld \n", thread_num, sion_get_position(sid));
142  }
143  }
144  fflush(fp);
145  /* */ writetime = MPI_Wtime() - starttime;
146 
147  /* */ starttime = MPI_Wtime();
148  barrier_after_write(communicators->work);
149  /* */ barr2time = MPI_Wtime() - starttime;
150 
151  /* */ starttime = MPI_Wtime();
152  sion_parclose_omp(sid);
153  /* */ closetime = MPI_Wtime() - starttime;
154 
155  /* */ starttime = MPI_Wtime();
156  barrier_after_close(communicators->work);
157  /* */ barr3time = MPI_Wtime() - starttime;
158 
159 
160  if (writetime == 0) writetime = -1;
161 
162  if (options->verbose) {
163  sprintf(cbuffer,
164  "timings[t%03d] open=%10.6fs write=%10.6fs close=%10.6fs barrier(open=%10.6fs, write=%10.6fs, close=%10.6fs) #chunks=%d bw=%10.4f MB/s ionode=%d\n",
165  thread_num, opentime, writetime, closetime, barr1time, barr2time, barr3time, chunkcnt,
166  options->totalsize / 1024.0 / 1024.0 / writetime, communicators->ionode_number);
167  collective_print_gather(cbuffer, communicators->work);
168  }
169 
170  reduce_omp(&bsumwrote,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
171  reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
172  reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
173  reduce_omp(&writetime,&gwritetime,MPI_MAX,_PARTEST_DOUBLE);
174 
175  #pragma omp master
176  {
177  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
178  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs wrote %10.4f MB write=%10.6fs bw=%10.4f MB/s to %d files\n",
179  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, gwritetime, 1.0 * sumsize / 1024.0 / 1024.0 / gwritetime, options->numfiles);
180  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
181  fprintf(stderr, "*********************************************************************************************\n");
182  }
183 #pragma omp barrier
184 
185  /****************************** READ *****************************/
186 
187  /* reset localbuffer */
188  for (i = 0; i < ((options->totalsize < options->bufsize) ? options->totalsize : options->bufsize); i++) {
189  localbuffer[i] = ' ';
190  }
191 
192  /* */ starttime = MPI_Wtime();
193  if (options->collectiveopenforread) {
194 
195  sid = sion_paropen_omp(filename,"br", &options->chunksize, &options->fsblksize, &globalrank, &fp, &newfname);
196 
197  }
198  else {
199  /* there is some work to for multifile sion file */
200  sid = sion_open_rank(filename, "br", &rchunksize, &rfsblksize, &communicators->workread_rank, &fp);
201  }
202  /* */ opentime = MPI_Wtime() - starttime;
203 
204  /* */ starttime = MPI_Wtime();
205  barrier_after_open(communicators->workread);
206  /* */ barr1time = MPI_Wtime() - starttime;
207 
208  if(options->use_posix) {
209  fd = fileno(fp);
210  }
211 
212  checksum_read_fp = 0;
213  left = options->totalsize;
214  bsumread = 0;
215  chunkcnt = 0;
216  bytes_in_chunk = 0;
217  lstartoffset=options->startoffset;
218 
219  myfeof=sion_feof(sid);
220  while ((left > 0) && (!myfeof)) {
221 
222  if(lstartoffset==0) btoread = options->bufsize;
223  else {
224  btoread = lstartoffset; lstartoffset=0;
225  }
226  if (btoread > left)
227  btoread = left;
228 
229  bytes_in_chunk+=btoread;
230  if(bytes_in_chunk>options->chunksize) {
231  myfeof=sion_feof(sid);
232  }
233  if(!myfeof) {
234  if(options->use_posix) {
235  bread = read(fd, localbuffer, 1*btoread);
236  } else {
237  bread = fread(localbuffer, 1, btoread, fp);
238  }
239 
240  #ifdef CHECKSUM
241  if(!options->suppress_checksum) {
242  checksum_read_fp=0.0;
243  for (i = 0; i < bread; i++)
244  checksum_read_fp += (double) localbuffer[i];
245  }
246  #endif
247 
248  left -= bread;
249  bsumread += bread;
250  chunkcnt++;
251  #pragma omp master
252  {
253  if (((options->debug && thread_num == 0)) || ((options->Debug && (thread_num == omp_get_num_threads() - 1)))) {
254  fprintf(stderr, "timings[t%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
255  thread_num, (sion_int64) bread, bsumread, bsumread / 1024.0 / 1024.0, (sion_int64) left);
256  fprintf(stderr, "timings[t%03d] after read position in file= %lld restinblock=%lld\n",
257  thread_num, sion_get_position(sid), sion_bytes_avail_in_block(sid));
258  }
259  }
260 #pragma omp barrier
261  }
262  }
263  fflush(fp);
264 
265  /* */ readtime = MPI_Wtime() - starttime;
266 
267  /* */ starttime= MPI_Wtime();
268  barrier_after_read(communicators->workread);
269  /* */ barr2time = MPI_Wtime() - starttime;
270 
271  /* */ starttime = MPI_Wtime();
272  if (options->collectiveopenforread) {
273  sion_parclose_omp(sid);
274  }
275  else {
276  sion_close(sid);
277  }
278  /* */ closetime = MPI_Wtime() - starttime;
279 
280  barrier_after_close(communicators->workread);
281 
282  if (readtime == 0)
283  readtime = -1;
284  if (options->verbose) {
285  sprintf(cbuffer,
286  "timings[t%03d] open=%10.6fs read=%10.6fs close=%10.6fs barrier(open=%10.6fs, read=%10.6fs, close=%10.6fs) #chunks=%d br=%10.4f MB/s ionode=%d (check %d)\n",
287  thread_num, opentime, readtime, closetime, barr1time, barr2time, barr3time, chunkcnt,
288  options->totalsize / 1024.0 / 1024.0 / readtime, communicators->ionode_number, (fabs(checksum_fp - checksum_read_fp) < 1e-5));
289  collective_print_gather(cbuffer, communicators->workread);
290 
291  }
292 
293 #ifdef CHECKSUM
294  if(!options->suppress_checksum) {
295  if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
296  fprintf(stderr, "timings[t%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", thread_num,
297  checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
298  }
299  }
300 #endif
301 
302  reduce_omp(&bsumread,&sumsize,MPI_SUM,_PARTEST_SION_INT64);
303  reduce_omp(&opentime,&gopentime,MPI_MAX,_PARTEST_DOUBLE);
304  reduce_omp(&closetime,&gclosetime,MPI_MAX,_PARTEST_DOUBLE);
305  reduce_omp(&readtime,&greadtime,MPI_MAX,_PARTEST_DOUBLE);
306 
307  #pragma omp master
308  {
309  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
310  fprintf(stderr, "TOTAL result: open=%10.6fs close=%10.6fs read %10.4f MB read=%10.6fs br=%10.4f MB/s from %d files\n",
311  gopentime, gclosetime, 1.0 * sumsize / 1024.0 / 1024.0, greadtime, 1.0 * sumsize / 1024.0 / 1024.0 / greadtime, options->numfiles);
312  fprintf(stderr, "------------------------------------------------------------------------------------------\n");
313  }
314 #pragma omp barrier
315 
316  if(options->unlink_files) {
317  /* */ starttime = MPI_Wtime();
318  barrier_before_unlink(communicators->workread);
319  #pragma omp master
320  {
321  fprintf(stderr, "partest result: unlink file %s ...\n", newfname);
322  unlink(newfname);
323  }
324 #pragma omp barrier
325 
326  barrier_after_unlink(communicators->workread);
327  /* */ unlinktime = MPI_Wtime() - starttime;
328  #pragma omp master
329  {
330  fprintf(stderr, "partest result: ultime=%10.6fs unlink %s\n", unlinktime, newfname);
331  }
332 #pragma omp barrier
333  }
334 
335  return (1);
336 }
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:809
sion_int64 sion_bytes_avail_in_block(int sid)
Return the number of bytes available in the current chunk.
Definition: sion_common.c:879
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1053
sion_int64 sion_get_position(int sid)
Function that returns the current file position.
Definition: sion_common.c:930
int sion_close(int sid)
Close a sion file.
Definition: sion_serial.c:106
int sion_open_rank(char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, int *rank, FILE **fileptr)
Open a sion file for a specific rank.
Definition: sion_serial.c:83
Sion Time Stamp Header.