SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_generic_internal.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <string.h>
22 #include <unistd.h>
23 
24 #include "sion.h"
25 #include "sion_debug.h"
26 #include "sion_error_handler.h"
27 #include "sion_file.h"
28 #include "sion_filedesc.h"
29 #include "sion_fd.h"
30 #include "sion_metadata.h"
31 #include "sion_internal.h"
32 #include "sion_printts.h"
33 #include "sion_keyvalue.h"
34 #include "sion_flags.h"
35 
36 #include "sion_cache.h"
37 #include "sion_buffer.h"
38 #include "sion_hints.h"
39 #include "sion_generic_internal.h"
41 #include "sion_generic_buddy.h"
42 
66  int sid,
67  char *fname,
68  _sion_flags_store *flags_store,
69  char *prefix,
70  int *numFiles,
71  int *filenumber,
72  sion_int64 *chunksize,
73  sion_int32 *fsblksize,
74  int rank,
75  int ntasks,
76  int *globalrank,
77  int flag,
78  FILE **fileptr,
79  _sion_generic_gendata *sion_gendata,
80  _sion_generic_buddy *buddy_data )
81 {
82 
83  int i, j;
84  int rc;
85 
86  _sion_filedesc *sion_filedesc;
87  _sion_fileptr *sion_fileptr;
88 
89  int nfiles, filenum;
90  sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
91  sion_int64 *sion_tmpintfield = NULL;
92  sion_int32 *sion_tmpintfield_map = NULL, helpint32;
93  sion_int32 *sion_tmpintfield_buddy32 = NULL;
94  sion_int64 *sion_tmpintfield_buddy64 = NULL;
95  void *comm_group=NULL;
96  int do_open_file;
97 
98  _sion_flags_entry* flags_entry = NULL;
99 
100  if (flags_store->mask&_SION_FMODE_POSIX) apiflag=SION_FILE_FLAG_POSIX;
101  else apiflag=SION_FILE_FLAG_ANSI;
102 
103  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "enter parallel open of file %s in mode %d #tasks=%d\n", fname, (int) flags_store->mask, ntasks));
104  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n", sizeof(int), sizeof(long),
105  sizeof(long long), sizeof(sion_int32), sizeof(sion_int64)));
106 
107  /* some shortcuts */
108  nfiles = *numFiles;
109  filenum = *filenumber;
110 
111  /* select local communicator */
112  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
113  if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
114  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
115  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
116  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
117 
118  /* decide if file has to be opened physically */
119  do_open_file=1;
120  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
121  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
122  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
123  /* WF: todo decide if data is local, these tasks can read data directly? */
124  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "do_open_file=%d\n", do_open_file));
125 
126  sion_filedesc = _sion_alloc_filedesc();
127  if (sion_filedesc == NULL) {
128  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
129  (unsigned long) sizeof(sion_filedesc));
130  }
131  _sion_init_filedesc(sion_filedesc);
132  sion_filedesc->fname = strdup(fname); /* Set the filename */
133 
134  _sion_reassignvcd(sid,sion_filedesc, SION_FILEDESCRIPTOR);
135  sion_filedesc->sid=sid;
136 
137  /* Allocate memory for storing MAXCHUNKS chunksize infos in internal structure */
139  sion_filedesc->lastchunknr = 0; /* Set the current number of chunks */
140  sion_filedesc->currentblocknr = 0; /* Set the current block number */
141 
142  if (flags_store->mask&_SION_FMODE_WRITE) {
143  /* **************** WRITE mode **************** */
144 
145  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for write #tasks=%d\n", ntasks));
146 
147  /* check parameter */
148  if (ntasks<0) {
149  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (int) ntasks));
150  }
151 
152  /* check parameter */
153  if ((chunksize != NULL) && (*chunksize<0)) {
154  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
155  }
156 
157  /* check parameter */
158  if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
159  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
160  }
161 
162  sion_filedesc->state = SION_FILESTATE_PAROPEN;
163  sion_filedesc->mode = SION_FILEMODE_WRITE;
164  sion_filedesc->endianness = _sion_get_endianness_with_flags(flags_store->mask); /* Endianness */
165  sion_filedesc->swapbytes = 0; /* Endianness, swapping bytes */
166  sion_filedesc->fsblksize = *fsblksize;
167  sion_filedesc->rank = rank;
168  sion_filedesc->globalrank = *globalrank;
169  sion_filedesc->ntasks = ntasks;
170  sion_filedesc->nfiles = nfiles;
171  sion_filedesc->filenumber = filenum;
172  sion_filedesc->prefix = strdup(prefix);
173  sion_filedesc->compress = flags_store->mask&_SION_FMODE_COMPRESS;
174  sion_filedesc->usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
175  sion_filedesc->collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
176  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
177  if(sion_filedesc->usebuddy) {
178  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
179  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
180  }
181 
182  /* open file on rank 0, first time to create file and get fsblksize if necessary */
183  if (rank == 0) {
184  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE|SION_FILE_FLAG_CREATE,0);
185  if (!sion_fileptr) {
186  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
187  }
188  if(*fsblksize<=-1) {
189  /* check with fstat fsblksize */
190  new_fsblocksize=(sion_int64) _sion_file_get_opt_blksize(sion_fileptr);
191  if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
192  }
193  _sion_file_close(sion_fileptr);
194  }
195  sion_gendata->apidesc->barrier_cb(comm_group);
196  /* printf("WF: rank=%2d after first barrier fsblksize=%d\n",rank,(int) *fsblksize); */
197 
198  /* distribute new fsblksize */
199  if(*fsblksize==-1) {
200  sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
201  *fsblksize=new_fsblocksize;
202  sion_filedesc->fsblksize = *fsblksize;
203  /* printf("WF: rank=%2d after bcast fsblksize=%d new_fsblocksize=%d\n",rank,(int) *fsblksize,(int) new_fsblocksize); */
204  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "setting fsblksize to %lld\n", new_fsblocksize));
205  }
206 
207  /* check for buffer, needed at this point to set flag before writing header */
208  _sion_cache_check_env(sion_filedesc);
209  _sion_buffer_check_env(sion_filedesc);
210 
211  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
212  if (rank == 0) {
213  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
214  }
215  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
216 
217  /* check for collective options */
218  if (rank == 0) {
219  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
220  sion_filedesc->collsize = atoi(flags_entry->val);
221  }
222  _sion_coll_check_env(sion_filedesc);
223  }
224 
225  if (
226  ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
227  || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
228  || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
229  )
230  {
231  /* overwrite collective info if not set*/
232  if(!sion_filedesc->usecoll) {
233  sion_filedesc->usecoll=1;
234  sion_filedesc->collsize=sion_filedesc->ntasks;
235  };
236  }
237 
238  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
239  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
240  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collmergemode, comm_group, _SION_INT32, 1, 0);
241 
242  /* check if API support coalescing I/O */
243  if(sion_filedesc->usecoll) {
244  if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
245  _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
246  sion_filedesc->usecoll=0;
247  }
248  }
249 
250  /* check for hints options */
251  if (rank == 0) {
252  _sion_hints_check_env(sion_filedesc);
253  }
254  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usehints, comm_group, _SION_INT32, 1, 0);
255  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->hinttype, comm_group, _SION_INT32, 1, 0);
256 
257  /* */ DPRINTFTS(rank, "before alloc");
258  if (rank == 0) {
259  /* memory allocation for internal fields */
260  _sion_alloc_filedesc_arrays(sion_filedesc);
261  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
262  }
263  /* */ DPRINTFTS(rank, "after alloc");
264 
265  /* collect data and init startpointers on PE 0 */
266  lchunksize = (sion_int64) *chunksize;
267  lglobalrank = (sion_int64) *globalrank;
268  sion_filedesc->chunksize_req=lchunksize;
269  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
270 
271  /* */ DPRINTFTS2(rank, "before gather");
272  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
273  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
274 
275  /* check capability of tasks */
276  if(sion_filedesc->usecoll) {
277  sion_filedesc->coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
278  sion_gendata->apidesc->gatherr_cb(&sion_filedesc->coll_capability, sion_filedesc->all_coll_capability, comm_group, _SION_INT32, 1, 0);
279  }
280 
281  /* */ DPRINTFTS2(rank, "after gather");
282  if (rank == 0) {
283  /* */ DPRINTFTS(rank, "before calculate");
284  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, "chunksizes[%d - 1]=%ld\n", ntasks,(long) sion_filedesc->all_chunksizes[ntasks - 1]));
285  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
286  else {
287  if (!sion_filedesc->collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
288  else _sion_calculate_startpointers_collective_merge(sion_filedesc);
289  }
290  /* */ DPRINTFTS(rank, "after calculate");
291  }
292 
293  /* */ DPRINTFTS(rank, "before open");
294  /* open not file on non-collector task if buddy-checkpointing */
295  if(do_open_file) {
296 
297  /* open file on all ranks */
298  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_WRITE,0);
299  if (!sion_fileptr) {
300  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
301  }
302  /* store data in static data structure (sid) */
303  sion_filedesc->fileptr = sion_fileptr;
304  }
305  sion_gendata->apidesc->barrier_cb(comm_group);
306  /* */ DPRINTFTS(rank, "after open");
307 
308 
309  /* write header */
310  if (rank == 0) {
311 
312  /* apply hint for first meta data block */
313  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
314 
315  /* */ DPRINTFTS(rank, "before writeh");
316  _sion_write_header(sion_filedesc);
317  /* */ DPRINTFTS(rank, "after writeh");
318 
319  /* needed for writing pointer to var part of metadata at the end of the file */
320  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
321  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
322  /*set max. file size */
323  lstartpointer = sion_filedesc->all_startpointers[ntasks - 1]
324  + sion_filedesc->all_chunksizes[ntasks - 1];
325  /* */ DPRINTFTS(rank, "before setp(0)");
326  _sion_file_flush(sion_filedesc->fileptr);
327  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
328  /* */ DPRINTFTS(rank, "after setp(0)");
329 
330  }
331 
332  /* distribute start_pos */
333  /* */ DPRINTFTS(rank, "before scatter");
334  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
335  /* */ DPRINTFTS(rank, "after scatter");
336 
337  /* distribute chunksize */
338  /* */ DPRINTFTS(rank, "before scatter");
339  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
340  /* */ DPRINTFTS(rank, "after scatter");
341 
342  /* distribute information for collective operations */
343  if(sion_filedesc->usecoll) {
344  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
345  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
346 
347  _sion_free_filedesc_coll_arrays(sion_filedesc);
348  }
349 
350  /* distribute globalskip */
351  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
352 
353  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
354  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0,
355  sion_filedesc->chunksize, sion_filedesc->chunksize / 1024.0 / 1024.0
356  ));
357 
358  /* set filepointer on each task */
359  /* */ DPRINTFTS(rank, "before setp");
360  sion_gendata->apidesc->barrier_cb(comm_group);
361  if(do_open_file) {
362  _sion_file_flush(sion_filedesc->fileptr);
363  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
364  }
365  sion_filedesc->currentpos = sion_filedesc->startpos;
366  /* given by calculate startpointers ...
367  sion_filedesc->chunksize = (sion_int64) *chunksize;
368  */
369  sion_gendata->apidesc->barrier_cb(comm_group);
370 
371  /* apply hint for first chunk */
372  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
373 
374  /* */ DPRINTFTS(rank, "after setp");
375  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " ending open for write #tasks=%d filepos=%lld\n", ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
376 
377  }
378  else if (flags_store->mask&_SION_FMODE_READ) {
379  /* **************** READ mode **************** */
380  if (rank == 0)
381  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " starting open for read #tasks=%d\n", ntasks));
382 
383  /* open not file on non-collector task if buddy-checkpointing */
384  if(do_open_file) {
385  /* */ DPRINTFTS(rank, "before openR");
386  sion_fileptr = _sion_file_open(fname,apiflag|SION_FILE_FLAG_READ,0);
387  /* */ DPRINTFTS(rank, "after openR");
388  if (!sion_fileptr) {
389  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " cannot open %s for reading, aborting ...\n", fname));
390  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
391  }
392  } else {
393  sion_fileptr=NULL;
394  }
395  sion_gendata->apidesc->barrier_cb(comm_group);
396 
397  /* store data in static data structure (sid) */
398  sion_filedesc->fileptr = sion_fileptr;
399  sion_filedesc->rank = rank;
400  sion_filedesc->ntasks = ntasks;
401  sion_filedesc->state = SION_FILESTATE_PAROPEN;
402  sion_filedesc->mode = SION_FILEMODE_READ;
403  sion_filedesc->nfiles = nfiles;
404  sion_filedesc->usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
405  if(sion_filedesc->usebuddy) {
406  sion_filedesc->buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
407  if (sion_filedesc->buddylevel==0) sion_filedesc->buddylevel=1; /* default */
408  }
409 
410  /* creating of mapping from file ranks to rank used in buddy read */
411  if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
412 
413  /* overwrite collective info if not set*/
414  if(!sion_filedesc->usecoll) {
415  sion_filedesc->usecoll=1;
416  sion_filedesc->collsize=sion_filedesc->ntasks;
417  }
418 
419  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->ntasks));
420 
421  if (rank == 0) {
422  sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
423  if (sion_tmpintfield_buddy32 == NULL) {
424  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
425  (unsigned long) ntasks * sizeof(sion_int32)));
426  }
427  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
428  sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks * sizeof(sion_int64));
429  if (sion_tmpintfield_buddy64 == NULL) {
430  free(sion_tmpintfield_buddy32);
431  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
432  (unsigned long) ntasks * sizeof(sion_int64)));
433  }
434  for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
435  sion_tmpintfield_map = (sion_int32 *) malloc(ntasks * sizeof(sion_int32));
436  if (sion_tmpintfield_map == NULL) {
437  free(sion_tmpintfield_buddy32);
438  free(sion_tmpintfield_buddy64);
439  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
440  (unsigned long) ntasks * sizeof(sion_int32)));
441  }
442  for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
443 
444  }
445  helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
446  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
447 
448 
449  if (rank == 0) {
450  for (j = 0; j < ntasks; j++)
451  DPRINTFP((64, "_sion_paropen_generic_one_file", rank, " buddy map[%d]=%d\n", j, (int) sion_tmpintfield_map[j]));
452  }
453 
454  }
455 
456  if (rank == 0) {
457  rc = _sion_read_header_fix_part(sion_filedesc); /* overwrites sion_filedesc->ntasks */
458  if (rc!=SION_SUCCESS) {
459  free(sion_tmpintfield_buddy32);
460  free(sion_tmpintfield_buddy64);
461  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
462  }
463  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
464  " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->endianness, sion_filedesc->fsblksize, sion_filedesc->ntasks));
465 
466  /* */ DPRINTFTS(rank, "before alloc");
467  /* memory allocation */
468  _sion_alloc_filedesc_arrays(sion_filedesc);
469  /* */ DPRINTFTS(rank, "after alloc");
470 
471  rc = _sion_read_header_var_part(sion_filedesc);
472  if (rc!=SION_SUCCESS) {
473  free(sion_tmpintfield_buddy32);
474  free(sion_tmpintfield_buddy64);
475  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
476  }
477 
478  if ((flags_entry = _sion_flags_get(flags_store, "collsize"))) {
479  sion_filedesc->collsize = atoi(flags_entry->val);
480  }
481  _sion_coll_check_env(sion_filedesc);
482  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
483 
484  /* collective */
485  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
486  else {
487  if (!sion_filedesc->collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
488  else _sion_calculate_startpointers_collective_merge(sion_filedesc);
489  }
490  /* */ DPRINTFTS(rank, "after calculate");
491 
492  /* check for keyval parameter, needed at this point to set flag before writing header (flag1) */
493  _sion_keyval_check_env(sion_filedesc, flags_store->mask);
494 
495  } /* rank==0 */
496 
497  /* distribute keyvalmode */
498  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->keyvalmode, comm_group, _SION_INT32, 1, 0);
499 
500  /* distribute collective options */
501  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->usecoll, comm_group, _SION_INT32, 1, 0);
502  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
503 
504 
505  DPRINTFP((32, "_sion_paropen_generic_one_file", rank," usecoll=%d\n", sion_filedesc->usecoll));
506 
507  if(sion_filedesc->usecoll) {
508 
509  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
510  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
511  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
512  } else {
513 
514  /* remap data */
515  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collsize[sion_tmpintfield_map[j]];
516  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
517  /* remap data */
518  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->all_coll_collector[sion_tmpintfield_map[j]];
519  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
520  }
521 
522 
523  _sion_free_filedesc_coll_arrays(sion_filedesc);
524  }
525 
526  /* distribute globalskip */
527  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
528 
529  /* broadcast information read from file */
530  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->endianness, comm_group, _SION_INT32, 1, 0);
531  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->swapbytes, comm_group, _SION_INT32, 1, 0);
532  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fsblksize, comm_group, _SION_INT32, 1, 0);
533  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->ntasks, comm_group, _SION_INT32, 1, 0);
534  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->fileversion, comm_group, _SION_INT32, 1, 0);
535  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->nfiles, comm_group, _SION_INT32, 1, 0);
536  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->filenumber, comm_group, _SION_INT32, 1, 0);
537  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag1, comm_group, _SION_INT32, 1, 0);
538  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->flag2, comm_group, _SION_INT32, 1, 0);
539  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
540 
541  DPRINTFP((32, "_sion_paropen_generic_one_file", rank,
542  " read, after read of maxusedchunks=%d maxchunks=%d (%d)\n", sion_filedesc->maxusedchunks,sion_filedesc->maxchunks, MAXCHUNKS));
543  if (sion_filedesc->maxusedchunks > MAXCHUNKS) _sion_realloc_filedesc_blocklist(sion_filedesc, sion_filedesc->maxusedchunks);
544  /* */ DPRINTFTS(rank, "after bcast");
545 
546  /* scatter per task information read from file */
547  /* */ DPRINTFTS(rank, "before scatter");
548  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
549  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_chunksizes, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
550  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
551  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
552  } else {
553  /* remap data */
554  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_chunksizes[sion_tmpintfield_map[j]];
555  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->chunksize, comm_group, _SION_INT64, 1, 0);
556  /* remap data */
557  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_startpointers[sion_tmpintfield_map[j]];
558  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
559  /* remap data */
560  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->all_globalranks[sion_tmpintfield_map[j]];
561  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->globalrank=(sion_int32) helpint64;
562  }
563 
564  /* */ DPRINTFTS(rank, "after scatter");
565 
566  /* read number of blocks for each task */
567  if (rank == 0) {
568  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
569  if (sion_tmpintfield == NULL) {
570  free(sion_tmpintfield_buddy32);
571  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
572  (unsigned long) ntasks * sizeof(sion_int64)));
573  }
574  _sion_read_header_var_part_blockcount_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
575 
576  for (j = 0; j < sion_filedesc->ntasks; j++)
577  DPRINTFP((2048, "_sion_paropen_generic_one_file", rank, " read, blockcount on task %02d is %10ld\n", j, (long) sion_tmpintfield[j]));
578  }
579 
580  /* and distribute them */
581  /* */ DPRINTFTS(rank, "before scatter");
582  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
583  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
584  } else {
585  /* remap data */
586  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
587  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
588  }
589  /* */ DPRINTFTS(rank, "after scatter");
590  sion_filedesc->lastchunknr = helpint64-1;
591  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " lastchunknr on task %02d is %10ld\n", rank, (long) sion_filedesc->lastchunknr));
592 
593  for (i = 0; i < sion_filedesc->maxusedchunks; i++) {
594  if (rank == 0) _sion_read_header_var_part_nextblocksizes_to_field(sion_filedesc, sion_filedesc->ntasks, sion_tmpintfield);
595  /* */ DPRINTFTS(rank, "before scatter");
596  if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
597  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
598  } else {
599  /* remap data */
600  if(rank==0) for (j = 0; j < ntasks; j++) if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
601  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
602  }
603  /* */ DPRINTFTS(rank, "after scatter");
604  sion_filedesc->blocksizes[i] = helpint64;
605  }
606 
607 #define BGFLUSH
608 #ifdef BGFLUSH
609  if(do_open_file) {
610  _sion_file_flush(sion_filedesc->fileptr);
611  }
612 #endif
613 
614  /* */ DPRINTFTS(rank, "before setp");
615  sion_gendata->apidesc->barrier_cb(comm_group);
616  if(do_open_file) {
617  _sion_file_purge(sion_filedesc->fileptr);
618  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
619  }
620  sion_filedesc->currentpos = sion_filedesc->startpos;
621  sion_filedesc->currentblocknr = 0;
622 
623  /* OUTPUT parameters */
624  *fsblksize = sion_filedesc->fsblksize;
625  *chunksize = sion_filedesc->chunksize;
626  *globalrank = sion_filedesc->globalrank;
627 
628  /* free tmp field */
629  if(sion_tmpintfield) free(sion_tmpintfield);
630  if(sion_tmpintfield_map) free(sion_tmpintfield_map);
631  if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
632  if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
633 
634  sion_gendata->apidesc->barrier_cb(comm_group);
635  /* */ DPRINTFTS(rank, "after setp");
636  /* end of read */
637  } else {
638  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_multi_mpi: unknown file mode"));
639  }
640 
641  if(do_open_file) {
642  if(fileptr!=NULL) {
643  if(sion_filedesc->fileptr->flags&SION_FILE_FLAG_ANSI) {
644  *fileptr=sion_filedesc->fileptr->fileptr;
645  sion_filedesc->fileptr_exported=1;
646  } else {
647  *fileptr=NULL;
648  sion_filedesc->fileptr_exported=0;
649  }
650  }
651  } else {
652  if(fileptr!=NULL) *fileptr=NULL;
653  sion_filedesc->fileptr_exported=0;
654  }
655 
656  if (rank == 0) {
657  /* not needed for rest of sionlib function calls */
658  _sion_free_filedesc_arrays(sion_filedesc);
659  }
660 
661  _sion_print_filedesc(sion_filedesc, 512, "_sion_paropen_generic_one_file", 1);
662 
663  DPRINTFP((32, "_sion_paropen_generic_one_file", rank, " start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
664  _sion_file_get_position(sion_filedesc->fileptr)));
665 
666  DPRINTFP((2, "_sion_paropen_generic_one_file", rank, "leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (long) flags_store->mask, ntasks));
667 
668  return (sid);
669 
670 
671 }
672 
673 
674 
690  int rank,
691  int ntasks,
692  int mapping_size,
693  sion_int32 *mapping,
694  int flag,
695  _sion_generic_gendata *sion_gendata,
696  _sion_generic_buddy *buddy_data)
697 {
698 
699  int rc = SION_SUCCESS;
700  int blknum, lrank;
701  sion_int64 helpint64;
702  sion_int64 *sion_tmpintfield = NULL;
703  _sion_filedesc *sion_filedesc;
704  void *comm_group=NULL;
705  int do_close_file;
706 
707  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
708  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
709  }
710 
711  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
712  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
713  }
714 
715  DPRINTFP((2, "_sion_parclose_generic", rank, "enter parallel close sid=%d\n", sid));
716 
717  /* select local communicator */
718  if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
719  if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
720  if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
721  if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
722 
723  /* decide if file has to be opened physically */
724  do_close_file=1;
725  if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
726  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
727  if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
728 
729  /* READ MODE: close file on all tasks */
730  if (sion_filedesc->mode == SION_FILEMODE_READ) {
731  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
732 
733  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
734  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (read mode) sid=%d, call fclose on file\n", sid));
735 
736  if(do_close_file) {
737  _sion_file_close(sion_filedesc->fileptr);
738  }
739  sion_filedesc->fileptr = NULL;
740  sion_filedesc->state = SION_FILESTATE_CLOSE;
741  }
742  }
743  else {
744  /* WRITE MODE: collect data from all tasks, write metadata, and close file on all tasks */
745 
746  /* _sion_buffer_flush(sion_filedesc); */ /* clear internal buffer */
747  _sion_flush_block(sion_filedesc);
748 
749  if (sion_filedesc->usebuffer) {
750  _sion_buffer_flush(sion_filedesc);
751  }
752 
753  _sion_print_filedesc(sion_filedesc, 512, "_sion_parclose_generic", 1);
754 
755  /* close file on all other task, except 0 */
756  if (rank != 0) {
757  if (sion_filedesc->state != SION_FILESTATE_CLOSE) {
758  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
759  if(do_close_file) {
760  _sion_file_close(sion_filedesc->fileptr);
761  }
762  sion_filedesc->fileptr = NULL;
763  sion_filedesc->state = SION_FILESTATE_CLOSE;
764  }
765  }
766 
767  sion_gendata->apidesc->barrier_cb(comm_group);
768 
769  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->lastchunknr,
770  sion_filedesc->globalskip));
771  for (blknum = 0; blknum <= sion_filedesc->lastchunknr; blknum++) {
772  DPRINTFP((1024, "_sion_parclose_generic", rank, " parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
773  sion_filedesc->blocksizes[blknum]));
774  }
775 
776  if (rank == 0) {
777  sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->ntasks * sizeof(sion_int64));
778  if (sion_tmpintfield == NULL) {
779  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
780  (unsigned long) sion_filedesc->ntasks * sizeof(sion_int64)));
781  }
782  }
783 
784  /* gather number of blocks of each tasks, and search maxusedchunks */
785  /* */ DPRINTFTS2(rank, "before gather");
786  helpint64 = sion_filedesc->lastchunknr + 1;
787  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
788 
789  if (rank == 0) {
790  sion_filedesc->maxusedchunks = -1;
791  for (blknum = 0; blknum < sion_filedesc->ntasks; blknum++)
792  if (sion_tmpintfield[blknum] > sion_filedesc->maxusedchunks)
793  sion_filedesc->maxusedchunks = (int) sion_tmpintfield[blknum];
794  }
795  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->maxusedchunks, comm_group, _SION_INT32, 1, 0);
796  /* */ DPRINTFTS2(rank, "after gather");
797 
798  /* calculate and set start_of_varheader */
799  sion_filedesc->start_of_varheader = sion_filedesc->start_of_data + sion_filedesc->maxusedchunks * sion_filedesc->globalskip;
800 
801  /* write rest of first meta data block on rank 0 */
802  if (rank == 0) {
803  _sion_write_header_var_info(sion_filedesc);
804 
805  _sion_write_header_var_part_blockcount_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
806 
807  }
808 
809  /* collect chunksizes of each block from each task and write it to file */
810  for (blknum = 0; blknum < sion_filedesc->maxusedchunks; blknum++) {
811  if (blknum <= sion_filedesc->lastchunknr) {
812  helpint64 = sion_filedesc->blocksizes[blknum];
813  }
814  else {
815  helpint64 = 0;
816  }
817 
818  /* */ DPRINTFTS2(rank, "before gather");
819  sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
820  /* */ DPRINTFTS2(rank, "after gather");
821 
822  if (rank == 0) {
823  for (lrank = 0; lrank < ntasks; lrank++)
824  DPRINTFP((2048, "_sion_parclose_generic", rank, " parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
825  sion_tmpintfield[lrank], lrank));
826 
827  _sion_write_header_var_part_nextblocksizes_from_field(sion_filedesc,sion_filedesc->ntasks,sion_tmpintfield);
828 
829  }
830  }
831 
832  /* write mapping to file if more than one physical file is used, mapping_size is the number of global tasks */
833  if (mapping != NULL) {
834  _sion_write_header_var_part_mapping(sion_filedesc, mapping_size, mapping);
835  }
836 
837  /* close file on task 0 */
838  if (rank == 0) {
839  DPRINTFP((32, "_sion_parclose_generic", rank, " parallel close (write mode) sid=%d, call fclose on file\n", sid));
840  if(do_close_file) {
841  _sion_file_close(sion_filedesc->fileptr);
842  }
843  sion_filedesc->fileptr = NULL;
844  sion_filedesc->state = SION_FILESTATE_CLOSE;
845 
846  /* free tmp field */
847  if(sion_tmpintfield) free(sion_tmpintfield);
848  }
849 
850  } /* write */
851 
852  _sion_free_filedesc(sion_filedesc);
853  sion_filedesc = NULL;
854 
855 
856  DPRINTFP((2, "_sion_parclose_generic", rank, "leave parallel close sid=%d\n", sid));
857 
858  return (rc);
859 }
860 
866  int sid,
867  sion_int64 chunksize,
868  int rank,
869  int ntasks,
870  _sion_generic_gendata *sion_gendata)
871 {
872 
873  int rc = SION_SUCCESS;
874  _sion_filedesc *sion_filedesc;
875  sion_int64 lchunksize, lstartpointer, lglobalrank;
876  void *comm_group=NULL;
877 
878  if ((_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
879  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
880  }
881 
882  if (sion_filedesc->state != SION_FILESTATE_PAROPEN) {
883  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
884  }
885 
886  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "enter parallel reinit sid=%d\n", sid));
887 
888  comm_group=sion_gendata->comm_data_local;
889 
890  if (sion_filedesc->mode == SION_FILEMODE_READ) {
891  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
892  }
893 
894  /* */ DPRINTFTS(sion_filedesc->rank, "before alloc");
895  if (sion_filedesc->rank == 0) {
896  /* memory allocation for internal fields */
897  _sion_alloc_filedesc_arrays(sion_filedesc);
898  }
899  /* */ DPRINTFTS(sion_filedesc->rank, "after alloc");
900 
901  /* collect new chunksize data and init startpointers on PE 0 */
902  lchunksize = (sion_int64) chunksize;
903  lglobalrank = (sion_int64) sion_filedesc->globalrank;
904 
905  /* */ DPRINTFTS2(sion_filedesc->rank, "before gather");
906  sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->all_chunksizes, comm_group, _SION_INT64, 1, 0);
907  sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->all_globalranks, comm_group, _SION_INT64, 1, 0);
908 
909  /* */ DPRINTFTS2(sion_filedesc->rank, "after gather");
910 
911  if(sion_filedesc->usecoll) _sion_alloc_filedesc_coll_arrays(sion_filedesc);
912 
913  if (sion_filedesc->rank == 0) {
914  /* */ DPRINTFTS(sion_filedesc->rank, "before calculate");
915  if (!sion_filedesc->usecoll) _sion_calculate_startpointers(sion_filedesc);
916  else _sion_calculate_startpointers_collective(sion_filedesc);
917  /* */ DPRINTFTS(sion_filedesc->rank, "after calculate");
918  }
919 
920  /* write header again */
921  if (sion_filedesc->rank == 0) {
922 
923  /* apply hint for first meta data block */
924  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
925 
926  _sion_file_flush(sion_filedesc->fileptr);
927  lstartpointer=0;
928  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
929 
930  /* */ DPRINTFTS(sion_filedesc->rank, "before writeh");
931  _sion_write_header(sion_filedesc);
932  /* */ DPRINTFTS(sion_filedesc->rank, "after writeh");
933 
934  /* needed for writing pointer to var part of metadata at the end of the file */
935  sion_filedesc->end_of_header = _sion_file_get_position(sion_filedesc->fileptr);
936  sion_filedesc->start_of_data = sion_filedesc->all_startpointers[0];
937 
938  /*set max. file size */
939  lstartpointer = sion_filedesc->all_startpointers[sion_filedesc->ntasks - 1]
940  + sion_filedesc->all_chunksizes[sion_filedesc->ntasks - 1];
941  /* */ DPRINTFTS(sion_filedesc->rank, "before setp(0)");
942  _sion_file_flush(sion_filedesc->fileptr);
943  _sion_file_set_position(sion_filedesc->fileptr, lstartpointer);
944  /* */ DPRINTFTS(sion_filedesc->rank, "after setp(0)");
945 
946  }
947 
948  /* distribute start_pos */
949  /* */ DPRINTFTS(sion_filedesc->rank, "before scatter");
950  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_startpointers, &sion_filedesc->startpos, comm_group, _SION_INT64, 1, 0);
951  /* */ DPRINTFTS(sion_filedesc->rank, "after scatter");
952 
953  /* distribute information for collective operations */
954  if(sion_filedesc->usecoll) {
955  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collsize, &sion_filedesc->collsize, comm_group, _SION_INT32, 1, 0);
956  sion_gendata->apidesc->scatterr_cb(sion_filedesc->all_coll_collector, &sion_filedesc->collector, comm_group, _SION_INT32, 1, 0);
957 
958  _sion_free_filedesc_coll_arrays(sion_filedesc);
959  }
960 
961  /* distribute globalskip */
962  sion_gendata->apidesc->bcastr_cb(&sion_filedesc->globalskip, comm_group, _SION_INT64, 1, 0);
963 
964  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " start position is %10lld %10.4f MB\n",
965  sion_filedesc->startpos, sion_filedesc->startpos / 1024.0 / 1024.0));
966 
967  /* set filepointer on each task */
968  /* */ DPRINTFTS(sion_filedesc->rank, "before setp");
969  sion_gendata->apidesc->barrier_cb(comm_group);
970  _sion_file_flush(sion_filedesc->fileptr);
971  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->startpos);
972  sion_filedesc->currentpos = sion_filedesc->startpos;
973  sion_filedesc->chunksize = (sion_int64) chunksize;
974  sion_gendata->apidesc->barrier_cb(comm_group);
975 
976  /* apply hint for first chunk */
977  _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
978 
979  if (sion_filedesc->rank == 0) {
980  /* not needed for rest of sionlib function calls */
981  _sion_free_filedesc_arrays(sion_filedesc);
982  }
983 
984  /* */ DPRINTFTS(sion_filedesc->rank, "after setp");
985  DPRINTFP((32, "_sion_parreinit_generic", sion_filedesc->rank, " ending open for write #tasks=%d filepos=%lld\n",
986  sion_filedesc->ntasks, _sion_file_get_position(sion_filedesc->fileptr)));
987 
988  DPRINTFP((2, "_sion_parreinit_generic", sion_filedesc->rank, "leave parallel reinit of file %s in #tasks=%d\n",
989  sion_filedesc->fname, sion_filedesc->ntasks));
990 
991  return (rc);
992 
993 
994 }
995 
996 
1001 #define DFUNCTION "_sion_generic_collect_mapping"
1002 int _sion_generic_collect_mapping( _sion_filedesc *sion_filedesc,
1003  int *mapping_size,
1004  sion_int32 **mapping ) {
1005  int rc=SION_SUCCESS;
1006  int t;
1007  _sion_generic_gendata *sion_gendata;
1008  _sion_generic_apidesc *sion_apidesc;
1009  sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1010 
1011 
1012  sion_gendata=sion_filedesc->dataptr;
1013  sion_apidesc=sion_gendata->apidesc;
1014 
1015  *mapping = NULL; *mapping_size = 0;
1016 
1017  if ((sion_filedesc->mode == SION_FILEMODE_WRITE) && (sion_filedesc->nfiles > 1)) {
1018  /* collect mapping to files to task 0 */
1019 
1020  /* mapping data will be collected by master of first physical file */
1021  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) {
1022  /* allocate data */
1023  *mapping_size=sion_gendata->gsize;
1024  *mapping = (sion_int32 *) malloc(*mapping_size * 2 * sizeof(sion_int32));
1025  if (*mapping == NULL) {
1026  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for mapping"));
1027  }
1028  }
1029 
1030  /* gather info about send about global rank of master of first file on grank 0 */
1031  if(sion_gendata->grank==0) {
1032  receivemap = (sion_int32 *) malloc(sion_gendata->gsize * sizeof(sion_int32));
1033  if (receivemap == NULL) {
1034  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_generic_parclose: Cannot allocate memory for receivemap"));
1035  }
1036  }
1037 
1038  if((sion_filedesc->filenumber==0) && (sion_filedesc->rank==0)) iamreceiver=sion_gendata->grank;
1039  else iamreceiver=-1;
1040  sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1041  if(sion_gendata->grank==0) {
1042  for(t=0;t<sion_gendata->gsize;t++) {
1043  if(receivemap[t]>=0) {
1044  receiver=receivemap[t];
1045  break;
1046  }
1047  }
1048  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "receiver of mapping grank=%d\n", receiver));
1049  }
1050  sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1051 
1052  /* receive global rank of master of first file on grank 0 */
1053  lpos[0] = sion_filedesc->filenumber;
1054  lpos[1] = sion_filedesc->rank;
1055  sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1056  }
1057 
1058  if(receivemap!=NULL) free(receivemap);
1059 
1060  return(rc);
1061 }
1062 #undef DFUNCTION
1063 
1064 /* END OF _sion_parclose_generic */
long _sion_file_get_opt_blksize(_sion_fileptr *sion_fileptr)
Get optional file system block size for a file.
Definition: sion_file.c:190
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:272
int _sion_buffer_flush(_sion_filedesc *sion_filedesc)
Flush buffer.
Definition: sion_buffer.c:164
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:239
_sion_filedesc * _sion_alloc_filedesc()
Allocates memory for internal sion structure.
int _sion_reassignvcd(int sid, void *data, int type)
Definition: sion_fd.c:61
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:41
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
#define SION_FILE_FLAG_WRITE
Definition: sion_file.h:23
int _sion_read_header_var_part(_sion_filedesc *sion_filedesc)
Read the second part of SION Meta Block 1.
int _sion_write_header_var_part_mapping(_sion_filedesc *sion_filedesc, sion_int32 mapping_size, sion_int32 *mapping)
Write mapping into the SION Meta Block 2.
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:328
Definition: sion_flags.h:30
sion_int64 * all_globalranks
int _sion_write_header(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
Definition: sion_metadata.c:36
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
#define SION_FILE_FLAG_READ
Definition: sion_file.h:24
int _sion_realloc_filedesc_blocklist(_sion_filedesc *sion_filedesc, sion_int32 maxchunks)
Increase the memory used by the internal sion structure for the blocklist.
int _sion_vcdtype(int sid)
Definition: sion_fd.c:56
sion_int32 * all_coll_collector
sion_int32 fileptr_exported
sion_int32 _sion_get_endianness_with_flags(sion_int64 flags)
Return endianness including possible choice via flags.
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:21
int _sion_parreinit_generic(int sid, sion_int64 chunksize, int rank, int ntasks, _sion_generic_gendata *sion_gendata)
change chunksize for an already opened SION file (write)
int _sion_write_header_var_part_nextblocksizes_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positi...
#define MAXCHUNKS
Definition: sion_common.h:35
sion_int32 * all_coll_capability
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:51
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:19
#define SION_FILE_FLAG_CREATE
Definition: sion_file.h:22
#define SION_FILEMODE_READ
Definition: sion_filedesc.h:33
sion_int64 * all_startpointers
int _sion_read_header_var_part_blockcount_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the block sizes from Meta Block 2.
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
Definition: sion_file.c:118
#define SION_FILESTATE_PAROPEN
Definition: sion_filedesc.h:24
int _sion_write_header_var_info(_sion_filedesc *sion_filedesc)
Write the SION Meta Block 1.
int _sion_free_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
int _sion_read_header_fix_part(_sion_filedesc *sion_filedesc)
Read part of the SION Meta Block 1.
int _sion_buffer_check_env(_sion_filedesc *sion_filedesc)
Checks if environment variables are set to use buffer.
Definition: sion_buffer.c:57
sion_int32 coll_capability
sion_int64 * blocksizes
Definition: sion_filedesc.h:98
int _sion_write_header_var_part_blockcount_from_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Write the block sizes from Meta Block 2.
sion_int32 currentblocknr
Definition: sion_filedesc.h:95
int _sion_cache_check_env(_sion_filedesc *sion_filedesc)
Check if environment variables are set to use cache.
Definition: sion_cache.c:60
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
sion_int64 * all_chunksizes
int _sion_print_filedesc(_sion_filedesc *sion_filedesc, int level, char *desc, int flag)
Print the initialized sion file description.
#define SION_FILESTATE_CLOSE
Definition: sion_filedesc.h:31
int _sion_alloc_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
Definition: sion_filedesc.c:35
#define SION_FILEMODE_WRITE
Definition: sion_filedesc.h:34
int _sion_read_header_var_part_nextblocksizes_to_field(_sion_filedesc *sion_filedesc, int field_size, sion_int64 *field)
Read the next set of blocksizes from Meta Block 2 Assuming that filepointer is at the correct positio...
#define DFUNCTION
collect mapping information on rank 0 of first file, mapping=NULL for all others
Sion Time Stamp Header.
sion_int32 * all_coll_collsize
sion_int64 start_of_varheader
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:304
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80