SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
sion_generic_internal.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #define _XOPEN_SOURCE 700
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <unistd.h>
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_error_handler.h"
29 #include "sion_file.h"
30 #include "sion_filedesc.h"
31 #include "sion_fd.h"
32 #include "sion_metadata.h"
33 #include "sion_internal.h"
34 #include "sion_printts.h"
35 #include "sion_keyvalue.h"
36 #include "sion_flags.h"
37 
38 #include "sion_cache.h"
39 #include "sion_buffer.h"
40 #include "sion_flags.h"
41 #include "sion_hints.h"
42 #include "sion_generic_internal.h"
44 #include "sion_generic_buddy.h"
45 #include "sion_internal_startptr.h"
46 
70  int sid,
71  char *fname,
72  _sion_flags_store *flags_store,
73  char *prefix,
74  int *numFiles,
75  int *filenumber,
76  sion_int64 *chunksize,
77  sion_int32 *fsblksize,
78  int rank,
79  int ntasks,
80  int *globalrank,
81  int flag,
82  FILE **fileptr,
83  _sion_generic_gendata *sion_gendata,
84  _sion_generic_buddy *buddy_data )
85 {
86 
87  int i, j;
88  int rc;
89 
90  _sion_filedesc *sion_filedesc;
91  _sion_fileptr *sion_fileptr;
92 
93  int nfiles, filenum;
94  sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
95  sion_int64 *sion_tmpintfield = NULL;
96  sion_int32 *sion_tmpintfield_map = NULL, helpint32;
97  sion_int32 *sion_tmpintfield_buddy32 = NULL;
98  sion_int64 *sion_tmpintfield_buddy64 = NULL;
99  void *comm_group=NULL;
100  int do_open_file;
101 
102  _sion_flags_entry* flags_entry = NULL;
103 
104  if (flags_store->mask&_SION_FMODE_POSIX) apiflag=SION_FILE_FLAG_POSIX;
105  else apiflag=SION_FILE_FLAG_ANSI;
106 
107  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "enter parallel open of file %s in mode %d #tasks=%d\n", fname, (int) flags_store->mask, ntasks));
108  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n", sizeof(int), sizeof(long),
109  sizeof(long long), sizeof(sion_int32), sizeof(sion_int64)));
110 
111  /* some shortcuts */
112  nfiles = *numFiles;
113  filenum = *filenumber;
114 
115  /* select local communicator */
116  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
117  if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
118  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
119  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
120  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
121 
122  /* decide if file has to be opened physically */
123  do_open_file=1;
124  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
125  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
126  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
127  /* WF: todo decide if data is local, these tasks can read data directly? */
128  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "do_open_file=%d\n", do_open_file));
129 
130  sion_filedesc = _sion_alloc_filedesc();
131  if (sion_filedesc == NULL) {
132  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
133  (unsigned long) sizeof(sion_filedesc));
134  }
135  _sion_init_filedesc(sion_filedesc);
136  sion_filedesc->fname = strdup(fname); /* Set the filename */
137 
138  _sion_reassignvcd(sid,sion_filedesc, SION_FILEDESCRIPTOR);
139  sion_filedesc->sid=sid;
140 
141  /* Allocate memory for storing MAXCHUNKS chunksize infos in internal structure */
143  sion_filedesc->lastchunknr = 0; /* Set the current number of chunks */
144  sion_filedesc->currentblocknr = 0; /* Set the current block number */
145 
146  if (flags_store->mask&_SION_FMODE_WRITE) {
147  /* **************** WRITE mode **************** */
148 
149  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for write #tasks=%d\n", ntasks));
150 
151  /* check parameter */
152  if (ntasks<0) {
153  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (int) ntasks));
154  }
155 
156  /* check parameter */
157  if ((chunksize != NULL) && (*chunksize<0)) {
158  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
159  }
160 
161  /* check parameter */
162  if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
163  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
164  }
165 
166  sion_filedesc->state = SION_FILESTATE_PAROPEN;
167  sion_filedesc->mode = SION_FILEMODE_WRITE;
168  sion_filedesc->endianness = _sion_get_endianness_with_flags(flags_store->mask); /* Endianness */
169  sion_filedesc->swapbytes = 0; /* Endianness, swapping bytes */
170  sion_filedesc->fsblksize = *fsblksize;
171  sion_filedesc->rank = rank;
172  sion_filedesc->globalrank = *globalrank;
173  sion_filedesc->ntasks = ntasks;
174  sion_filedesc->nfiles = nfiles;
175  sion_filedesc->filenumber = filenum;
176  sion_filedesc->prefix = strdup(prefix);
177  sion_filedesc->compress = flags_store->mask&_SION_FMODE_COMPRESS;
178  sion_filedesc->usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
179  sion_filedesc->collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
180  sion_filedesc->collmsa = !!_sion_flags_get(flags_store, "collmsa");
181  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
182  if(sion_filedesc->usebuddy) {
183  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
184  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
185  }
186 
187  /* open file on rank 0, first time to create file and get fsblksize if necessary */
188  if (rank == 0) {
189  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
190  if (!sion_fileptr) {
191  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
192  }
193  if(*fsblksize<=-1) {
194  /* check with fstat fsblksize */
195  new_fsblocksize=(sion_int64) _sion_file_get_opt_blksize(sion_fileptr);
196  if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
197  }
198  _sion_file_close(sion_fileptr);
199  }
200  sion_gendata->apidesc->barrier_cb(comm_group);
201  /* printf("WF: rank=%2d after first barrier fsblksize=%d\n",rank,(int) *fsblksize); */
202 
203  /* distribute new fsblksize */
204  if(*fsblksize==-1) {
205  sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
206  *fsblksize=new_fsblocksize;
207  sion_filedesc->fsblksize = *fsblksize;
208  /* printf("WF: rank=%2d after bcast fsblksize=%d new_fsblocksize=%d\n",rank,(int) *fsblksize,(int) new_fsblocksize); */
209  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "setting fsblksize to %lld\n", new_fsblocksize));
210  }
211 
212  /* check for buffer, needed at this point to set flag before writing header */
213  _sion_cache_check_env(sion_filedesc);
214  _sion_buffer_check_env(sion_filedesc);
215 
216  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
217  if (rank == 0) {
218  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
219  }
220  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
221 
222  /* check for collective options */
223  if (rank == 0) {
224  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
225  sion_filedesc->collsize = atoi(flags_entry->val);
226  }
227  _sion_coll_check_env(sion_filedesc);
228  }
229 
230  if (
231  ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
232  || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
233  || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
234  )
235  {
236  /* overwrite collective info if not set*/
237  if(!sion_filedesc->usecoll) {
238  sion_filedesc->usecoll=1;
239  sion_filedesc->collsize=sion_filedesc->ntasks;
240  };
241  }
242 
243  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
244  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
245  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collmergemode, comm_group, _SION_INT32, 1, 0);
246 
247  /* check if API supports coalescing I/O */
248  if(sion_filedesc->usecoll) {
249  if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
250  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
251  sion_filedesc->usecoll=0;
252  }
253  }
254 
255  /* check for hints options */
256  if (rank == 0) {
257  _sion_hints_check_env(sion_filedesc);
258  }
259  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usehints, comm_group, _SION_INT32, 1, 0);
260  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->hinttype, comm_group, _SION_INT32, 1, 0);
261 
262  /* */ DPRINTFTS(rank, "before alloc");
263  if (rank == 0) {
264  /* memory allocation for internal fields */
265  _sion_alloc_filedesc_arrays(sion_filedesc);
266  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
267  }
268  /* */ DPRINTFTS(rank, "after alloc");
269 
270  /* collect data and init startpointers on PE 0 */
271  lchunksize = (sion_int64) *chunksize;
272  lglobalrank = (sion_int64) *globalrank;
273  sion_filedesc->chunksize_req=lchunksize;
274  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
275 
276  /* */ DPRINTFTS2(rank, "before gather");
277  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
278  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
279 
280  /* check capability of tasks */
281  if(sion_filedesc->usecoll) {
282  sion_filedesc->coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
283  sion_gendata->apidesc->gatherr_cb(&sion_filedesc->coll_capability, sion_filedesc->all_coll_capability, comm_group, _SION_INT32, 1, 0);
284  }
285 
286  /* */ DPRINTFTS2(rank, "after gather");
287  if (rank == 0) {
288  /* */ DPRINTFTS(rank, "before calculate");
289  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "chunksizes[%d - 1]=%ld\n", ntasks,(long) sion_filedesc->all_chunksizes[ntasks - 1]));
290  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
291  else {
292  if (sion_filedesc->collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
293  else if (sion_filedesc->collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
294  else _sion_calculate_startpointers_collective(sion_filedesc);
295  }
296  /* */ DPRINTFTS(rank, "after calculate");
297  }
298 
299  /* */ DPRINTFTS(rank, "before open");
300  /* do not open the file on non-collector tasks when buddy checkpointing is used */
301  if(do_open_file) {
302 
303  /* open file on all ranks */
304  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE,0);
305  if (!sion_fileptr) {
306  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
307  }
308  /* store data in static data structure (sid) */
309  sion_filedesc->fileptr = sion_fileptr;
310  }
311  sion_gendata->apidesc->barrier_cb(comm_group);
312  /* */ DPRINTFTS(rank, "after open");
313 
314 
315  /* write header */
316  if (rank == 0) {
317 
318  /* apply hint for first meta data block */
319  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
320 
321  /* */ DPRINTFTS(rank, "before writeh");
322  _sion_write_header(sion_filedesc);
323  /* */ DPRINTFTS(rank, "after writeh");
324 
325  /* needed for writing pointer to var part of metadata at the end of the file */
326  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
327  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
328  /*set max. file size */
329  lstartpointer = sion_filedesc->all_startpointers[ntasks - 1]
330  + sion_filedesc->all_chunksizes[ntasks - 1];
331  /* */ DPRINTFTS(rank, "before setp(0)");
332  _sion_file_flush(sion_filedesc->fileptr);
333  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
334  /* */ DPRINTFTS(rank, "after setp(0)");
335 
336  }
337 
338  /* distribute start_pos */
339  /* */ DPRINTFTS(rank, "before scatter");
340  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
341  /* */ DPRINTFTS(rank, "after scatter");
342 
343  /* distribute chunksize */
344  /* */ DPRINTFTS(rank, "before scatter");
345  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
346  /* */ DPRINTFTS(rank, "after scatter");
347 
348  /* distribute information for collective operations */
349  if(sion_filedesc->usecoll) {
350  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
351  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
352 
353  _sion_free_filedesc_coll_arrays(sion_filedesc);
354  }
355 
356  /* distribute globalskip */
357  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
358 
359  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
360  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0,
361  sion_filedesc->chunksize, sion_filedesc->chunksize / 1024.0 / 1024.0
362  ));
363 
364  /* set filepointer on each task */
365  /* */ DPRINTFTS(rank, "before setp");
366  sion_gendata->apidesc->barrier_cb(comm_group);
367  if(do_open_file) {
368  _sion_file_flush(sion_filedesc->fileptr);
369  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
370  }
371  sion_filedesc->currentpos = sion_filedesc->startpos;
372  /* given by calculate startpointers ...
373  sion_filedesc->chunksize = (sion_int64) *chunksize;
374  */
375  sion_gendata->apidesc->barrier_cb(comm_group);
376 
377  /* apply hint for first chunk */
378  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
379 
380  /* */ DPRINTFTS(rank, "after setp");
381  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " ending open for write #tasks=%d filepos=%lld\n", ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
382 
383  }
384  else if (flags_store->mask&_SION_FMODE_READ) {
385  /* **************** READ mode **************** */
386  if (rank == 0)
387  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for read #tasks=%d\n", ntasks));
388 
389  /* do not open the file on non-collector tasks when buddy checkpointing is used */
390  if(do_open_file) {
391  /* */ DPRINTFTS(rank, "before openR");
392  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_READ,0);
393  /* */ DPRINTFTS(rank, "after openR");
394  if (!sion_fileptr) {
395  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " cannot open %s for reading, aborting ...\n", fname));
396  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
397  }
398  } else {
399  sion_fileptr=NULL;
400  }
401  sion_gendata->apidesc->barrier_cb(comm_group);
402 
403  /* store data in static data structure (sid) */
404  sion_filedesc->fileptr = sion_fileptr;
405  sion_filedesc->rank = rank;
406  sion_filedesc->ntasks = ntasks;
407  sion_filedesc->state = SION_FILESTATE_PAROPEN;
408  sion_filedesc->mode = SION_FILEMODE_READ;
409  sion_filedesc->nfiles = nfiles;
410  sion_filedesc->collmsa = !!_sion_flags_get(flags_store, "collmsa");
411  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
412  if(sion_filedesc->usebuddy) {
413  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
414  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
415  }
416 
417  /* create the mapping from file ranks to the ranks used in buddy read */
418  if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
419 
420  /* overwrite collective info if not set*/
421  if(!sion_filedesc->usecoll) {
422  sion_filedesc->usecoll=1;
423  sion_filedesc->collsize=sion_filedesc->ntasks;
424  }
425 
426  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->ntasks));
427 
428  if (rank == 0) {
429  sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
430  if (sion_tmpintfield_buddy32 == NULL) {
431  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
432  (unsigned long) ntasks * sizeof(sion_int32)));
433  }
434  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
435  sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks * sizeof(sion_int64));
436  if (sion_tmpintfield_buddy64 == NULL) {
437  free(sion_tmpintfield_buddy32);
438  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
439  (unsigned long) ntasks * sizeof(sion_int64)));
440  }
441  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
442  sion_tmpintfield_map = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
443  if (sion_tmpintfield_map == NULL) {
444  free(sion_tmpintfield_buddy32);
445  free(sion_tmpintfield_buddy64);
446  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
447  (unsigned long) ntasks * sizeof(sion_int32)));
448  }
449  for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
450 
451  }
452  helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
453  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
454 
455 
456  if (rank == 0) {
457  for (j = 0; j < ntasks; j++)
458  DPRINTFP((64, "_sion_paropen_generic_one_file", rank, " buddy map[%d]=%d\n", j, (int) sion_tmpintfield_map[j]));
459  }
460 
461  }
462 
463  if (rank == 0) {
464  rc = _sion_read_header_fix_part(sion_filedesc); /* overwrites sion_filedesc->ntasks */
465  if (rc!=SION_SUCCESS) {
466  free(sion_tmpintfield_buddy32);
467  free(sion_tmpintfield_buddy64);
468  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
469  }
470  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
471  " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->endianness, sion_filedesc->fsblksize, sion_filedesc->ntasks));
472 
473  /* */ DPRINTFTS(rank, "before alloc");
474  /* memory allocation */
475  _sion_alloc_filedesc_arrays(sion_filedesc);
476  /* */ DPRINTFTS(rank, "after alloc");
477 
478  rc = _sion_read_header_var_part(sion_filedesc);
479  if (rc!=SION_SUCCESS) {
480  free(sion_tmpintfield_buddy32);
481  free(sion_tmpintfield_buddy64);
482  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
483  }
484 
485  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
486  sion_filedesc->collsize = atoi(flags_entry->val);
487  }
488  _sion_coll_check_env(sion_filedesc);
489  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
490 
491  /* collective */
492  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
493  else {
494  if (sion_filedesc->collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
495  else if (sion_filedesc->collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
496  else _sion_calculate_startpointers_collective(sion_filedesc);
497  }
498  /* */ DPRINTFTS(rank, "after calculate");
499 
500  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
501  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
502 
503  } /* rank==0 */
504 
505  /* distribute keyvalmode */
506  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
507 
508  /* distribute collective options */
509  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
510  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
511 
512 
513  DPRINTFP((32, "_sion_paropen_generic_one_file", rank," usecoll=%d\n", sion_filedesc->usecoll));
514 
515  if(sion_filedesc->usecoll) {
516 
517  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
518  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
519  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
520  } else {
521 
522  /* remap data */
523  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collsize[sion_tmpintfield_map[j]];
524  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
525  /* remap data */
526  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collector[sion_tmpintfield_map[j]];
527  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
528  }
529 
530 
531  _sion_free_filedesc_coll_arrays(sion_filedesc);
532  }
533 
534  /* distribute globalskip */
535  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
536 
537  /* broadcast information read from file */
538  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->endianness, comm_group, _SION_INT32, 1, 0);
539  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->swapbytes, comm_group, _SION_INT32, 1, 0);
540  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fsblksize, comm_group, _SION_INT32, 1, 0);
541  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->ntasks, comm_group, _SION_INT32, 1, 0);
542  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fileversion, comm_group, _SION_INT32, 1, 0);
543  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->nfiles, comm_group, _SION_INT32, 1, 0);
544  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->filenumber, comm_group, _SION_INT32, 1, 0);
545  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag1, comm_group, _SION_INT32, 1, 0);
546  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag2, comm_group, _SION_INT32, 1, 0);
547  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
548 
549  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
550  " read, after read of maxusedchunks=%d maxchunks=%d (%d)\n", sion_filedesc->maxusedchunks,sion_filedesc->maxchunks, MAXCHUNKS));
551  if (sion_filedesc->maxusedchunks > MAXCHUNKS) _sion_realloc_filedesc_blocklist(sion_filedesc, sion_filedesc->maxusedchunks);
552  /* */ DPRINTFTS(rank, "after bcast");
553 
554  /* scatter per task information read from file */
555  /* */ DPRINTFTS(rank, "before scatter");
556  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
557  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
558  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
559  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
560  } else {
561  /* remap data */
562  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_chunksizes[sion_tmpintfield_map[j]];
563  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
564  /* remap data */
565  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_startpointers[sion_tmpintfield_map[j]];
566  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
567  /* remap data */
568  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_globalranks[sion_tmpintfield_map[j]];
569  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
570  }
571 
572  /* */ DPRINTFTS(rank, "after scatter");
573 
574  /* read number of blocks for each task */
575  if (rank == 0) {
576  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
577  if (sion_tmpintfield == NULL) {
578  free(sion_tmpintfield_buddy32);
579  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
580  (unsigned long) ntasks * sizeof(sion_int64)));
581  }
582  _sion_read_header_var_part_blockcount_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
583 
584  for (j = 0; j < sion_filedesc->ntasks; j++)
585  DPRINTFP((2048, "_sion_paropen_generic_one_file", rank, " read, blockcount on task %02d is %10ld\n", j, (long) sion_tmpintfield[j]));
586  }
587 
588  /* and distribute them */
589  /* */ DPRINTFTS(rank, "before scatter");
590  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
591  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
592  } else {
593  /* remap data */
594  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
595  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
596  }
597  /* */ DPRINTFTS(rank, "after scatter");
598  sion_filedesc->lastchunknr = helpint64-1;
599  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " lastchunknr on task %02d is %10ld\n", rank, (long) sion_filedesc->lastchunknr));
600 
601  for (i = 0; i < sion_filedesc->maxusedchunks; i++) {
602  if (rank == 0) _sion_read_header_var_part_nextblocksizes_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
603  /* */ DPRINTFTS(rank, "before scatter");
604  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
605  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
606  } else {
607  /* remap data */
608  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
609  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
610  }
611  /* */ DPRINTFTS(rank, "after scatter");
612  sion_filedesc->blocksizes[i] = helpint64;
613  }
614 
615 #define BGFLUSH
616 #ifdef BGFLUSH
617  if(do_open_file) {
618  _sion_file_flush(sion_filedesc->fileptr);
619  }
620 #endif
621 
622  /* */ DPRINTFTS(rank, "before setp");
623  sion_gendata->apidesc->barrier_cb(comm_group);
624  if(do_open_file) {
625  _sion_file_purge(sion_filedesc->fileptr);
626  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
627  }
628  sion_filedesc->currentpos = sion_filedesc->startpos;
629  sion_filedesc->currentblocknr = 0;
630 
631  /* OUTPUT parameters */
632  *fsblksize = sion_filedesc->fsblksize;
633  *chunksize = sion_filedesc->chunksize;
634  *globalrank = sion_filedesc->globalrank;
635 
636  /* free tmp field */
637  if(sion_tmpintfield) free(sion_tmpintfield);
638  if(sion_tmpintfield_map) free(sion_tmpintfield_map);
639  if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
640  if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
641 
642  sion_gendata->apidesc->barrier_cb(comm_group);
643  /* */ DPRINTFTS(rank, "after setp");
644  /* end of read */
645  } else {
646  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_multi_mpi: unknown file mode"));
647  }
648 
649  if(do_open_file) {
650  if(fileptr!=NULL) {
651  if(sion_filedesc->fileptr->flags&SION_FILE_FLAG_ANSI) {
652  *fileptr=sion_filedesc->fileptr->fileptr;
653  sion_filedesc->fileptr_exported=1;
654  } else {
655  *fileptr=NULL;
656  sion_filedesc->fileptr_exported=0;
657  }
658  }
659  } else {
660  if(fileptr!=NULL) *fileptr=NULL;
661  sion_filedesc->fileptr_exported=0;
662  }
663 
664  if (rank == 0) {
665  /* not needed for rest of sionlib function calls */
666  _sion_free_filedesc_arrays(sion_filedesc);
667  }
668 
669  _sion_print_filedesc(sion_filedesc, 512, "_sion_paropen_generic_one_file", 1);
670 
671  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
672  _sion_file_get_position(sion_filedesc->fileptr)));
673 
674  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (long) flags_store->mask, ntasks));
675 
676  return (sid);
677 
678 
679 }
680 
681 
682 
698  int rank,
699  int ntasks,
700  int mapping_size,
701  sion_int32 *mapping,
702  int flag,
703  _sion_generic_gendata *sion_gendata,
704  _sion_generic_buddy *buddy_data)
705 {
706 
707  int rc = SION_SUCCESS;
708  int blknum, lrank;
709  sion_int64 helpint64;
710  sion_int64 *sion_tmpintfield = NULL;
711  _sion_filedesc *sion_filedesc;
712  void *comm_group=NULL;
713  int do_close_file;
714 
715  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
716  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
717  }
718 
719  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
720  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
721  }
722 
723  DPRINTFP((2, "_sion_parclose_generic", rank, "enter parallel close sid=%d\n", sid));
724 
725  /* select local communicator */
726  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
727  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
728  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
729  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
730 
731  /* decide if file has to be opened physically */
732  do_close_file=1;
733  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
734  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
735  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
736 
737  /* READ MODE: close file on all tasks */
738  if (sion_filedesc->mode == SION_FILEMODE_READ) {
739  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
740 
741  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
742  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (read mode) sid=%d, call fclose on file\n", sid));
743 
744  if(do_close_file) {
745  _sion_file_close(sion_filedesc->fileptr);
746  }
747  sion_filedesc->fileptr = NULL;
748  sion_filedesc->state = SION_FILESTATE_CLOSE;
749  }
750  }
751  else {
752  /* WRITE MODE: collect data from all tasks, write metadata, and close file on all tasks */
753 
754  /* _sion_buffer_flush(sion_filedesc); */ /* clear internal buffer */
755  _sion_flush_block(sion_filedesc);
756 
757  if (sion_filedesc->usebuffer) {
758  _sion_buffer_flush(sion_filedesc);
759  }
760 
761  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
762 
763  /* close file on all other task, except 0 */
764  if (rank != 0) {
765  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
766  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
767  if(do_close_file) {
768  _sion_file_close(sion_filedesc->fileptr);
769  }
770  sion_filedesc->fileptr = NULL;
771  sion_filedesc->state = SION_FILESTATE_CLOSE;
772  }
773  }
774 
775  sion_gendata->apidesc->barrier_cb(comm_group);
776 
777  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->lastchunknr,
778  sion_filedesc->globalskip));
779  for (blknum = 0; blknum <= sion_filedesc->lastchunknr; blknum++) {
780  DPRINTFP((1024, "_sion_parclose_generic", rank, " parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
781  sion_filedesc->blocksizes[blknum]));
782  }
783 
784  if (rank == 0) {
785  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
786  if (sion_tmpintfield == NULL) {
787  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
788  (unsigned long) sion_filedesc->ntasks * sizeof(sion_int64)));
789  }
790  }
791 
792  /* gather number of blocks of each tasks, and search maxusedchunks */
793  /* */ DPRINTFTS2(rank, "before gather");
794  helpint64 = sion_filedesc->lastchunknr + 1;
795  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
796 
797  if (rank == 0) {
798  sion_filedesc->maxusedchunks = -1;
799  for (blknum = 0; blknum < sion_filedesc->ntasks; blknum++)
800  if (sion_tmpintfield[blknum] > sion_filedesc->maxusedchunks)
801  sion_filedesc->maxusedchunks = (int) sion_tmpintfield[blknum];
802  }
803  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
804  /* */ DPRINTFTS2(rank, "after gather");
805 
806  /* calculate and set start_of_varheader */
807  sion_filedesc->start_of_varheader = sion_filedesc->start_of_data + sion_filedesc->maxusedchunks * sion_filedesc->globalskip;
808 
809  /* write rest of first meta data block on rank 0 */
810  if (rank == 0) {
811  _sion_write_header_var_info(sion_filedesc);
812 
813  _sion_write_header_var_part_blockcount_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
814 
815  }
816 
817  /* collect chunksizes of each block from each task and write it to file */
818  for (blknum = 0; blknum < sion_filedesc->maxusedchunks; blknum++) {
819  if (blknum <= sion_filedesc->lastchunknr) {
820  helpint64 = sion_filedesc->blocksizes[blknum];
821  }
822  else {
823  helpint64 = 0;
824  }
825 
826  /* */ DPRINTFTS2(rank, "before gather");
827  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
828  /* */ DPRINTFTS2(rank, "after gather");
829 
830  if (rank == 0) {
831  for (lrank = 0; lrank < ntasks; lrank++)
832  DPRINTFP((2048, "_sion_parclose_generic", rank, " parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
833  sion_tmpintfield[lrank], lrank));
834 
835  _sion_write_header_var_part_nextblocksizes_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
836 
837  }
838  }
839 
840  /* write mapping to file if more than one physical file is used, mapping_size is the number of global tasks */
841  if (mapping != NULL) {
842  _sion_write_header_var_part_mapping(sion_filedesc, mapping_size, mapping);
843  }
844 
845  /* close file on task 0 */
846  if (rank == 0) {
847  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
848  if(do_close_file) {
849  _sion_file_close(sion_filedesc->fileptr);
850  }
851  sion_filedesc->fileptr = NULL;
852  sion_filedesc->state = SION_FILESTATE_CLOSE;
853 
854  /* free tmp field */
855  if(sion_tmpintfield) free(sion_tmpintfield);
856  }
857 
858  } /* write */
859 
860  _sion_free_filedesc(sion_filedesc);
861  sion_filedesc = NULL;
862 
863 
864  DPRINTFP((2, "_sion_parclose_generic", rank, "leave parallel close sid=%d\n", sid));
865 
866  return (rc);
867 }
868 
874  int sid,
875  sion_int64 chunksize,
876  int rank,
877  int ntasks,
878  _sion_generic_gendata *sion_gendata)
879 {
880 
881  int rc = SION_SUCCESS;
882  _sion_filedesc *sion_filedesc;
883  sion_int64 lchunksize, lstartpointer, lglobalrank;
884  void *comm_group=NULL;
885 
886  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
887  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
888  }
889 
890  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
891  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
892  }
893 
894  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "enter parallel reinit sid=%d\n", sid));
895 
896  comm_group=sion_gendata->comm_data_local;
897 
898  if (sion_filedesc->mode == SION_FILEMODE_READ) {
899  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
900  }
901 
902  /* */ DPRINTFTS(sion_filedesc->rank, "before alloc");
903  if (sion_filedesc->rank == 0) {
904  /* memory allocation for internal fields */
905  _sion_alloc_filedesc_arrays(sion_filedesc);
906  }
907  /* */ DPRINTFTS(sion_filedesc->rank, "after alloc");
908 
909  /* collect new chunksize data and init startpointers on PE 0 */
910  lchunksize = (sion_int64) chunksize;
911  lglobalrank = (sion_int64) sion_filedesc->globalrank;
912 
913  /* */ DPRINTFTS2(sion_filedesc->rank, "before gather");
914  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
915  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
916 
917  /* */ DPRINTFTS2(sion_filedesc->rank, "after gather");
918 
919  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
920 
921  if (sion_filedesc->rank == 0) {
922  /* */ DPRINTFTS(sion_filedesc->rank, "before calculate");
923  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
924  else _sion_calculate_startpointers_collective(sion_filedesc);
925  /* */ DPRINTFTS(sion_filedesc->rank, "after calculate");
926  }
927 
928  /* write header again */
929  if (sion_filedesc->rank == 0) {
930 
931  /* apply hint for first meta data block */
932  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
933 
934  _sion_file_flush(sion_filedesc->fileptr);
935  lstartpointer=0;
936  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
937 
938  /* */ DPRINTFTS(sion_filedesc->rank, "before writeh");
939  _sion_write_header(sion_filedesc);
940  /* */ DPRINTFTS(sion_filedesc->rank, "after writeh");
941 
942  /* needed for writing pointer to var part of metadata at the end of the file */
943  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
944  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
945 
946  /*set max. file size */
947  lstartpointer = sion_filedesc->all_startpointers[sion_filedesc->ntasks - 1]
948  + sion_filedesc->all_chunksizes[sion_filedesc->ntasks - 1];
949  /* */ DPRINTFTS(sion_filedesc->rank, "before setp(0)");
950  _sion_file_flush(sion_filedesc->fileptr);
951  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
952  /* */ DPRINTFTS(sion_filedesc->rank, "after setp(0)");
953 
954  }
955 
956  /* distribute start_pos */
957  /* */ DPRINTFTS(sion_filedesc->rank, "before scatter");
958  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
959  /* */ DPRINTFTS(sion_filedesc->rank, "after scatter");
960 
961  /* distribute information for collective operations */
962  if(sion_filedesc->usecoll) {
963  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
964  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
965 
966  _sion_free_filedesc_coll_arrays(sion_filedesc);
967  }
968 
969  /* distribute globalskip */
970  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
971 
972  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " start position is %10lld %10.4f MB\n",
973  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0));
974 
975  /* set filepointer on each task */
976  /* */ DPRINTFTS(sion_filedesc->rank, "before setp");
977  sion_gendata->apidesc->barrier_cb(comm_group);
978  _sion_file_flush(sion_filedesc->fileptr);
979  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
980  sion_filedesc->currentpos = sion_filedesc->startpos;
981  sion_filedesc->chunksize = (sion_int64) chunksize;
982  sion_gendata->apidesc->barrier_cb(comm_group);
983 
984  /* apply hint for first chunk */
985  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
986 
987  if (sion_filedesc->rank == 0) {
988  /* not needed for rest of sionlib function calls */
989  _sion_free_filedesc_arrays(sion_filedesc);
990  }
991 
992  /* */ DPRINTFTS(sion_filedesc->rank, "after setp");
993  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " ending open for write #tasks=%d filepos=%lld\n",
994  sion_filedesc->ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
995 
996  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "leave parallel reinit of file %s in #tasks=%d\n",
997  sion_filedesc->fname, sion_filedesc->ntasks));
998 
999  return (rc);
1000 
1001 
1002 }
1003 
1004 
1009 #define DFUNCTION "_sion_generic_collect_mapping"
1010 int _sion_generic_collect_mapping( _sion_filedesc *sion_filedesc,
1011  int *mapping_size,
1012  sion_int32 **mapping ) {
1013  int rc=SION_SUCCESS;
1014  int t;
1015  _sion_generic_gendata *sion_gendata;
1016  _sion_generic_apidesc *sion_apidesc;
1017  sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1018 
1019 
1020  sion_gendata=sion_filedesc->dataptr;
1021  sion_apidesc=sion_gendata->apidesc;
1022 
1023  *mapping = NULL; *mapping_size = 0;
1024 
1025  if ((sion_filedesc->mode == SION_FILEMODE_WRITE) && (sion_filedesc->nfiles > 1)) {
1026  /* collect mapping to files to task 0 */
1027 
1028  /* mapping data will be collected by master of first physical file */
1029  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) {
1030  /* allocate data */
1031  *mapping_size=sion_gendata->gsize;
1032  *mapping = (sion_int32 *) malloc(*mapping_size * 2 * sizeof(sion_int32));
1033  if (*mapping == NULL) {
1034  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for mapping"));
1035  }
1036  }
1037 
1038  /* gather info about send about global rank of master of first file on grank 0 */
1039  if(sion_gendata->grank==0) {
1040  receivemap = (sion_int32 *) malloc(sion_gendata->gsize * sizeof(sion_int32));
1041  if (receivemap == NULL) {
1042  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for receivemap"));
1043  }
1044  }
1045 
1046  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) iamreceiver=sion_gendata->grank;
1047  else iamreceiver=-1;
1048  sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1049  if(sion_gendata->grank==0) {
1050  for(t=0;t<sion_gendata->gsize;t++) {
1051  if(receivemap[t]>=0) {
1052  receiver=receivemap[t];
1053  break;
1054  }
1055  }
1056  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "receiver of mapping grank=%d\n", receiver));
1057  }
1058  sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1059 
1060  /* receive global rank of master of first file on grank 0 */
1061  lpos[0] = sion_filedesc->filenumber;
1062  lpos[1] = sion_filedesc->rank;
1063  sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1064  }
1065 
1066  if(receivemap!=NULL) free(receivemap);
1067 
1068  return(rc);
1069 }
1070 #undef DFUNCTION
1071 
1072 /* END OF _sion_parclose_generic */
1073 
1081 {
1082  int grank = comm->grank;
1083  int gsize = comm->gsize;
1084 
1085  DPRINTFP((2, __func__, grank, "enter\n"));
1086 
1087  if (0 != strcmp(comm->apidesc->name, "SIONlib_MPI_API")) {
1088  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: not supported for generic API \"%s\"\n", comm->apidesc->name);
1089  }
1090 
1091  _sion_filedesc fd;
1092  fd.ntasks = gsize;
1093  _sion_flags_entry *flags_entry = _sion_flags_get(flags, "collsize");
1094  if (flags_entry) {
1095  fd.collsize = atoi(flags_entry->val);
1096  }
1097  _sion_coll_check_env(&fd);
1098  sion_int32 collsize = fd.collsize;
1099  if (collsize < 2) {
1100  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: size of collective groups should be 2 or more, but is %d\n", collsize);
1101  }
1102  sion_int32 n_groups = gsize / collsize + ((gsize % collsize) ? 1 : 0);
1103 
1104  if (!fd.usecoll) {
1105  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: usecoll == false\n");
1106  }
1107 
1108  int is_candidate = _sion_generic_is_candidate(comm);
1109  int n_candidates = 0;
1110  int candidates_before = 0;
1111  // bring your own Allreduce and Exscan
1112  {
1113  int *candidates = NULL;
1114  if (0 == comm->grank) {
1115  candidates = calloc(gsize, sizeof(int));
1116  if (!candidates) {
1117  _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT, "MSA Collectives: malloc returned NULL\n");
1118  }
1119  }
1120 
1121  comm->apidesc->gatherr_cb(&is_candidate, candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1122 
1123  if (0 == comm->grank) {
1124  for (size_t i = 0; i < gsize; i++) {
1125  int tmp = candidates[i];
1126  candidates[i] = n_candidates;
1127  n_candidates += tmp;
1128  }
1129  }
1130 
1131  comm->apidesc->bcastr_cb(&n_candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1132  comm->apidesc->scatterr_cb(candidates, &candidates_before, comm->comm_data_global, _SION_INT32, 1, 0);
1133 
1134  if (0 == comm->grank) {
1135  free(candidates);
1136  }
1137  }
1138 
1139  if ((n_candidates < n_groups) || (n_candidates < comm->numfiles)) {
1140  return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, "MSA Collectives: insufficient number of candidates %d, number of groups %d, number of files %d\n", n_groups, comm->numfiles);
1141 
1142  }
1143 
1144  int groups_per_file = n_groups / comm->numfiles;
1145  int is_collector = is_candidate && (candidates_before < n_groups);
1146  if (is_collector) {
1147  int rank_collector = candidates_before;
1148  comm->filenumber = rank_collector % comm->numfiles;
1149  comm->lrank = (rank_collector / comm->numfiles) * collsize;
1150  } else {
1151  int collectors_before = (candidates_before < n_groups) ? candidates_before : n_groups;
1152  int rank_sender = grank - collectors_before;
1153  int group_number = rank_sender / (collsize - 1);
1154  comm->filenumber = group_number % comm->numfiles;
1155  comm->lrank = (group_number / comm->numfiles) * collsize + rank_sender % (collsize - 1) + 1;
1156  }
1157  comm->lsize = collsize * (groups_per_file + ((comm->filenumber < n_groups % comm->numfiles) ? 1 : 0));
1158  if (comm->filenumber == comm->numfiles - 1) {
1159  comm->lsize += gsize - n_groups * collsize;
1160  }
1161 
1162  DPRINTFP((32, __func__, grank, "MSA Collectives: global rank %d of %d, is candidate %d, is collector %d, file no %d, local rank %d, local size %d\n", grank, gsize, is_candidate, is_collector, comm->filenumber, comm->lrank, comm->lsize));
1163  DPRINTFP((2, __func__, grank, "exit\n"));
1164  return SION_SUCCESS;
1165 }
1166 
1167 int _sion_generic_is_candidate(_sion_generic_gendata *comm) {
1168 #if defined(_SION_MSA_DEEP_EST_SDV)
1169  char hostname[1024];
1170  if (0 == gethostname(hostname, 1024)) {
1171  if (0 == strncmp("knl", hostname, 3)) {
1172  return 0;
1173  } else {
1174  return 1;
1175  }
1176  } else {
1177  return 0;
1178  }
1179 #elif defined(_SION_MSA_TEST)
1180  return comm->grank %2;
1181 #else
1182  return 1;
1183 #endif
1184 }
long _sion_file_get_opt_blksize(_sion_fileptr *sion_fileptr)
Get optional file system block size for a file.
Definition: sion_file.c:187
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:241
int _sion_buffer_flush(_sion_filedesc *sion_filedesc)
Flush buffer.
Definition: sion_buffer.c:177
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:219
int _sion_reassignvcd(int sid, void *data, int type)
Definition: sion_fd.c:63
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:42
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:26
int _sion_read_header_var_part(_sion_filedesc *sion_filedesc)
Read the second part of SION Meta Block 1.
int _sion_write_header_var_part_mapping(_sion_filedesc *sion_filedesc, sion_int32 mapping_size, sion_int32 *mapping)
Write mapping into the SION Meta Block 2.
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:285
Definition: sion_flags.h:31
sion_int64 * all_globalranks
int _sion_write_header(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
Definition: sion_metadata.c:38
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
#define SION_FILE_FLAG_READ
Definition: sion_file.h:27
int _sion_realloc_filedesc_blocklist(_sion_filedesc *sion_filedesc, sion_int32 maxchunks)
Increase the memory used by the internal sion structure for the blocklist.
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
sion_int32 * all_coll_collector
sion_int32 fileptr_exported
sion_int32 _sion_get_endianness_with_flags(sion_int64 flags)
Return endianness including possible choice via flags.
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:24
int _sion_parreinit_generic(int sid, sion_int64 chunksize, int rank, int ntasks, _sion_generic_gendata *sion_gendata)
change chunksize for an already opened SION file (write)
int _sion_write_header_var_part_nextblocksizes_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the next set of blocksizes from Meta Block 2, assuming that the file pointer is at the correct position.
#define MAXCHUNKS
Definition: sion_common.h:35
sion_int32 * all_coll_capability
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:22
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:25
#define SION_FILEMODE_READ
Definition: sion_filedesc.h:37
sion_int64 * all_startpointers
int _sion_read_header_var_part_blockcount_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the block sizes from Meta Block 2.
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:109
#define SION_FILESTATE_PAROPEN
Definition: sion_filedesc.h:28
int _sion_write_header_var_info(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
int _sion_free_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
int _sion_read_header_fix_part(_sion_filedesc *sion_filedesc)
Read part of the SION Meta Block 1.
int _sion_buffer_check_env(_sion_filedesc *sion_filedesc)
Checks if environment variables are set to use buffer.
Definition: sion_buffer.c:60
sion_int32 coll_capability
sion_int64 * blocksizes
int _sion_write_header_var_part_blockcount_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the block sizes from Meta Block 2.
sion_int32 currentblocknr
Definition: sion_filedesc.h:97
int _sion_cache_check_env(_sion_filedesc *sion_filedesc)
Check if environment variables are set to use cache.
Definition: sion_cache.c:57
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
sion_int64 * all_chunksizes
int _sion_print_filedesc(_sion_filedesc *sion_filedesc, int level, char *desc, int flag)
Print the initialized sion file description.
#define SION_FILESTATE_CLOSE
Definition: sion_filedesc.h:35
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
int _sion_alloc_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
Definition: sion_filedesc.c:37
#define SION_FILEMODE_WRITE
Definition: sion_filedesc.h:38
int _sion_read_header_var_part_nextblocksizes_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the next set of blocksizes from Meta Block 2, assuming that the file pointer is at the correct position.
#define DFUNCTION
collect mapping information on rank 0 of first file, mapping=NULL for all others
Sion Time Stamp Header.
sion_int32 * all_coll_collsize
sion_int64 start_of_varheader
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:263
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82
int _sion_generic_renumber_collmsa(_sion_generic_gendata *comm, _sion_flags_store *flags)
Splits a Communicator in numfiles different communicators.