SIONlib  1.7.2
Scalable I/O library for parallel access to task-local files
sion_generic_internal.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2018 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #define _XOPEN_SOURCE 700
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <unistd.h>
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_error_handler.h"
29 #include "sion_file.h"
30 #include "sion_filedesc.h"
31 #include "sion_fd.h"
32 #include "sion_metadata.h"
33 #include "sion_internal.h"
34 #include "sion_printts.h"
35 #include "sion_keyvalue.h"
36 #include "sion_flags.h"
37 
38 #include "sion_cache.h"
39 #include "sion_buffer.h"
40 #include "sion_hints.h"
41 #include "sion_generic_internal.h"
43 #include "sion_generic_buddy.h"
44 #include "sion_internal_startptr.h"
45 
69  int sid,
70  char *fname,
71  _sion_flags_store *flags_store,
72  char *prefix,
73  int *numFiles,
74  int *filenumber,
75  sion_int64 *chunksize,
76  sion_int32 *fsblksize,
77  int rank,
78  int ntasks,
79  int *globalrank,
80  int flag,
81  FILE **fileptr,
82  _sion_generic_gendata *sion_gendata,
83  _sion_generic_buddy *buddy_data )
84 {
85 
86  int i, j;
87  int rc;
88 
89  _sion_filedesc *sion_filedesc;
90  _sion_fileptr *sion_fileptr;
91 
92  int nfiles, filenum;
93  sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
94  sion_int64 *sion_tmpintfield = NULL;
95  sion_int32 *sion_tmpintfield_map = NULL, helpint32;
96  sion_int32 *sion_tmpintfield_buddy32 = NULL;
97  sion_int64 *sion_tmpintfield_buddy64 = NULL;
98  void *comm_group=NULL;
99  int do_open_file;
100 
101  _sion_flags_entry* flags_entry = NULL;
102 
103  if (flags_store->mask&_SION_FMODE_POSIX) apiflag=SION_FILE_FLAG_POSIX;
104  else apiflag=SION_FILE_FLAG_ANSI;
105 
106  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "enter parallel open of file %s in mode %d #tasks=%d\n", fname, (int) flags_store->mask, ntasks));
107  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n", sizeof(int), sizeof(long),
108  sizeof(long long), sizeof(sion_int32), sizeof(sion_int64)));
109 
110  /* some shortcuts */
111  nfiles = *numFiles;
112  filenum = *filenumber;
113 
114  /* select local communicator */
115  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
116  if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
117  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
118  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
119  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
120 
121  /* decide if file has to be opened physically */
122  do_open_file=1;
123  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
124  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
125  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
126  /* WF: todo decide if data is local, these tasks can read data directly? */
127  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "do_open_file=%d\n", do_open_file));
128 
129  sion_filedesc = _sion_alloc_filedesc();
130  if (sion_filedesc == NULL) {
131  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
132  (unsigned long) sizeof(sion_filedesc));
133  }
134  _sion_init_filedesc(sion_filedesc);
135  sion_filedesc->fname = strdup(fname); /* Set the filename */
136 
137  _sion_reassignvcd(sid,sion_filedesc, SION_FILEDESCRIPTOR);
138  sion_filedesc->sid=sid;
139 
140  /* Allocate memory for storing MAXCHUNKS chunksize infos in internal structure */
142  sion_filedesc->lastchunknr = 0; /* Set the current number of chunks */
143  sion_filedesc->currentblocknr = 0; /* Set the current block number */
144 
145  if (flags_store->mask&_SION_FMODE_WRITE) {
146  /* **************** WRITE mode **************** */
147 
148  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for write #tasks=%d\n", ntasks));
149 
150  /* check parameter */
151  if (ntasks<0) {
152  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (int) ntasks));
153  }
154 
155  /* check parameter */
156  if ((chunksize != NULL) && (*chunksize<0)) {
157  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
158  }
159 
160  /* check parameter */
161  if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
162  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
163  }
164 
165  sion_filedesc->state = SION_FILESTATE_PAROPEN;
166  sion_filedesc->mode = SION_FILEMODE_WRITE;
167  sion_filedesc->endianness = _sion_get_endianness_with_flags(flags_store->mask); /* Endianness */
168  sion_filedesc->swapbytes = 0; /* Endianness, swapping bytes */
169  sion_filedesc->fsblksize = *fsblksize;
170  sion_filedesc->rank = rank;
171  sion_filedesc->globalrank = *globalrank;
172  sion_filedesc->ntasks = ntasks;
173  sion_filedesc->nfiles = nfiles;
174  sion_filedesc->filenumber = filenum;
175  sion_filedesc->prefix = strdup(prefix);
176  sion_filedesc->compress = flags_store->mask&_SION_FMODE_COMPRESS;
177  sion_filedesc->usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
178  sion_filedesc->collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
179  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
180  if(sion_filedesc->usebuddy) {
181  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
182  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
183  }
184 
185  /* open file on rank 0, first time to create file and get fsblksize if necessary */
186  if (rank == 0) {
187  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
188  if (!sion_fileptr) {
189  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
190  }
191  if(*fsblksize<=-1) {
192  /* check with fstat fsblksize */
193  new_fsblocksize=(sion_int64) _sion_file_get_opt_blksize(sion_fileptr);
194  if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
195  }
196  _sion_file_close(sion_fileptr);
197  }
198  sion_gendata->apidesc->barrier_cb(comm_group);
199  /* printf("WF: rank=%2d after first barrier fsblksize=%d\n",rank,(int) *fsblksize); */
200 
201  /* distribute new fsblksize */
202  if(*fsblksize==-1) {
203  sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
204  *fsblksize=new_fsblocksize;
205  sion_filedesc->fsblksize = *fsblksize;
206  /* printf("WF: rank=%2d after bcast fsblksize=%d new_fsblocksize=%d\n",rank,(int) *fsblksize,(int) new_fsblocksize); */
207  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "setting fsblksize to %lld\n", new_fsblocksize));
208  }
209 
210  /* check for buffer, needed at this point to set flag before writing header */
211  _sion_cache_check_env(sion_filedesc);
212  _sion_buffer_check_env(sion_filedesc);
213 
214  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
215  if (rank == 0) {
216  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
217  }
218  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
219 
220  /* check for collective options */
221  if (rank == 0) {
222  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
223  sion_filedesc->collsize = atoi(flags_entry->val);
224  }
225  _sion_coll_check_env(sion_filedesc);
226  }
227 
228  if (
229  ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
230  || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
231  || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
232  )
233  {
234  /* overwrite collective info if not set*/
235  if(!sion_filedesc->usecoll) {
236  sion_filedesc->usecoll=1;
237  sion_filedesc->collsize=sion_filedesc->ntasks;
238  };
239  }
240 
241  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
242  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
243  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collmergemode, comm_group, _SION_INT32, 1, 0);
244 
245  /* check if API support coalescing I/O */
246  if(sion_filedesc->usecoll) {
247  if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
248  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
249  sion_filedesc->usecoll=0;
250  }
251  }
252 
253  /* check for hints options */
254  if (rank == 0) {
255  _sion_hints_check_env(sion_filedesc);
256  }
257  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usehints, comm_group, _SION_INT32, 1, 0);
258  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->hinttype, comm_group, _SION_INT32, 1, 0);
259 
260  /* */ DPRINTFTS(rank, "before alloc");
261  if (rank == 0) {
262  /* memory allocation for internal fields */
263  _sion_alloc_filedesc_arrays(sion_filedesc);
264  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
265  }
266  /* */ DPRINTFTS(rank, "after alloc");
267 
268  /* collect data and init startpointers on PE 0 */
269  lchunksize = (sion_int64) *chunksize;
270  lglobalrank = (sion_int64) *globalrank;
271  sion_filedesc->chunksize_req=lchunksize;
272  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
273 
274  /* */ DPRINTFTS2(rank, "before gather");
275  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
276  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
277 
278  /* check capability of tasks */
279  if(sion_filedesc->usecoll) {
280  sion_filedesc->coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
281  sion_gendata->apidesc->gatherr_cb(&sion_filedesc->coll_capability, sion_filedesc->all_coll_capability, comm_group, _SION_INT32, 1, 0);
282  }
283 
284  /* */ DPRINTFTS2(rank, "after gather");
285  if (rank == 0) {
286  /* */ DPRINTFTS(rank, "before calculate");
287  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "chunksizes[%d - 1]=%ld\n", ntasks,(long) sion_filedesc->all_chunksizes[ntasks - 1]));
288  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
289  else {
290  if (!sion_filedesc->collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
291  else _sion_calculate_startpointers_collective_merge(sion_filedesc);
292  }
293  /* */ DPRINTFTS(rank, "after calculate");
294  }
295 
296  /* */ DPRINTFTS(rank, "before open");
297  /* open not file on non-collector task if buddy-checkpointing */
298  if(do_open_file) {
299 
300  /* open file on all ranks */
301  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE,0);
302  if (!sion_fileptr) {
303  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
304  }
305  /* store data in static data structure (sid) */
306  sion_filedesc->fileptr = sion_fileptr;
307  }
308  sion_gendata->apidesc->barrier_cb(comm_group);
309  /* */ DPRINTFTS(rank, "after open");
310 
311 
312  /* write header */
313  if (rank == 0) {
314 
315  /* apply hint for first meta data block */
316  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
317 
318  /* */ DPRINTFTS(rank, "before writeh");
319  _sion_write_header(sion_filedesc);
320  /* */ DPRINTFTS(rank, "after writeh");
321 
322  /* needed for writing pointer to var part of metadata at the end of the file */
323  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
324  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
325  /*set max. file size */
326  lstartpointer = sion_filedesc->all_startpointers[ntasks - 1]
327  + sion_filedesc->all_chunksizes[ntasks - 1];
328  /* */ DPRINTFTS(rank, "before setp(0)");
329  _sion_file_flush(sion_filedesc->fileptr);
330  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
331  /* */ DPRINTFTS(rank, "after setp(0)");
332 
333  }
334 
335  /* distribute start_pos */
336  /* */ DPRINTFTS(rank, "before scatter");
337  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
338  /* */ DPRINTFTS(rank, "after scatter");
339 
340  /* distribute chunksize */
341  /* */ DPRINTFTS(rank, "before scatter");
342  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
343  /* */ DPRINTFTS(rank, "after scatter");
344 
345  /* distribute information for collective operations */
346  if(sion_filedesc->usecoll) {
347  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
348  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
349 
350  _sion_free_filedesc_coll_arrays(sion_filedesc);
351  }
352 
353  /* distribute globalskip */
354  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
355 
356  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
357  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0,
358  sion_filedesc->chunksize, sion_filedesc->chunksize / 1024.0 / 1024.0
359  ));
360 
361  /* set filepointer on each task */
362  /* */ DPRINTFTS(rank, "before setp");
363  sion_gendata->apidesc->barrier_cb(comm_group);
364  if(do_open_file) {
365  _sion_file_flush(sion_filedesc->fileptr);
366  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
367  }
368  sion_filedesc->currentpos = sion_filedesc->startpos;
369  /* given by calculate startpointers ...
370  sion_filedesc->chunksize = (sion_int64) *chunksize;
371  */
372  sion_gendata->apidesc->barrier_cb(comm_group);
373 
374  /* apply hint for first chunk */
375  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
376 
377  /* */ DPRINTFTS(rank, "after setp");
378  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " ending open for write #tasks=%d filepos=%lld\n", ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
379 
380  }
381  else if (flags_store->mask&_SION_FMODE_READ) {
382  /* **************** READ mode **************** */
383  if (rank == 0)
384  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for read #tasks=%d\n", ntasks));
385 
386  /* open not file on non-collector task if buddy-checkpointing */
387  if(do_open_file) {
388  /* */ DPRINTFTS(rank, "before openR");
389  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_READ,0);
390  /* */ DPRINTFTS(rank, "after openR");
391  if (!sion_fileptr) {
392  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " cannot open %s for reading, aborting ...\n", fname));
393  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
394  }
395  } else {
396  sion_fileptr=NULL;
397  }
398  sion_gendata->apidesc->barrier_cb(comm_group);
399 
400  /* store data in static data structure (sid) */
401  sion_filedesc->fileptr = sion_fileptr;
402  sion_filedesc->rank = rank;
403  sion_filedesc->ntasks = ntasks;
404  sion_filedesc->state = SION_FILESTATE_PAROPEN;
405  sion_filedesc->mode = SION_FILEMODE_READ;
406  sion_filedesc->nfiles = nfiles;
407  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
408  if(sion_filedesc->usebuddy) {
409  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
410  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
411  }
412 
413  /* creating of mapping from file ranks to rank used in buddy read */
414  if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
415 
416  /* overwrite collective info if not set*/
417  if(!sion_filedesc->usecoll) {
418  sion_filedesc->usecoll=1;
419  sion_filedesc->collsize=sion_filedesc->ntasks;
420  }
421 
422  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->ntasks));
423 
424  if (rank == 0) {
425  sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
426  if (sion_tmpintfield_buddy32 == NULL) {
427  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
428  (unsigned long) ntasks * sizeof(sion_int32)));
429  }
430  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
431  sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks * sizeof(sion_int64));
432  if (sion_tmpintfield_buddy64 == NULL) {
433  free(sion_tmpintfield_buddy32);
434  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
435  (unsigned long) ntasks * sizeof(sion_int64)));
436  }
437  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
438  sion_tmpintfield_map = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
439  if (sion_tmpintfield_map == NULL) {
440  free(sion_tmpintfield_buddy32);
441  free(sion_tmpintfield_buddy64);
442  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
443  (unsigned long) ntasks * sizeof(sion_int32)));
444  }
445  for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
446 
447  }
448  helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
449  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
450 
451 
452  if (rank == 0) {
453  for (j = 0; j < ntasks; j++)
454  DPRINTFP((64, "_sion_paropen_generic_one_file", rank, " buddy map[%d]=%d\n", j, (int) sion_tmpintfield_map[j]));
455  }
456 
457  }
458 
459  if (rank == 0) {
460  rc = _sion_read_header_fix_part(sion_filedesc); /* overwrites sion_filedesc->ntasks */
461  if (rc!=SION_SUCCESS) {
462  free(sion_tmpintfield_buddy32);
463  free(sion_tmpintfield_buddy64);
464  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
465  }
466  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
467  " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->endianness, sion_filedesc->fsblksize, sion_filedesc->ntasks));
468 
469  /* */ DPRINTFTS(rank, "before alloc");
470  /* memory allocation */
471  _sion_alloc_filedesc_arrays(sion_filedesc);
472  /* */ DPRINTFTS(rank, "after alloc");
473 
474  rc = _sion_read_header_var_part(sion_filedesc);
475  if (rc!=SION_SUCCESS) {
476  free(sion_tmpintfield_buddy32);
477  free(sion_tmpintfield_buddy64);
478  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
479  }
480 
481  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
482  sion_filedesc->collsize = atoi(flags_entry->val);
483  }
484  _sion_coll_check_env(sion_filedesc);
485  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
486 
487  /* collective */
488  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
489  else {
490  if (!sion_filedesc->collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
491  else _sion_calculate_startpointers_collective_merge(sion_filedesc);
492  }
493  /* */ DPRINTFTS(rank, "after calculate");
494 
495  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
496  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
497 
498  } /* rank==0 */
499 
500  /* distribute keyvalmode */
501  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
502 
503  /* distribute collective options */
504  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
505  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
506 
507 
508  DPRINTFP((32, "_sion_paropen_generic_one_file", rank," usecoll=%d\n", sion_filedesc->usecoll));
509 
510  if(sion_filedesc->usecoll) {
511 
512  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
513  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
514  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
515  } else {
516 
517  /* remap data */
518  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collsize[sion_tmpintfield_map[j]];
519  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
520  /* remap data */
521  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collector[sion_tmpintfield_map[j]];
522  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
523  }
524 
525 
526  _sion_free_filedesc_coll_arrays(sion_filedesc);
527  }
528 
529  /* distribute globalskip */
530  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
531 
532  /* broadcast information read from file */
533  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->endianness, comm_group, _SION_INT32, 1, 0);
534  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->swapbytes, comm_group, _SION_INT32, 1, 0);
535  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fsblksize, comm_group, _SION_INT32, 1, 0);
536  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->ntasks, comm_group, _SION_INT32, 1, 0);
537  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fileversion, comm_group, _SION_INT32, 1, 0);
538  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->nfiles, comm_group, _SION_INT32, 1, 0);
539  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->filenumber, comm_group, _SION_INT32, 1, 0);
540  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag1, comm_group, _SION_INT32, 1, 0);
541  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag2, comm_group, _SION_INT32, 1, 0);
542  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
543 
544  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
545  " read, after read of maxusedchunks=%d maxchunks=%d (%d)\n", sion_filedesc->maxusedchunks,sion_filedesc->maxchunks, MAXCHUNKS));
546  if (sion_filedesc->maxusedchunks > MAXCHUNKS) _sion_realloc_filedesc_blocklist(sion_filedesc, sion_filedesc->maxusedchunks);
547  /* */ DPRINTFTS(rank, "after bcast");
548 
549  /* scatter per task information read from file */
550  /* */ DPRINTFTS(rank, "before scatter");
551  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
552  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
553  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
554  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
555  } else {
556  /* remap data */
557  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_chunksizes[sion_tmpintfield_map[j]];
558  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
559  /* remap data */
560  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_startpointers[sion_tmpintfield_map[j]];
561  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
562  /* remap data */
563  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_globalranks[sion_tmpintfield_map[j]];
564  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
565  }
566 
567  /* */ DPRINTFTS(rank, "after scatter");
568 
569  /* read number of blocks for each task */
570  if (rank == 0) {
571  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
572  if (sion_tmpintfield == NULL) {
573  free(sion_tmpintfield_buddy32);
574  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
575  (unsigned long) ntasks * sizeof(sion_int64)));
576  }
577  _sion_read_header_var_part_blockcount_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
578 
579  for (j = 0; j < sion_filedesc->ntasks; j++)
580  DPRINTFP((2048, "_sion_paropen_generic_one_file", rank, " read, blockcount on task %02d is %10ld\n", j, (long) sion_tmpintfield[j]));
581  }
582 
583  /* and distribute them */
584  /* */ DPRINTFTS(rank, "before scatter");
585  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
586  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
587  } else {
588  /* remap data */
589  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
590  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
591  }
592  /* */ DPRINTFTS(rank, "after scatter");
593  sion_filedesc->lastchunknr = helpint64-1;
594  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " lastchunknr on task %02d is %10ld\n", rank, (long) sion_filedesc->lastchunknr));
595 
596  for (i = 0; i < sion_filedesc->maxusedchunks; i++) {
597  if (rank == 0) _sion_read_header_var_part_nextblocksizes_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
598  /* */ DPRINTFTS(rank, "before scatter");
599  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
600  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
601  } else {
602  /* remap data */
603  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
604  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
605  }
606  /* */ DPRINTFTS(rank, "after scatter");
607  sion_filedesc->blocksizes[i] = helpint64;
608  }
609 
610 #define BGFLUSH
611 #ifdef BGFLUSH
612  if(do_open_file) {
613  _sion_file_flush(sion_filedesc->fileptr);
614  }
615 #endif
616 
617  /* */ DPRINTFTS(rank, "before setp");
618  sion_gendata->apidesc->barrier_cb(comm_group);
619  if(do_open_file) {
620  _sion_file_purge(sion_filedesc->fileptr);
621  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
622  }
623  sion_filedesc->currentpos = sion_filedesc->startpos;
624  sion_filedesc->currentblocknr = 0;
625 
626  /* OUTPUT parameters */
627  *fsblksize = sion_filedesc->fsblksize;
628  *chunksize = sion_filedesc->chunksize;
629  *globalrank = sion_filedesc->globalrank;
630 
631  /* free tmp field */
632  if(sion_tmpintfield) free(sion_tmpintfield);
633  if(sion_tmpintfield_map) free(sion_tmpintfield_map);
634  if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
635  if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
636 
637  sion_gendata->apidesc->barrier_cb(comm_group);
638  /* */ DPRINTFTS(rank, "after setp");
639  /* end of read */
640  } else {
641  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_multi_mpi: unknown file mode"));
642  }
643 
644  if(do_open_file) {
645  if(fileptr!=NULL) {
646  if(sion_filedesc->fileptr->flags&SION_FILE_FLAG_ANSI) {
647  *fileptr=sion_filedesc->fileptr->fileptr;
648  sion_filedesc->fileptr_exported=1;
649  } else {
650  *fileptr=NULL;
651  sion_filedesc->fileptr_exported=0;
652  }
653  }
654  } else {
655  if(fileptr!=NULL) *fileptr=NULL;
656  sion_filedesc->fileptr_exported=0;
657  }
658 
659  if (rank == 0) {
660  /* not needed for rest of sionlib function calls */
661  _sion_free_filedesc_arrays(sion_filedesc);
662  }
663 
664  _sion_print_filedesc(sion_filedesc, 512, "_sion_paropen_generic_one_file", 1);
665 
666  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
667  _sion_file_get_position(sion_filedesc->fileptr)));
668 
669  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (long) flags_store->mask, ntasks));
670 
671  return (sid);
672 
673 
674 }
675 
676 
677 
693  int rank,
694  int ntasks,
695  int mapping_size,
696  sion_int32 *mapping,
697  int flag,
698  _sion_generic_gendata *sion_gendata,
699  _sion_generic_buddy *buddy_data)
700 {
701 
702  int rc = SION_SUCCESS;
703  int blknum, lrank;
704  sion_int64 helpint64;
705  sion_int64 *sion_tmpintfield = NULL;
706  _sion_filedesc *sion_filedesc;
707  void *comm_group=NULL;
708  int do_close_file;
709 
710  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
711  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
712  }
713 
714  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
715  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
716  }
717 
718  DPRINTFP((2, "_sion_parclose_generic", rank, "enter parallel close sid=%d\n", sid));
719 
720  /* select local communicator */
721  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
722  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
723  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
724  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
725 
726  /* decide if file has to be opened physically */
727  do_close_file=1;
728  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
729  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
730  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
731 
732  /* READ MODE: close file on all tasks */
733  if (sion_filedesc->mode == SION_FILEMODE_READ) {
734  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
735 
736  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
737  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (read mode) sid=%d, call fclose on file\n", sid));
738 
739  if(do_close_file) {
740  _sion_file_close(sion_filedesc->fileptr);
741  }
742  sion_filedesc->fileptr = NULL;
743  sion_filedesc->state = SION_FILESTATE_CLOSE;
744  }
745  }
746  else {
747  /* WRITE MODE: collect data from all tasks, write metadata, and close file on all tasks */
748 
749  /* _sion_buffer_flush(sion_filedesc); */ /* clear internal buffer */
750  _sion_flush_block(sion_filedesc);
751 
752  if (sion_filedesc->usebuffer) {
753  _sion_buffer_flush(sion_filedesc);
754  }
755 
756  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
757 
758  /* close file on all other task, except 0 */
759  if (rank != 0) {
760  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
761  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
762  if(do_close_file) {
763  _sion_file_close(sion_filedesc->fileptr);
764  }
765  sion_filedesc->fileptr = NULL;
766  sion_filedesc->state = SION_FILESTATE_CLOSE;
767  }
768  }
769 
770  sion_gendata->apidesc->barrier_cb(comm_group);
771 
772  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->lastchunknr,
773  sion_filedesc->globalskip));
774  for (blknum = 0; blknum <= sion_filedesc->lastchunknr; blknum++) {
775  DPRINTFP((1024, "_sion_parclose_generic", rank, " parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
776  sion_filedesc->blocksizes[blknum]));
777  }
778 
779  if (rank == 0) {
780  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
781  if (sion_tmpintfield == NULL) {
782  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
783  (unsigned long) sion_filedesc->ntasks * sizeof(sion_int64)));
784  }
785  }
786 
787  /* gather number of blocks of each tasks, and search maxusedchunks */
788  /* */ DPRINTFTS2(rank, "before gather");
789  helpint64 = sion_filedesc->lastchunknr + 1;
790  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
791 
792  if (rank == 0) {
793  sion_filedesc->maxusedchunks = -1;
794  for (blknum = 0; blknum < sion_filedesc->ntasks; blknum++)
795  if (sion_tmpintfield[blknum] > sion_filedesc->maxusedchunks)
796  sion_filedesc->maxusedchunks = (int) sion_tmpintfield[blknum];
797  }
798  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
799  /* */ DPRINTFTS2(rank, "after gather");
800 
801  /* calculate and set start_of_varheader */
802  sion_filedesc->start_of_varheader = sion_filedesc->start_of_data + sion_filedesc->maxusedchunks * sion_filedesc->globalskip;
803 
804  /* write rest of first meta data block on rank 0 */
805  if (rank == 0) {
806  _sion_write_header_var_info(sion_filedesc);
807 
808  _sion_write_header_var_part_blockcount_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
809 
810  }
811 
812  /* collect chunksizes of each block from each task and write it to file */
813  for (blknum = 0; blknum < sion_filedesc->maxusedchunks; blknum++) {
814  if (blknum <= sion_filedesc->lastchunknr) {
815  helpint64 = sion_filedesc->blocksizes[blknum];
816  }
817  else {
818  helpint64 = 0;
819  }
820 
821  /* */ DPRINTFTS2(rank, "before gather");
822  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
823  /* */ DPRINTFTS2(rank, "after gather");
824 
825  if (rank == 0) {
826  for (lrank = 0; lrank < ntasks; lrank++)
827  DPRINTFP((2048, "_sion_parclose_generic", rank, " parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
828  sion_tmpintfield[lrank], lrank));
829 
830  _sion_write_header_var_part_nextblocksizes_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
831 
832  }
833  }
834 
835  /* write mapping to file if more than one physical file is used, mapping_size is the number of global tasks */
836  if (mapping != NULL) {
837  _sion_write_header_var_part_mapping(sion_filedesc, mapping_size, mapping);
838  }
839 
840  /* close file on task 0 */
841  if (rank == 0) {
842  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
843  if(do_close_file) {
844  _sion_file_close(sion_filedesc->fileptr);
845  }
846  sion_filedesc->fileptr = NULL;
847  sion_filedesc->state = SION_FILESTATE_CLOSE;
848 
849  /* free tmp field */
850  if(sion_tmpintfield) free(sion_tmpintfield);
851  }
852 
853  } /* write */
854 
855  _sion_free_filedesc(sion_filedesc);
856  sion_filedesc = NULL;
857 
858 
859  DPRINTFP((2, "_sion_parclose_generic", rank, "leave parallel close sid=%d\n", sid));
860 
861  return (rc);
862 }
863 
869  int sid,
870  sion_int64 chunksize,
871  int rank,
872  int ntasks,
873  _sion_generic_gendata *sion_gendata)
874 {
875 
876  int rc = SION_SUCCESS;
877  _sion_filedesc *sion_filedesc;
878  sion_int64 lchunksize, lstartpointer, lglobalrank;
879  void *comm_group=NULL;
880 
881  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
882  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
883  }
884 
885  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
886  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
887  }
888 
889  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "enter parallel reinit sid=%d\n", sid));
890 
891  comm_group=sion_gendata->comm_data_local;
892 
893  if (sion_filedesc->mode == SION_FILEMODE_READ) {
894  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
895  }
896 
897  /* */ DPRINTFTS(sion_filedesc->rank, "before alloc");
898  if (sion_filedesc->rank == 0) {
899  /* memory allocation for internal fields */
900  _sion_alloc_filedesc_arrays(sion_filedesc);
901  }
902  /* */ DPRINTFTS(sion_filedesc->rank, "after alloc");
903 
904  /* collect new chunksize data and init startpointers on PE 0 */
905  lchunksize = (sion_int64) chunksize;
906  lglobalrank = (sion_int64) sion_filedesc->globalrank;
907 
908  /* */ DPRINTFTS2(sion_filedesc->rank, "before gather");
909  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
910  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
911 
912  /* */ DPRINTFTS2(sion_filedesc->rank, "after gather");
913 
914  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
915 
916  if (sion_filedesc->rank == 0) {
917  /* */ DPRINTFTS(sion_filedesc->rank, "before calculate");
918  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
919  else _sion_calculate_startpointers_collective(sion_filedesc);
920  /* */ DPRINTFTS(sion_filedesc->rank, "after calculate");
921  }
922 
923  /* write header again */
924  if (sion_filedesc->rank == 0) {
925 
926  /* apply hint for first meta data block */
927  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
928 
929  _sion_file_flush(sion_filedesc->fileptr);
930  lstartpointer=0;
931  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
932 
933  /* */ DPRINTFTS(sion_filedesc->rank, "before writeh");
934  _sion_write_header(sion_filedesc);
935  /* */ DPRINTFTS(sion_filedesc->rank, "after writeh");
936 
937  /* needed for writing pointer to var part of metadata at the end of the file */
938  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
939  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
940 
941  /*set max. file size */
942  lstartpointer = sion_filedesc->all_startpointers[sion_filedesc->ntasks - 1]
943  + sion_filedesc->all_chunksizes[sion_filedesc->ntasks - 1];
944  /* */ DPRINTFTS(sion_filedesc->rank, "before setp(0)");
945  _sion_file_flush(sion_filedesc->fileptr);
946  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
947  /* */ DPRINTFTS(sion_filedesc->rank, "after setp(0)");
948 
949  }
950 
951  /* distribute start_pos */
952  /* */ DPRINTFTS(sion_filedesc->rank, "before scatter");
953  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
954  /* */ DPRINTFTS(sion_filedesc->rank, "after scatter");
955 
956  /* distribute information for collective operations */
957  if(sion_filedesc->usecoll) {
958  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
959  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
960 
961  _sion_free_filedesc_coll_arrays(sion_filedesc);
962  }
963 
964  /* distribute globalskip */
965  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
966 
967  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " start position is %10lld %10.4f MB\n",
968  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0));
969 
970  /* set filepointer on each task */
971  /* */ DPRINTFTS(sion_filedesc->rank, "before setp");
972  sion_gendata->apidesc->barrier_cb(comm_group);
973  _sion_file_flush(sion_filedesc->fileptr);
974  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
975  sion_filedesc->currentpos = sion_filedesc->startpos;
976  sion_filedesc->chunksize = (sion_int64) chunksize;
977  sion_gendata->apidesc->barrier_cb(comm_group);
978 
979  /* apply hint for first chunk */
980  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
981 
982  if (sion_filedesc->rank == 0) {
983  /* not needed for rest of sionlib function calls */
984  _sion_free_filedesc_arrays(sion_filedesc);
985  }
986 
987  /* */ DPRINTFTS(sion_filedesc->rank, "after setp");
988  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " ending open for write #tasks=%d filepos=%lld\n",
989  sion_filedesc->ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
990 
991  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "leave parallel reinit of file %s in #tasks=%d\n",
992  sion_filedesc->fname, sion_filedesc->ntasks));
993 
994  return (rc);
995 
996 
997 }
998 
999 
1004 #define DFUNCTION "_sion_generic_collect_mapping"
1005 int _sion_generic_collect_mapping( _sion_filedesc *sion_filedesc,
1006  int *mapping_size,
1007  sion_int32 **mapping ) {
1008  int rc=SION_SUCCESS;
1009  int t;
1010  _sion_generic_gendata *sion_gendata;
1011  _sion_generic_apidesc *sion_apidesc;
1012  sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1013 
1014 
1015  sion_gendata=sion_filedesc->dataptr;
1016  sion_apidesc=sion_gendata->apidesc;
1017 
1018  *mapping = NULL; *mapping_size = 0;
1019 
1020  if ((sion_filedesc->mode == SION_FILEMODE_WRITE) && (sion_filedesc->nfiles > 1)) {
1021  /* collect mapping to files to task 0 */
1022 
1023  /* mapping data will be collected by master of first physical file */
1024  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) {
1025  /* allocate data */
1026  *mapping_size=sion_gendata->gsize;
1027  *mapping = (sion_int32 *) malloc(*mapping_size * 2 * sizeof(sion_int32));
1028  if (*mapping == NULL) {
1029  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for mapping"));
1030  }
1031  }
1032 
1033  /* gather info about send about global rank of master of first file on grank 0 */
1034  if(sion_gendata->grank==0) {
1035  receivemap = (sion_int32 *) malloc(sion_gendata->gsize * sizeof(sion_int32));
1036  if (receivemap == NULL) {
1037  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for receivemap"));
1038  }
1039  }
1040 
1041  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) iamreceiver=sion_gendata->grank;
1042  else iamreceiver=-1;
1043  sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1044  if(sion_gendata->grank==0) {
1045  for(t=0;t<sion_gendata->gsize;t++) {
1046  if(receivemap[t]>=0) {
1047  receiver=receivemap[t];
1048  break;
1049  }
1050  }
1051  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "receiver of mapping grank=%d\n", receiver));
1052  }
1053  sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1054 
1055  /* receive global rank of master of first file on grank 0 */
1056  lpos[0] = sion_filedesc->filenumber;
1057  lpos[1] = sion_filedesc->rank;
1058  sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1059  }
1060 
1061  if(receivemap!=NULL) free(receivemap);
1062 
1063  return(rc);
1064 }
1065 #undef DFUNCTION
1066 
1067 /* END OF _sion_parclose_generic */
long _sion_file_get_opt_blksize(_sion_fileptr *sion_fileptr)
Get optional file system block size for a file.
Definition: sion_file.c:187
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:241
int _sion_buffer_flush(_sion_filedesc *sion_filedesc)
Flush buffer.
Definition: sion_buffer.c:164
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:219
int _sion_reassignvcd(int sid, void *data, int type)
Definition: sion_fd.c:63
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:42
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:26
int _sion_read_header_var_part(_sion_filedesc *sion_filedesc)
Read the second part of SION Meta Block 1.
int _sion_write_header_var_part_mapping(_sion_filedesc *sion_filedesc, sion_int32 mapping_size, sion_int32 *mapping)
Write mapping into the SION Meta Block 2.
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:285
Definition: sion_flags.h:31
sion_int64 * all_globalranks
int _sion_write_header(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
Definition: sion_metadata.c:38
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
#define SION_FILE_FLAG_READ
Definition: sion_file.h:27
int _sion_realloc_filedesc_blocklist(_sion_filedesc *sion_filedesc, sion_int32 maxchunks)
Increase the memory used by the internal sion structure for the blocklist.
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
sion_int32 * all_coll_collector
sion_int32 fileptr_exported
sion_int32 _sion_get_endianness_with_flags(sion_int64 flags)
Return endianness including possible choice via flags.
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:24
int _sion_parreinit_generic(int sid, sion_int64 chunksize, int rank, int ntasks, _sion_generic_gendata *sion_gendata)
change chunksize for an already opened SION file (write)
int _sion_write_header_var_part_nextblocksizes_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positi...
#define MAXCHUNKS
Definition: sion_common.h:35
sion_int32 * all_coll_capability
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:22
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:25
#define SION_FILEMODE_READ
Definition: sion_filedesc.h:37
sion_int64 * all_startpointers
int _sion_read_header_var_part_blockcount_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the block sizes from Meta Block 2.
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:109
#define SION_FILESTATE_PAROPEN
Definition: sion_filedesc.h:28
int _sion_write_header_var_info(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
int _sion_free_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
int _sion_read_header_fix_part(_sion_filedesc *sion_filedesc)
Read part of the SION Meta Block 1.
int _sion_buffer_check_env(_sion_filedesc *sion_filedesc)
Checks if environment variables are set to use buffer.
Definition: sion_buffer.c:57
sion_int32 coll_capability
sion_int64 * blocksizes
int _sion_write_header_var_part_blockcount_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the block sizes from Meta Block 2.
sion_int32 currentblocknr
Definition: sion_filedesc.h:97
int _sion_cache_check_env(_sion_filedesc *sion_filedesc)
Check if environment variables are set to use cache.
Definition: sion_cache.c:57
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
sion_int64 * all_chunksizes
int _sion_print_filedesc(_sion_filedesc *sion_filedesc, int level, char *desc, int flag)
Print the initialized sion file description.
#define SION_FILESTATE_CLOSE
Definition: sion_filedesc.h:35
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
int _sion_alloc_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
Definition: sion_filedesc.c:37
#define SION_FILEMODE_WRITE
Definition: sion_filedesc.h:38
int _sion_read_header_var_part_nextblocksizes_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positio...
#define DFUNCTION
collect mapping information on rank 0 of first file, mapping=NULL for all others
Sion Time Stamp Header.
sion_int32 * all_coll_collsize
sion_int64 start_of_varheader
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:263
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82