SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_generic_internal.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** Copyright (c) 2019 **
8 ** DataDirect Networks **
9 ** **
10 ** See the file COPYRIGHT in the package base directory for details **
11 ****************************************************************************/
12 
22 #define _XOPEN_SOURCE 700
23 
24 
25 #if defined(_SION_MSA_HOSTNAME_REGEX)
26 #include <regex.h>
27 #endif
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <unistd.h>
32 
33 #include "sion.h"
34 #include "sion_debug.h"
35 #include "sion_error_handler.h"
36 #include "sion_file.h"
37 #include "sion_filedesc.h"
38 #include "sion_fd.h"
39 #include "sion_metadata.h"
40 #include "sion_internal.h"
41 #include "sion_printts.h"
42 #include "sion_keyvalue.h"
43 #include "sion_flags.h"
44 
45 #include "sion_cache.h"
46 #include "sion_buffer.h"
47 #include "sion_flags.h"
48 #include "sion_hints.h"
49 #include "sion_generic_internal.h"
51 #include "sion_generic_buddy.h"
52 #include "sion_internal_startptr.h"
53 
77  int sid,
78  char *fname,
79  _sion_flags_store *flags_store,
80  char *prefix,
81  int *numFiles,
82  int *filenumber,
83  sion_int64 *chunksize,
84  sion_int32 *fsblksize,
85  int rank,
86  int ntasks,
87  int *globalrank,
88  int flag,
89  FILE **fileptr,
90  _sion_generic_gendata *sion_gendata,
91  _sion_generic_buddy *buddy_data )
92 {
93 
94  int i, j;
95  int rc;
96 
97  _sion_filedesc *sion_filedesc;
98  _sion_fileptr *sion_fileptr;
99 
100  int nfiles, filenum;
101  sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
102  sion_int64 *sion_tmpintfield = NULL;
103  sion_int32 *sion_tmpintfield_map = NULL, helpint32;
104  sion_int32 *sion_tmpintfield_buddy32 = NULL;
105  sion_int64 *sion_tmpintfield_buddy64 = NULL;
106  void *comm_group=NULL;
107  int do_open_file;
108 
109  _sion_flags_entry* flags_entry = NULL;
110 
111  if (flags_store->mask&_SION_FMODE_POSIX) apiflag=SION_FILE_FLAG_POSIX;
112 #if defined(_SION_SIONFWD)
113  else if (flags_store->mask&_SION_FMODE_SIONFWD) apiflag=SION_FILE_FLAG_SIONFWD;
114 #endif
115 #ifdef _SION_IME_NATIVE
116  else if (file_mode_flags&_SION_FMODE_IME_NATIVE) apiflag=SION_FILE_FLAG_IME_NATIVE;
117 #endif
118  else apiflag=SION_FILE_FLAG_ANSI;
119 
120  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "enter parallel open of file %s in mode %d #tasks=%d\n", fname, (int) flags_store->mask, ntasks));
121  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n", sizeof(int), sizeof(long),
122  sizeof(long long), sizeof(sion_int32), sizeof(sion_int64)));
123 
124  /* some shortcuts */
125  nfiles = *numFiles;
126  filenum = *filenumber;
127 
128  /* select local communicator */
129  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
130  if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
131  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
132  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
133  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
134 
135  /* decide if file has to be opened physically */
136  do_open_file=1;
137  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
138  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
139  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
140  /* WF: todo decide if data is local, these tasks can read data directly? */
141  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "do_open_file=%d\n", do_open_file));
142 
143  sion_filedesc = _sion_alloc_filedesc();
144  if (sion_filedesc == NULL) {
145  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
146  (unsigned long) sizeof(sion_filedesc));
147  }
148  _sion_init_filedesc(sion_filedesc);
149  sion_filedesc->fname = strdup(fname); /* Set the filename */
150 
151  _sion_reassignvcd(sid,sion_filedesc, SION_FILEDESCRIPTOR);
152  sion_filedesc->sid=sid;
153 
154  /* Allocate memory for storing MAXCHUNKS chunksize infos in internal structure */
156  sion_filedesc->lastchunknr = 0; /* Set the current number of chunks */
157  sion_filedesc->currentblocknr = 0; /* Set the current block number */
158 
159  if (flags_store->mask&_SION_FMODE_WRITE) {
160  /* **************** WRITE mode **************** */
161 
162  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for write #tasks=%d\n", ntasks));
163 
164  /* check parameter */
165  if (ntasks<0) {
166  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (int) ntasks));
167  }
168 
169  /* check parameter */
170  if ((chunksize != NULL) && (*chunksize<0)) {
171  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
172  }
173 
174  /* check parameter */
175  if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
176  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
177  }
178 
179  sion_filedesc->state = SION_FILESTATE_PAROPEN;
180  sion_filedesc->mode = SION_FILEMODE_WRITE;
181  sion_filedesc->endianness = _sion_get_endianness_with_flags(flags_store->mask); /* Endianness */
182  sion_filedesc->swapbytes = 0; /* Endianness, swapping bytes */
183  sion_filedesc->fsblksize = *fsblksize;
184  sion_filedesc->rank = rank;
185  sion_filedesc->globalrank = *globalrank;
186  sion_filedesc->ntasks = ntasks;
187  sion_filedesc->nfiles = nfiles;
188  sion_filedesc->filenumber = filenum;
189  sion_filedesc->prefix = strdup(prefix);
190  sion_filedesc->compress = flags_store->mask&_SION_FMODE_COMPRESS;
191  sion_filedesc->usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
192  sion_filedesc->collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
193  sion_filedesc->collmsa = !!_sion_flags_get(flags_store, "collmsa");
194  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
195  if(sion_filedesc->usebuddy) {
196  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
197  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
198  }
199 
200  /* open file on rank 0, first time to create file and get fsblksize if necessary */
201  if (rank == 0) {
202  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
203  if (!sion_fileptr) {
204  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
205  }
206  if(*fsblksize<=-1) {
207  /* check with fstat fsblksize */
208  new_fsblocksize=(sion_int64) _sion_file_get_opt_blksize(sion_fileptr);
209  if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
210  }
211  _sion_file_close(sion_fileptr);
212  }
213  sion_gendata->apidesc->barrier_cb(comm_group);
214  /* printf("WF: rank=%2d after first barrier fsblksize=%d\n",rank,(int) *fsblksize); */
215 
216  /* distribute new fsblksize */
217  if(*fsblksize==-1) {
218  sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
219  *fsblksize=new_fsblocksize;
220  sion_filedesc->fsblksize = *fsblksize;
221  /* printf("WF: rank=%2d after bcast fsblksize=%d new_fsblocksize=%d\n",rank,(int) *fsblksize,(int) new_fsblocksize); */
222  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "setting fsblksize to %lld\n", new_fsblocksize));
223  }
224 
225  /* check for buffer, needed at this point to set flag before writing header */
226  _sion_cache_check_env(sion_filedesc);
227  _sion_buffer_check_env(sion_filedesc);
228 
229  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
230  if (rank == 0) {
231  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
232  }
233  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
234 
235  /* check for collective options */
236  if (rank == 0) {
237  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
238  sion_filedesc->collsize = atoi(flags_entry->val);
239  }
240  _sion_coll_check_env(sion_filedesc);
241  }
242 
243  if (
244  ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
245  || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
246  || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
247  )
248  {
249  /* overwrite collective info if not set*/
250  if(!sion_filedesc->usecoll) {
251  sion_filedesc->usecoll=1;
252  sion_filedesc->collsize=sion_filedesc->ntasks;
253  };
254  }
255 
256  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
257  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
258  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collmergemode, comm_group, _SION_INT32, 1, 0);
259 
260  /* check if API support coalescing I/O */
261  if(sion_filedesc->usecoll) {
262  if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
263  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
264  sion_filedesc->usecoll=0;
265  }
266  }
267 
268  /* check for hints options */
269  if (rank == 0) {
270  _sion_hints_check_env(sion_filedesc);
271  }
272  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usehints, comm_group, _SION_INT32, 1, 0);
273  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->hinttype, comm_group, _SION_INT32, 1, 0);
274 
275  /* */ DPRINTFTS(rank, "before alloc");
276  if (rank == 0) {
277  /* memory allocation for internal fields */
278  _sion_alloc_filedesc_arrays(sion_filedesc);
279  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
280  }
281  /* */ DPRINTFTS(rank, "after alloc");
282 
283  /* collect data and init startpointers on PE 0 */
284  lchunksize = (sion_int64) *chunksize;
285  lglobalrank = (sion_int64) *globalrank;
286  sion_filedesc->chunksize_req=lchunksize;
287  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
288 
289  /* */ DPRINTFTS2(rank, "before gather");
290  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
291  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
292 
293  /* check capability of tasks */
294  if(sion_filedesc->usecoll) {
295  sion_filedesc->coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
296  sion_gendata->apidesc->gatherr_cb(&sion_filedesc->coll_capability, sion_filedesc->all_coll_capability, comm_group, _SION_INT32, 1, 0);
297  }
298 
299  /* */ DPRINTFTS2(rank, "after gather");
300  if (rank == 0) {
301  /* */ DPRINTFTS(rank, "before calculate");
302  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "chunksizes[%d - 1]=%ld\n", ntasks,(long) sion_filedesc->all_chunksizes[ntasks - 1]));
303  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
304  else {
305  if (sion_filedesc->collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
306  else if (sion_filedesc->collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
307  else _sion_calculate_startpointers_collective(sion_filedesc);
308  }
309  /* */ DPRINTFTS(rank, "after calculate");
310  }
311 
312  /* */ DPRINTFTS(rank, "before open");
313  /* open not file on non-collector task if buddy-checkpointing */
314  if(do_open_file) {
315 
316  /* open file on all ranks */
317  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE,0);
318  if (!sion_fileptr) {
319  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
320  }
321  /* store data in static data structure (sid) */
322  sion_filedesc->fileptr = sion_fileptr;
323  }
324  sion_gendata->apidesc->barrier_cb(comm_group);
325  /* */ DPRINTFTS(rank, "after open");
326 
327 
328  /* write header */
329  if (rank == 0) {
330 
331  /* apply hint for first meta data block */
332  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
333 
334  /* */ DPRINTFTS(rank, "before writeh");
335  _sion_write_header(sion_filedesc);
336  /* */ DPRINTFTS(rank, "after writeh");
337 
338  /* needed for writing pointer to var part of metadata at the end of the file */
339  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
340  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
341  /*set max. file size */
342  lstartpointer = sion_filedesc->all_startpointers[ntasks - 1]
343  + sion_filedesc->all_chunksizes[ntasks - 1];
344  /* */ DPRINTFTS(rank, "before setp(0)");
345  _sion_file_flush(sion_filedesc->fileptr);
346  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
347  /* */ DPRINTFTS(rank, "after setp(0)");
348 
349  }
350 
351  /* distribute start_pos */
352  /* */ DPRINTFTS(rank, "before scatter");
353  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
354  /* */ DPRINTFTS(rank, "after scatter");
355 
356  /* distribute chunksize */
357  /* */ DPRINTFTS(rank, "before scatter");
358  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
359  /* */ DPRINTFTS(rank, "after scatter");
360 
361  /* distribute information for collective operations */
362  if(sion_filedesc->usecoll) {
363  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
364  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
365 
366  _sion_free_filedesc_coll_arrays(sion_filedesc);
367  }
368 
369  /* distribute globalskip */
370  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
371 
372  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
373  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0,
374  sion_filedesc->chunksize, sion_filedesc->chunksize / 1024.0 / 1024.0
375  ));
376 
377  /* set filepointer on each task */
378  /* */ DPRINTFTS(rank, "before setp");
379  sion_gendata->apidesc->barrier_cb(comm_group);
380  if(do_open_file) {
381  _sion_file_flush(sion_filedesc->fileptr);
382  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
383  }
384  sion_filedesc->currentpos = sion_filedesc->startpos;
385  /* given by calculate startpointers ...
386  sion_filedesc->chunksize = (sion_int64) *chunksize;
387  */
388  sion_gendata->apidesc->barrier_cb(comm_group);
389 
390  /* apply hint for first chunk */
391  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
392 
393  /* */ DPRINTFTS(rank, "after setp");
394  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " ending open for write #tasks=%d filepos=%lld\n", ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
395 
396  }
397  else if (flags_store->mask&_SION_FMODE_READ) {
398  /* **************** READ mode **************** */
399  if (rank == 0)
400  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for read #tasks=%d\n", ntasks));
401 
402  /* open not file on non-collector task if buddy-checkpointing */
403  if(do_open_file) {
404  /* */ DPRINTFTS(rank, "before openR");
405  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_READ,0);
406  /* */ DPRINTFTS(rank, "after openR");
407  if (!sion_fileptr) {
408  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " cannot open %s for reading, aborting ...\n", fname));
409  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
410  }
411  } else {
412  sion_fileptr=NULL;
413  }
414  sion_gendata->apidesc->barrier_cb(comm_group);
415 
416  /* store data in static data structure (sid) */
417  sion_filedesc->fileptr = sion_fileptr;
418  sion_filedesc->rank = rank;
419  sion_filedesc->ntasks = ntasks;
420  sion_filedesc->state = SION_FILESTATE_PAROPEN;
421  sion_filedesc->mode = SION_FILEMODE_READ;
422  sion_filedesc->nfiles = nfiles;
423  sion_filedesc->collmsa = !!_sion_flags_get(flags_store, "collmsa");
424  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
425  if(sion_filedesc->usebuddy) {
426  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
427  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
428  }
429 
430  /* creating of mapping from file ranks to rank used in buddy read */
431  if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
432 
433  /* overwrite collective info if not set*/
434  if(!sion_filedesc->usecoll) {
435  sion_filedesc->usecoll=1;
436  sion_filedesc->collsize=sion_filedesc->ntasks;
437  }
438 
439  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->ntasks));
440 
441  if (rank == 0) {
442  sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
443  if (sion_tmpintfield_buddy32 == NULL) {
444  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
445  (unsigned long) ntasks * sizeof(sion_int32)));
446  }
447  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
448  sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks * sizeof(sion_int64));
449  if (sion_tmpintfield_buddy64 == NULL) {
450  free(sion_tmpintfield_buddy32);
451  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
452  (unsigned long) ntasks * sizeof(sion_int64)));
453  }
454  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
455  sion_tmpintfield_map = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
456  if (sion_tmpintfield_map == NULL) {
457  free(sion_tmpintfield_buddy32);
458  free(sion_tmpintfield_buddy64);
459  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
460  (unsigned long) ntasks * sizeof(sion_int32)));
461  }
462  for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
463 
464  }
465  helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
466  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
467 
468 
469  if (rank == 0) {
470  for (j = 0; j < ntasks; j++)
471  DPRINTFP((64, "_sion_paropen_generic_one_file", rank, " buddy map[%d]=%d\n", j, (int) sion_tmpintfield_map[j]));
472  }
473 
474  }
475 
476  if (rank == 0) {
477  rc = _sion_read_header_fix_part(sion_filedesc); /* overwrites sion_filedesc->ntasks */
478  if (rc!=SION_SUCCESS) {
479  free(sion_tmpintfield_buddy32);
480  free(sion_tmpintfield_buddy64);
481  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
482  }
483  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
484  " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->endianness, sion_filedesc->fsblksize, sion_filedesc->ntasks));
485 
486  /* */ DPRINTFTS(rank, "before alloc");
487  /* memory allocation */
488  _sion_alloc_filedesc_arrays(sion_filedesc);
489  /* */ DPRINTFTS(rank, "after alloc");
490 
491  rc = _sion_read_header_var_part(sion_filedesc);
492  if (rc!=SION_SUCCESS) {
493  free(sion_tmpintfield_buddy32);
494  free(sion_tmpintfield_buddy64);
495  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
496  }
497 
498  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
499  sion_filedesc->collsize = atoi(flags_entry->val);
500  }
501  _sion_coll_check_env(sion_filedesc);
502  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
503 
504  /* collective */
505  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
506  else {
507  if (sion_filedesc->collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
508  else if (sion_filedesc->collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
509  else _sion_calculate_startpointers_collective(sion_filedesc);
510  }
511  /* */ DPRINTFTS(rank, "after calculate");
512 
513  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
514  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
515 
516  } /* rank==0 */
517 
518  /* distribute keyvalmode */
519  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
520 
521  /* distribute collective options */
522  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
523  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
524 
525 
526  DPRINTFP((32, "_sion_paropen_generic_one_file", rank," usecoll=%d\n", sion_filedesc->usecoll));
527 
528  if(sion_filedesc->usecoll) {
529 
530  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
531  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
532  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
533  } else {
534 
535  /* remap data */
536  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collsize[sion_tmpintfield_map[j]];
537  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
538  /* remap data */
539  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collector[sion_tmpintfield_map[j]];
540  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
541  }
542 
543 
544  _sion_free_filedesc_coll_arrays(sion_filedesc);
545  }
546 
547  /* distribute globalskip */
548  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
549 
550  /* broadcast information read from file */
551  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->endianness, comm_group, _SION_INT32, 1, 0);
552  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->swapbytes, comm_group, _SION_INT32, 1, 0);
553  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fsblksize, comm_group, _SION_INT32, 1, 0);
554  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->ntasks, comm_group, _SION_INT32, 1, 0);
555  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fileversion, comm_group, _SION_INT32, 1, 0);
556  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->nfiles, comm_group, _SION_INT32, 1, 0);
557  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->filenumber, comm_group, _SION_INT32, 1, 0);
558  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag1, comm_group, _SION_INT32, 1, 0);
559  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag2, comm_group, _SION_INT32, 1, 0);
560  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
561 
562  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
563  " read, after read of maxusedchunks=%d maxchunks=%d (%d)\n", sion_filedesc->maxusedchunks,sion_filedesc->maxchunks, MAXCHUNKS));
564  if (sion_filedesc->maxusedchunks > MAXCHUNKS) _sion_realloc_filedesc_blocklist(sion_filedesc, sion_filedesc->maxusedchunks);
565  /* */ DPRINTFTS(rank, "after bcast");
566 
567  /* scatter per task information read from file */
568  /* */ DPRINTFTS(rank, "before scatter");
569  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
570  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
571  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
572  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
573  } else {
574  /* remap data */
575  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_chunksizes[sion_tmpintfield_map[j]];
576  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
577  /* remap data */
578  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_startpointers[sion_tmpintfield_map[j]];
579  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
580  /* remap data */
581  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_globalranks[sion_tmpintfield_map[j]];
582  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
583  }
584 
585  /* */ DPRINTFTS(rank, "after scatter");
586 
587  /* read number of blocks for each task */
588  if (rank == 0) {
589  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
590  if (sion_tmpintfield == NULL) {
591  free(sion_tmpintfield_buddy32);
592  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
593  (unsigned long) ntasks * sizeof(sion_int64)));
594  }
595  _sion_read_header_var_part_blockcount_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
596 
597  for (j = 0; j < sion_filedesc->ntasks; j++)
598  DPRINTFP((2048, "_sion_paropen_generic_one_file", rank, " read, blockcount on task %02d is %10ld\n", j, (long) sion_tmpintfield[j]));
599  }
600 
601  /* and distribute them */
602  /* */ DPRINTFTS(rank, "before scatter");
603  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
604  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
605  } else {
606  /* remap data */
607  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
608  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
609  }
610  /* */ DPRINTFTS(rank, "after scatter");
611  sion_filedesc->lastchunknr = helpint64-1;
612  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " lastchunknr on task %02d is %10ld\n", rank, (long) sion_filedesc->lastchunknr));
613 
614  for (i = 0; i < sion_filedesc->maxusedchunks; i++) {
615  if (rank == 0) _sion_read_header_var_part_nextblocksizes_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
616  /* */ DPRINTFTS(rank, "before scatter");
617  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
618  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
619  } else {
620  /* remap data */
621  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
622  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
623  }
624  /* */ DPRINTFTS(rank, "after scatter");
625  sion_filedesc->blocksizes[i] = helpint64;
626  }
627 
628 #define BGFLUSH
629 #ifdef BGFLUSH
630  if(do_open_file) {
631  _sion_file_flush(sion_filedesc->fileptr);
632  }
633 #endif
634 
635  /* */ DPRINTFTS(rank, "before setp");
636  sion_gendata->apidesc->barrier_cb(comm_group);
637  if(do_open_file) {
638  _sion_file_purge(sion_filedesc->fileptr);
639  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
640  }
641  sion_filedesc->currentpos = sion_filedesc->startpos;
642  sion_filedesc->currentblocknr = 0;
643 
644  /* OUTPUT parameters */
645  *fsblksize = sion_filedesc->fsblksize;
646  *chunksize = sion_filedesc->chunksize;
647  *globalrank = sion_filedesc->globalrank;
648 
649  /* free tmp field */
650  if(sion_tmpintfield) free(sion_tmpintfield);
651  if(sion_tmpintfield_map) free(sion_tmpintfield_map);
652  if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
653  if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
654 
655  sion_gendata->apidesc->barrier_cb(comm_group);
656  /* */ DPRINTFTS(rank, "after setp");
657  /* end of read */
658  } else {
659  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_multi_mpi: unknown file mode"));
660  }
661 
662  if(do_open_file) {
663  if(fileptr!=NULL) {
664  if(sion_filedesc->fileptr->flags&SION_FILE_FLAG_ANSI) {
665  *fileptr=sion_filedesc->fileptr->fileptr;
666  sion_filedesc->fileptr_exported=1;
667  } else {
668  *fileptr=NULL;
669  sion_filedesc->fileptr_exported=0;
670  }
671  }
672  } else {
673  if(fileptr!=NULL) *fileptr=NULL;
674  sion_filedesc->fileptr_exported=0;
675  }
676 
677  if (rank == 0) {
678  /* not needed for rest of sionlib function calls */
679  _sion_free_filedesc_arrays(sion_filedesc);
680  }
681 
682  _sion_print_filedesc(sion_filedesc, 512, "_sion_paropen_generic_one_file", 1);
683 
684  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
685  _sion_file_get_position(sion_filedesc->fileptr)));
686 
687  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (long) flags_store->mask, ntasks));
688 
689  return (sid);
690 
691 
692 }
693 
694 
695 
711  int rank,
712  int ntasks,
713  int mapping_size,
714  sion_int32 *mapping,
715  int flag,
716  _sion_generic_gendata *sion_gendata,
717  _sion_generic_buddy *buddy_data)
718 {
 /*
  * Parallel close of the SION file referenced by sid.
  *
  * Steps performed below:
  *  - sid must resolve to a file descriptor in state SION_FILESTATE_PAROPEN,
  *    otherwise the result of _sion_errorprint() is returned
  *  - read mode:  the file is closed (if this task has to close it) and the
  *                descriptor is marked SION_FILESTATE_CLOSE
  *  - write mode: local data is flushed, every rank != 0 closes its file,
  *                the number of blocks and the size of every block of each
  *                task are gathered on rank 0, which writes them (and the
  *                optional mapping) to the file and closes it last
  *  - the file descriptor is released with _sion_free_filedesc()
  *
  * rank/ntasks are presumably the rank and size within the communicator
  * selected by flag -- TODO confirm against the callers.
  * Returns rc, which is never changed from SION_SUCCESS after the checks.
  */
719 
720  int rc = SION_SUCCESS;
721  int blknum, lrank;
722  sion_int64 helpint64;
723  sion_int64 *sion_tmpintfield = NULL;
724  _sion_filedesc *sion_filedesc;
725  void *comm_group=NULL;
726  int do_close_file;
727 
728  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
729  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
730  }
731 
732  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
733  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
734  }
735 
736  DPRINTFP((2, "_sion_parclose_generic", rank, "enter parallel close sid=%d\n", sid));
737 
738  /* select local communicator */
 /* the tests run in sequence, so if several flag bits are set the last match wins;
    comm_group stays NULL if none of the four bits is set */
739  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
740  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
741  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
742  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
743 
744  /* decide if file has to be closed physically */
 /* BUDDY_SEND tasks never close; for BUDDY_COLL/BUDDY_READ only rank 0 closes */
745  do_close_file=1;
746  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
747  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
748  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
749 
750  /* READ MODE: close file on all tasks */
751  if (sion_filedesc->mode == SION_FILEMODE_READ) {
752  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
753 
754  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
755  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (read mode) sid=%d, call fclose on file\n", sid));
756 
757  if(do_close_file) {
758  _sion_file_close(sion_filedesc->fileptr);
759  }
 /* the pointer is dropped and the state set even when this task did not close the file itself */
760  sion_filedesc->fileptr = NULL;
761  sion_filedesc->state = SION_FILESTATE_CLOSE;
762  }
763  }
764  else {
765  /* WRITE MODE: collect data from all tasks, write metadata, and close file on all tasks */
766 
767  /* _sion_buffer_flush(sion_filedesc); */ /* clear internal buffer */
768  _sion_flush_block(sion_filedesc);
769 
770  if (sion_filedesc->usebuffer) {
771  _sion_buffer_flush(sion_filedesc);
772  }
773 
774  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
775 
776  /* close file on all other task, except 0 */
777  if (rank != 0) {
778  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
779  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
780  if(do_close_file) {
781  _sion_file_close(sion_filedesc->fileptr);
782  }
783  sion_filedesc->fileptr = NULL;
784  sion_filedesc->state = SION_FILESTATE_CLOSE;
785  }
786  }
787 
 /* all tasks of comm_group meet here before rank 0 starts collecting the block information */
788  sion_gendata->apidesc->barrier_cb(comm_group);
789 
790  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->lastchunknr,
791  sion_filedesc->globalskip));
792  for (blknum = 0; blknum <= sion_filedesc->lastchunknr; blknum++) {
793  DPRINTFP((1024, "_sion_parclose_generic", rank, " parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
794  sion_filedesc->blocksizes[blknum]));
795  }
796 
 /* receive buffer for the gathers below: one sion_int64 per task, allocated on rank 0 only */
797  if (rank == 0) {
798  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
799  if (sion_tmpintfield == NULL) {
 /* NOTE(review): only rank 0 returns here; the other ranks still enter the gather below */
800  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
801  (unsigned long) sion_filedesc->ntasks * sizeof(sion_int64)));
802  }
803  }
804 
805  /* gather number of blocks of each tasks, and search maxusedchunks */
806  /* */ DPRINTFTS2(rank, "before gather");
 /* every task contributes its block count (lastchunknr + 1); sion_tmpintfield is NULL on ranks != 0,
    presumably the callback only touches the receive buffer on the root (last argument 0) -- TODO confirm */
807  helpint64 = sion_filedesc->lastchunknr + 1;
808  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
809 
810  if (rank == 0) {
811  sion_filedesc->maxusedchunks = -1;
812  for (blknum = 0; blknum < sion_filedesc->ntasks; blknum++)
813  if (sion_tmpintfield[blknum] > sion_filedesc->maxusedchunks)
814  sion_filedesc->maxusedchunks = (int) sion_tmpintfield[blknum];
815  }
 /* the maximum found on rank 0 is made known to every task */
816  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
817  /* */ DPRINTFTS2(rank, "after gather");
818 
819  /* calculate and set start_of_varheader */
820  sion_filedesc->start_of_varheader = sion_filedesc->start_of_data + sion_filedesc->maxusedchunks * sion_filedesc->globalskip;
821 
822  /* write rest of first meta data block on rank 0 */
823  if (rank == 0) {
824  _sion_write_header_var_info(sion_filedesc);
825 
 /* sion_tmpintfield still holds the per-task block counts gathered above */
826  _sion_write_header_var_part_blockcount_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
827 
828  }
829 
830  /* collect chunksizes of each block from each task and write it to file */
 /* one gather per block index; tasks that own fewer blocks than maxusedchunks contribute 0 */
831  for (blknum = 0; blknum < sion_filedesc->maxusedchunks; blknum++) {
832  if (blknum <= sion_filedesc->lastchunknr) {
833  helpint64 = sion_filedesc->blocksizes[blknum];
834  }
835  else {
836  helpint64 = 0;
837  }
838 
839  /* */ DPRINTFTS2(rank, "before gather");
840  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
841  /* */ DPRINTFTS2(rank, "after gather");
842 
843  if (rank == 0) {
 /* NOTE(review): this debug loop is bounded by the ntasks parameter, while sion_tmpintfield
    was sized with sion_filedesc->ntasks -- confirm that both are always equal */
844  for (lrank = 0; lrank < ntasks; lrank++)
845  DPRINTFP((2048, "_sion_parclose_generic", rank, " parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
846  sion_tmpintfield[lrank], lrank));
847 
848  _sion_write_header_var_part_nextblocksizes_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
849 
850  }
851  }
852 
853  /* write mapping to file if more than one physical file is used, mapping_size is the number of global tasks */
 /* not restricted to rank 0 here: executed by every task that was handed a non-NULL mapping */
854  if (mapping != NULL) {
855  _sion_write_header_var_part_mapping(sion_filedesc, mapping_size, mapping);
856  }
857 
858  /* close file on task 0 */
859  if (rank == 0) {
860  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
861  if(do_close_file) {
862  _sion_file_close(sion_filedesc->fileptr);
863  }
864  sion_filedesc->fileptr = NULL;
865  sion_filedesc->state = SION_FILESTATE_CLOSE;
866 
867  /* free tmp field */
868  if(sion_tmpintfield) free(sion_tmpintfield);
869  }
870 
871  } /* write */
872 
 /* executed for read and write mode alike; the local pointer is cleared afterwards */
873  _sion_free_filedesc(sion_filedesc);
874  sion_filedesc = NULL;
875 
876 
877  DPRINTFP((2, "_sion_parclose_generic", rank, "leave parallel close sid=%d\n", sid));
878 
879  return (rc);
880 }
881 
887  int sid,
888  sion_int64 chunksize,
889  int rank,
890  int ntasks,
891  _sion_generic_gendata *sion_gendata)
892 {
 /*
  * Re-initialise a SION file that is already opened in parallel for writing
  * so that it uses a new chunksize per task.
  *
  * Steps performed below:
  *  - sid must resolve to a file descriptor in state SION_FILESTATE_PAROPEN
  *    that is not in read mode, otherwise the result of _sion_errorprint()
  *    is returned
  *  - chunksize and global rank of every task are gathered on rank 0
  *  - rank 0 recalculates the start pointers and rewrites the header from
  *    file position 0
  *  - the new start position (and, for collective mode, collsize and
  *    collector) is scattered, globalskip is broadcast
  *  - every task moves its file pointer to its new start position
  *
  * NOTE(review): the rank and ntasks parameters are not used in this body;
  * sion_filedesc->rank and sion_filedesc->ntasks are used instead.
  * Returns rc, which is never changed from SION_SUCCESS after the checks.
  */
893 
894  int rc = SION_SUCCESS;
895  _sion_filedesc *sion_filedesc;
896  sion_int64 lchunksize, lstartpointer, lglobalrank;
897  void *comm_group=NULL;
898 
899  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
900  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
901  }
902 
903  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
904  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
905  }
906 
907  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "enter parallel reinit sid=%d\n", sid));
908 
 /* always the local communicator; no buddy handling in this function */
909  comm_group=sion_gendata->comm_data_local;
910 
 /* re-initialisation is rejected for files opened in read mode */
911  if (sion_filedesc->mode == SION_FILEMODE_READ) {
912  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
913  }
914 
915  /* */ DPRINTFTS(sion_filedesc->rank, "before alloc");
916  if (sion_filedesc->rank == 0) {
917  /* memory allocation for internal fields */
918  _sion_alloc_filedesc_arrays(sion_filedesc);
919  }
920  /* */ DPRINTFTS(sion_filedesc->rank, "after alloc");
921 
922  /* collect new chunksize data and init startpointers on PE 0 */
923  lchunksize = (sion_int64) chunksize;
924  lglobalrank = (sion_int64) sion_filedesc->globalrank;
925 
926  /* */ DPRINTFTS2(sion_filedesc->rank, "before gather");
 /* receive buffers are the arrays allocated on rank 0 above; on other ranks they are
    presumably NULL and ignored by the callback (root is 0) -- TODO confirm */
927  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
928  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
929 
930  /* */ DPRINTFTS2(sion_filedesc->rank, "after gather");
931 
 /* unlike the arrays above, the collective arrays are allocated on every rank */
932  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
933 
934  if (sion_filedesc->rank == 0) {
935  /* */ DPRINTFTS(sion_filedesc->rank, "before calculate");
936  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
937  else _sion_calculate_startpointers_collective(sion_filedesc);
938  /* */ DPRINTFTS(sion_filedesc->rank, "after calculate");
939  }
940 
941  /* write header again */
942  if (sion_filedesc->rank == 0) {
943 
944  /* apply hint for first meta data block */
945  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
946 
 /* rewind to the beginning of the file before the header is written */
947  _sion_file_flush(sion_filedesc->fileptr);
948  lstartpointer=0;
949  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
950 
951  /* */ DPRINTFTS(sion_filedesc->rank, "before writeh");
952  _sion_write_header(sion_filedesc);
953  /* */ DPRINTFTS(sion_filedesc->rank, "after writeh");
954 
955  /* needed for writing pointer to var part of metadata at the end of the file */
956  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
957  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
958 
959  /*set max. file size */
 /* target position = start pointer of the last task plus its chunksize, i.e. the end of the last chunk */
960  lstartpointer = sion_filedesc->all_startpointers[sion_filedesc->ntasks - 1]
961  + sion_filedesc->all_chunksizes[sion_filedesc->ntasks - 1];
962  /* */ DPRINTFTS(sion_filedesc->rank, "before setp(0)");
963  _sion_file_flush(sion_filedesc->fileptr);
964  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
965  /* */ DPRINTFTS(sion_filedesc->rank, "after setp(0)");
966 
967  }
968 
969  /* distribute start_pos */
970  /* */ DPRINTFTS(sion_filedesc->rank, "before scatter");
971  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
972  /* */ DPRINTFTS(sion_filedesc->rank, "after scatter");
973 
974  /* distribute information for collective operations */
975  if(sion_filedesc->usecoll) {
976  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
977  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
978 
 /* the collective arrays are only needed for the two scatters above */
979  _sion_free_filedesc_coll_arrays(sion_filedesc);
980  }
981 
982  /* distribute globalskip */
983  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
984 
985  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " start position is %10lld %10.4f MB\n",
986  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0));
987 
988  /* set filepointer on each task */
989  /* */ DPRINTFTS(sion_filedesc->rank, "before setp");
 /* the repositioning of every task is enclosed by two barriers on comm_group */
990  sion_gendata->apidesc->barrier_cb(comm_group);
991  _sion_file_flush(sion_filedesc->fileptr);
992  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
993  sion_filedesc->currentpos = sion_filedesc->startpos;
994  sion_filedesc->chunksize = (sion_int64) chunksize;
995  sion_gendata->apidesc->barrier_cb(comm_group);
996 
997  /* apply hint for first chunk */
998  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
999 
1000  if (sion_filedesc->rank == 0) {
1001  /* not needed for rest of sionlib function calls */
1002  _sion_free_filedesc_arrays(sion_filedesc);
1003  }
1004 
1005  /* */ DPRINTFTS(sion_filedesc->rank, "after setp");
1006  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " ending open for write #tasks=%d filepos=%lld\n",
1007  sion_filedesc->ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
1008 
1009  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "leave parallel reinit of file %s in #tasks=%d\n",
1010  sion_filedesc->fname, sion_filedesc->ntasks));
1011 
1012  return (rc);
1013 
1014 
1015 }
1016 
1017 
1022 #define DFUNCTION "_sion_generic_collect_mapping"
1023 int _sion_generic_collect_mapping( _sion_filedesc *sion_filedesc,
1024  int *mapping_size,
1025  sion_int32 **mapping ) {
1026  int rc=SION_SUCCESS;
1027  int t;
1028  _sion_generic_gendata *sion_gendata;
1029  _sion_generic_apidesc *sion_apidesc;
1030  sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1031 
1032 
1033  sion_gendata=sion_filedesc->dataptr;
1034  sion_apidesc=sion_gendata->apidesc;
1035 
1036  *mapping = NULL; *mapping_size = 0;
1037 
1038  if ((sion_filedesc->mode == SION_FILEMODE_WRITE) && (sion_filedesc->nfiles > 1)) {
1039  /* collect mapping to files to task 0 */
1040 
1041  /* mapping data will be collected by master of first physical file */
1042  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) {
1043  /* allocate data */
1044  *mapping_size=sion_gendata->gsize;
1045  *mapping = (sion_int32 *) malloc(*mapping_size * 2 * sizeof(sion_int32));
1046  if (*mapping == NULL) {
1047  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for mapping"));
1048  }
1049  }
1050 
1051  /* gather info about send about global rank of master of first file on grank 0 */
1052  if(sion_gendata->grank==0) {
1053  receivemap = (sion_int32 *) malloc(sion_gendata->gsize * sizeof(sion_int32));
1054  if (receivemap == NULL) {
1055  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for receivemap"));
1056  }
1057  }
1058 
1059  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) iamreceiver=sion_gendata->grank;
1060  else iamreceiver=-1;
1061  sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1062  if(sion_gendata->grank==0) {
1063  for(t=0;t<sion_gendata->gsize;t++) {
1064  if(receivemap[t]>=0) {
1065  receiver=receivemap[t];
1066  break;
1067  }
1068  }
1069  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "receiver of mapping grank=%d\n", receiver));
1070  }
1071  sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1072 
1073  /* receive global rank of master of first file on grank 0 */
1074  lpos[0] = sion_filedesc->filenumber;
1075  lpos[1] = sion_filedesc->rank;
1076  sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1077  }
1078 
1079  if(receivemap!=NULL) free(receivemap);
1080 
1081  return(rc);
1082 }
1083 #undef DFUNCTION
1084 
1085 /* END OF _sion_generic_collect_mapping */
1086 
1094 {
1095  int grank = comm->grank;
1096  int gsize = comm->gsize;
1097 
1098  DPRINTFP((2, __func__, grank, "enter\n"));
1099 
1100  if (0 != strcmp(comm->apidesc->name, "SIONlib_MPI_API")) {
1101  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: not supported for generic API \"%s\"\n", comm->apidesc->name);
1102  }
1103 
1104  _sion_filedesc fd;
1105  _sion_init_filedesc(&fd);
1106  fd.ntasks = gsize;
1107  _sion_flags_entry *flags_entry = _sion_flags_get(flags, "collsize");
1108  if (flags_entry) {
1109  fd.collsize = atoi(flags_entry->val);
1110  }
1111  _sion_coll_check_env(&fd);
1112  sion_int32 collsize = fd.collsize;
1113  if (collsize < 2) {
1114  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: size of collective groups should be 2 or more, but is %d\n", collsize);
1115  }
1116  sion_int32 n_groups = gsize / collsize + ((gsize % collsize) ? 1 : 0);
1117 
1118  if (!fd.usecoll) {
1119  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: usecoll == false\n");
1120  }
1121 
1122  int is_candidate = _sion_generic_is_candidate(comm);
1123  int n_candidates = 0;
1124  int candidates_before = 0;
1125  // bring your own Allreduce and Exscan
1126  {
1127  int *candidates = NULL;
1128  if (0 == comm->grank) {
1129  candidates = calloc(gsize, sizeof(int));
1130  if (!candidates) {
1131  _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT, "MSA Collectives: malloc returned NULL\n");
1132  }
1133  }
1134 
1135  comm->apidesc->gatherr_cb(&is_candidate, candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1136 
1137  if (0 == comm->grank) {
1138  for (size_t i = 0; i < gsize; i++) {
1139  int tmp = candidates[i];
1140  candidates[i] = n_candidates;
1141  n_candidates += tmp;
1142  }
1143  }
1144 
1145  comm->apidesc->bcastr_cb(&n_candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1146  comm->apidesc->scatterr_cb(candidates, &candidates_before, comm->comm_data_global, _SION_INT32, 1, 0);
1147 
1148  if (0 == comm->grank) {
1149  free(candidates);
1150  }
1151  }
1152 
1153  if ((n_candidates < n_groups) || (n_candidates < comm->numfiles)) {
1154  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: insufficient number of candidates %d, number of groups %d, number of files %d\n", n_groups, comm->numfiles);
1155 
1156  }
1157 
1158  int groups_per_file = n_groups / comm->numfiles;
1159  int is_collector = is_candidate && (candidates_before < n_groups);
1160  if (is_collector) {
1161  int rank_collector = candidates_before;
1162  comm->filenumber = rank_collector % comm->numfiles;
1163  comm->lrank = (rank_collector / comm->numfiles) * collsize;
1164  } else {
1165  int collectors_before = (candidates_before < n_groups) ? candidates_before : n_groups;
1166  int rank_sender = grank - collectors_before;
1167  int group_number = rank_sender / (collsize - 1);
1168  comm->filenumber = group_number % comm->numfiles;
1169  comm->lrank = (group_number / comm->numfiles) * collsize + rank_sender % (collsize - 1) + 1;
1170  }
1171  comm->lsize = collsize * (groups_per_file + ((comm->filenumber < n_groups % comm->numfiles) ? 1 : 0));
1172  if (comm->filenumber == comm->numfiles - 1) {
1173  comm->lsize += gsize - n_groups * collsize;
1174  }
1175 
1176  DPRINTFP((32, __func__, grank, "MSA Collectives: global rank %d of %d, is candidate %d, is collector %d, file no %d, local rank %d, local size %d\n", grank, gsize, is_candidate, is_collector, comm->filenumber, comm->lrank, comm->lsize));
1177  DPRINTFP((2, __func__, grank, "exit\n"));
1178  return SION_SUCCESS;
1179 }
1180 
1181 int _sion_generic_is_candidate(_sion_generic_gendata *comm) {
1182 #if defined(_SION_MSA_DEEP_EST_SDV)
1183  char hostname[1024];
1184  if (0 == gethostname(hostname, 1024)) {
1185  if (0 == strncmp("knl", hostname, 3)) {
1186  return 0;
1187  } else {
1188  return 1;
1189  }
1190  } else {
1191  return 0;
1192  }
1193 #elif defined(_SION_MSA_HOSTNAME_REGEX)
1194  char *regex;
1195  regex_t compiled;
1196  int compile_error;
1197  if ((regex = _sion_getenv("SION_MSA_COLLECTOR_HOSTNAME_EREGEX"))) {
1198  compile_error = regcomp(&compiled, regex, REG_EXTENDED|REG_ICASE|REG_NOSUB);
1199  } else if ((regex = _sion_getenv("SION_MSA_COLLECTOR_HOSTNAME_REGEX"))) {
1200  compile_error = regcomp(&compiled, regex, REG_ICASE|REG_NOSUB);
1201  } else {
1202  return 1;
1203  }
1204  if (compile_error) {
1205  char error_msg[1024];
1206  size_t error_size = regerror(compile_error, &compiled, error_msg, 1024);
1207  regfree(&compiled);
1208  _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT, "MSA Collectives: error compiling regex \"%s\": %s%s\n", regex, error_msg, (error_size > 1024) ? "..." : "");
1209  }
1210 
1211  char hostname[1024];
1212  int hostname_error = gethostname(hostname, 1023);
1213  hostname[1023] = '\0';
1214  if (hostname_error) {
1215  regfree(&compiled);
1216  _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT, "MSA Collectives: error getting host name\n");
1217  }
1218 
1219  int exec_status = regexec(&compiled, hostname, 0, NULL, 0);
1220  if (exec_status == 0) {
1221  regfree(&compiled);
1222  return 1;
1223  } else if (exec_status == REG_NOMATCH) {
1224  regfree(&compiled);
1225  return 0;
1226  } else {
1227  char error_msg[1024];
1228  size_t error_size = regerror(exec_status, &compiled, error_msg, 1024);
1229  regfree(&compiled);
1230  _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT, "MSA Collectives: error matching regex \"%s\": %s%s\n", regex, error_msg, (error_size > 1024) ? "..." : "");
1231  return 0; // NOT REACHED
1232  }
1233 #elif defined(_SION_MSA_TEST)
1234  return comm->grank %2;
1235 #else
1236  return 1;
1237 #endif
1238 }
int _sion_buffer_check_env(_sion_filedesc *sion_filedesc)
Checks if environment variables are set to use buffer.
Definition: sion_buffer.c:60
int _sion_buffer_flush(_sion_filedesc *sion_filedesc)
Flush buffer.
Definition: sion_buffer.c:177
int _sion_cache_check_env(_sion_filedesc *sion_filedesc)
Check if environment variables are set to use cache.
Definition: sion_cache.c:57
#define MAXCHUNKS
Definition: sion_common.h:35
int _sion_reassignvcd(int sid, void *data, int type)
Definition: sion_fd.c:63
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
long _sion_file_get_opt_blksize(_sion_fileptr *sion_fileptr)
Get optional file system block size for a file.
Definition: sion_file.c:289
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:434
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:178
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:367
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:467
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:401
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:53
#define SION_FILE_FLAG_READ
Definition: sion_file.h:30
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:28
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:25
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:29
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:27
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
Definition: sion_filedesc.c:37
int _sion_realloc_filedesc_blocklist(_sion_filedesc *sion_filedesc, sion_int32 maxchunks)
Increase the memory used by the internal sion structure for the blocklist.
int _sion_alloc_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_print_filedesc(_sion_filedesc *sion_filedesc, int level, char *desc, int flag)
Print the initialized sion file description.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
int _sion_free_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
#define SION_FILEMODE_WRITE
Definition: sion_filedesc.h:38
#define SION_FILEMODE_READ
Definition: sion_filedesc.h:37
#define SION_FILESTATE_PAROPEN
Definition: sion_filedesc.h:28
#define SION_FILESTATE_CLOSE
Definition: sion_filedesc.h:35
#define DFUNCTION
collect mapping information on rank 0 of first file, mapping=NULL for all others
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
int _sion_generic_renumber_collmsa(_sion_generic_gendata *comm, _sion_flags_store *flags)
Splits a Communicator in numfiles different communicators.
int _sion_parreinit_generic(int sid, sion_int64 chunksize, int rank, int ntasks, _sion_generic_gendata *sion_gendata)
change chunksize for an already opened SION file (write)
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
char * _sion_getenv(const char *name)
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int32 _sion_get_endianness_with_flags(sion_int64 flags)
Return endianness including possible choice via flags.
int _sion_read_header_var_part(_sion_filedesc *sion_filedesc)
Read the second part of SION Meta Block 1.
int _sion_write_header_var_part_blockcount_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the block sizes from Meta Block 2.
int _sion_write_header_var_part_nextblocksizes_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positi...
int _sion_write_header_var_info(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
int _sion_write_header(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
Definition: sion_metadata.c:38
int _sion_read_header_var_part_blockcount_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the block sizes from Meta Block 2.
int _sion_read_header_fix_part(_sion_filedesc *sion_filedesc)
Read part of the SION Meta Block 1.
int _sion_write_header_var_part_mapping(_sion_filedesc *sion_filedesc, sion_int32 mapping_size, sion_int32 *mapping)
Write mapping into the SION Meta Block 2.
int _sion_read_header_var_part_nextblocksizes_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positio...
Sion Time Stamp Header.
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
sion_int32 * all_coll_collsize
sion_int32 * all_coll_collector
sion_int64 start_of_varheader
sion_int64 * all_chunksizes
sion_int64 * blocksizes
sion_int32 fileptr_exported
sion_int64 * all_globalranks
sion_int32 currentblocknr
Definition: sion_filedesc.h:97
sion_int32 * all_coll_capability
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82
sion_int32 coll_capability
sion_int64 * all_startpointers
Definition: sion_flags.h:32