SIONlib  1.7.5
Scalable I/O library for parallel access to task-local files
sion_generic_buddy.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 /*
10  *
11  * \brief Internal Functions(parallel)
12  */
13 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <string.h>
19 #include <unistd.h>
20 
21 #include "sion.h"
22 #include "sion_debug.h"
23 #include "sion_error_handler.h"
24 #include "sion_file.h"
25 #include "sion_filedesc.h"
26 #include "sion_fd.h"
27 #include "sion_metadata.h"
28 #include "sion_internal.h"
29 #include "sion_printts.h"
30 #include "sion_flags.h"
31 
32 #include "sion_buffer.h"
33 
34 #include "sion_filedesc.h"
35 #include "sion_keyvalue.h"
36 
37 #include "sion_generic_internal.h"
39 #include "sion_generic_buddy.h"
40 
41 
63 #define DFUNCTION "_sion_paropen_generic_buddy"
64 int _sion_paropen_generic_buddy(
65  int sid,
66  const char *fname,
67  _sion_flags_store *flags_store,
68  char *prefix,
69  int *numFiles,
70  int *filenumber,
71  sion_int64 *chunksize,
72  sion_int32 *fsblksize,
73  int rank,
74  int ntasks,
75  int *globalrank,
76  FILE **fileptr,
77  _sion_generic_gendata *sion_gendata )
78 {
79  int rc = SION_SUCCESS;
80  int b, capability, pass, buddylevel;
81  sion_int32 file_globalrank;
82  sion_int64 file_chunksize;
83  char *buddy_fn, *nfname=NULL;
84  _sion_filedesc *sion_filedesc;
85  _sion_generic_buddy *buddies, *buddyptr;
86  _sion_generic_apidesc *sion_apidesc;
87 
88  buddylevel = atoi(_sion_flags_get(flags_store,"buddy")->val);
89  if (buddylevel==0) buddylevel=1; /* default */
90 
91  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "allocate memory for %d buddy levels\n", buddylevel));
92 
93  buddies = (_sion_generic_buddy *) malloc(buddylevel * sizeof(_sion_generic_buddy));
94  if (buddies == NULL) {
95  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate buddies structure of size %lu (_sion_generic_buddy), aborting ...\n",
96  (unsigned long) sizeof(_sion_generic_buddy)));
97  }
98  buddies->numgroups=0;
99 
100  buddy_fn = calloc(SION_FILENAME_LENGTH+10,1);
101  if (buddy_fn == NULL) {
102  free(buddies);
103  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (buddy_fn), aborting ...\n",
104  (unsigned long) SION_FILENAME_LENGTH+10));
105  }
106 
107  /* get pointer to internal datastructures */
108  sion_apidesc=sion_gendata->apidesc;
109 
110  if (flags_store->mask&_SION_FMODE_WRITE) {
111 
112  /* STEP1: WRITE open normal file */
113  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)
114  (fname,sion_gendata->filenumber);
115  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "call parallel open of %d files (current name %s)\n",
116  sion_gendata->numfiles, nfname));
117  rc=_sion_paropen_generic_one_file(sid, nfname, flags_store, prefix,
118  &sion_gendata->numfiles, &sion_gendata->filenumber,
119  chunksize, fsblksize,
120  sion_gendata->lrank, sion_gendata->lsize, globalrank,
121  _SION_INTERNAL_FLAG_BUDDY_NORMAL, fileptr, sion_gendata, NULL);
122  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "leave parallel open of %d files in #tasks=%d sid=%d globalrank=%d\n", sion_gendata->numfiles,
123  sion_gendata->lsize, sid, sion_gendata->grank));
124  free(nfname);
125 
126  /* test sid and get internal data structure */
127  if ((rc<0) || (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
128  free(buddies);
129  free(buddy_fn);
130  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: invalid sion_filedesc %d", sid));
131  }
132 
133  /* store additional data */
134  sion_filedesc->buddies=buddies;
135 
136  capability=sion_apidesc->get_capability_cb(sion_gendata->comm_data_global);
137 
138  /* open buddy files */
139  for(b=0;b<sion_filedesc->buddylevel;b++) {
140  buddyptr=&buddies[b];
141 
142  /* create communicators */
143  buddyptr->buddy_send.commgroup=NULL; buddyptr->buddy_coll.commgroup=NULL;
144  rc=_sion_buddy_map(sion_gendata,capability,b+1,&buddyptr->buddy_send,&buddyptr->buddy_coll);
145 
146  /* file name */
147  sprintf(buddy_fn,"%s_BUDDY_%02d", fname,b);
148 
149  /* open files */
150  for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
151 
152  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO pass=%d [grank=%2d] op=%d\n",pass,*globalrank,
153  _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
154 
155 
156  switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
157  case _SION_BW_SCHED_ACTIONA:
158  /* parameter */
159  file_globalrank=-1*(*globalrank+1);
160  file_chunksize=0;
161  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)
162  (buddy_fn,buddyptr->buddy_coll.filenum);
163  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO call open_one_file COLL [gendata: b=%d grank=%2d, group=%d of %d, lrank=%d of %d] fn=%s (%d)\n",
164  b,*globalrank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
165  buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size,nfname,buddyptr->buddy_coll.filenum));
166  buddyptr->buddy_coll.sid = _sion_newvcd(NULL,SION_FILEDESCRIPTOR);
167  rc=_sion_paropen_generic_one_file(buddyptr->buddy_coll.sid, nfname, flags_store, prefix,
168  &sion_gendata->numfiles, &buddyptr->buddy_coll.filenum,
169  &file_chunksize, fsblksize,
170  buddyptr->buddy_coll.rank,buddyptr->buddy_coll.size, &file_globalrank,
171  _SION_INTERNAL_FLAG_BUDDY_COLL, NULL, sion_gendata,
172  buddyptr);
173  free(nfname);
174  break;
175  case _SION_BW_SCHED_ACTIONB:
176  /* parameter */
177  file_globalrank=*globalrank;
178  file_chunksize=*chunksize;
179  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)
180  (buddy_fn,buddyptr->buddy_send.filenum);
181  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO call open_one_file SEND [gendata: b=%d grank=%2d, group=%d of %d, lrank=%d of %d] fn=%s \n",
182  b,*globalrank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
183  buddyptr->buddy_send.rank, buddyptr->buddy_send.size,nfname));
184  buddyptr->buddy_send.sid = _sion_newvcd(NULL,SION_FILEDESCRIPTOR);
185  rc=_sion_paropen_generic_one_file(buddyptr->buddy_send.sid, nfname, flags_store, prefix,
186  &sion_gendata->numfiles, &buddyptr->buddy_send.filenum,
187  &file_chunksize, fsblksize,
188  buddyptr->buddy_send.rank,buddyptr->buddy_send.size, &file_globalrank,
189  _SION_INTERNAL_FLAG_BUDDY_SEND, NULL, sion_gendata,
190  buddyptr);
191  free(nfname);
192  break;
193  default:
194  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO no-op [grank=%2d]\n",*globalrank));
195  break;
196  } /* switch */
197  } /* for pass */
198  } /* for buddies */
199 
200  } else if (flags_store->mask&_SION_FMODE_READ) {
201  int filefound, lgroup=*filenumber, root=0, task, tmpsize, group, masterfile_found;
202  sion_int32 filenumber, numfiles, lrank, lsize, file, file_lgroup, myrole, steprank;
203  sion_int32 helpint32=0, numsteps=0, numgroups=0, numreaderinthisfile;
204  sion_int32 *sion_tmpintfield1 = NULL; /* allocated on rank 0 */
205  /* sion_int32 *sion_tmpintfield2 = NULL; */ /* allocated on lranl-0 that has access to master file */
206 
207  int *stepvectorlist[MAXREADSTEPS]; /* to be changed in an dynamic list */
208  int *stepvector, readercount=0, readyflag=0, datafound=0;
209  int stepcount_coll,stepcount_collreader,stepcount_reader,stepcount_reader_rank,stepcount_noreader,stepcount_noreader_rank, groupcounter;
210  int step,fit,newrank;
211  _sion_generic_buddy_info *buddy_info;
212  sion_int32 helpint7[7];
213 
214  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "enter buddy open in read mode\n"));
215 
216  if(sion_gendata->grank==0) {
217  tmpsize=1*sion_gendata->gsize;
218  sion_tmpintfield1 = (sion_int32 *) malloc(tmpsize * sizeof(sion_int32));
219  if (sion_tmpintfield1 == NULL) {
220  free(buddies);
221  free(buddy_fn);
222  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
223  (unsigned long) tmpsize * sizeof(sion_int32)));
224  }
225  }
226 
227  /* Loop1: over all file sets (normal, buddy1, buddy2) until all have found their data chunk */
228  groupcounter=0;
229  for(b=-1; ( (b<buddylevel) && (!readyflag));b++) {
230 
231  /* determine filename and check for existence of master file on each lrank=0 */
232  if(b==-1) sprintf(buddy_fn,"%s", fname);
233  else sprintf(buddy_fn,"%s_BUDDY_%02d", fname,b);
234  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)(buddy_fn,0);
235  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY[%2d] start checking buddy level (file=%s)\n",b,nfname));
236 
237  unsigned int apiflag;
238  if (flags_store->mask&_SION_FMODE_POSIX) apiflag=SION_FILE_FLAG_POSIX;
239  else if (flags_store->mask&_SION_FMODE_SIONFWD) apiflag=SION_FILE_FLAG_SIONFWD;
240  else apiflag=SION_FILE_FLAG_ANSI;
241 
242  filefound=0;
243  if(sion_gendata->lrank==0) {
244  if(_sion_file_stat_file2(nfname, apiflag)) {
245  filefound=1;
246  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "check for existence of file %s --> file found\n",nfname));
247  }
248  }
249 
250  /* distribute info about master file to all others (root=<tasknum> or -1 if file not found) */
251  helpint32=filefound;
252  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
253  if (sion_gendata->grank == 0) {
254  root=-1;
255  if (sion_tmpintfield1 == NULL) {
256  free(buddies);
257  free(buddy_fn);
258  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_ABORT, DFUNCTION ": gatherr_cb returned sion_tmpintfield1 == NULL"));
259  }
260  for(task=0;task<sion_gendata->gsize;task++) {
261  if(sion_tmpintfield1[task]==1) root=task;
262  }
263  helpint32=root;
264  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after scan master: root for this file is %d #tasks=%d\n",root,sion_gendata->gsize));
265  }
266  sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); root=helpint32;
267  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "root for this file is %d\n",root));
268 
269  /* get mapping from file and distribute it */
270  if(root!=-1) {
271  _sion_generic_buddy_get_and_distribute_info_from_file(sion_gendata,nfname,root, &filenumber, &numfiles, &lrank, &lsize);
272  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY[%d] after read mapping file=%d of %d, lrank=%d of %d\n",b,filenumber, numfiles, lrank, lsize));
273  masterfile_found=1;
274  } else {
275  filenumber=lrank=lsize=-1;
276  masterfile_found=0;
277  }
278  free(nfname);
279 
280  /* numfiles will be defined from from input parameter, other info will be defined on-the-fly */
281  numfiles=*numFiles;
282  DPRINTFP((32, DFUNCTION, sion_gendata->grank, "Info avail: b=%d numfiles=%d root=%d lgroup=%d\n",b,numfiles,root,lgroup));
283 
284  /* Loop2: for each physical file of this file set */
285  for(file=0;( (file<numfiles) && (!readyflag) );file++) {
286 
287  /* check physical file */
288  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)(buddy_fn,file);
289  filefound=-1;
290  if(sion_gendata->lrank==0) {
291  if(_sion_file_stat_file2(nfname, apiflag)) {
292  filefound=lgroup;
293  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "check for existence of file %s --> file found\n",nfname));
294  }
295  }
296 
297  /* gather and distribute info (root and lgroup) about file to all others (root=<tasknum> or -1 if file not found) */
298  helpint32=filefound;
299  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
300  if (sion_gendata->grank == 0) {
301  root=-1;
302  for(task=0;task<sion_gendata->gsize;task++) {
303  if(sion_tmpintfield1[task]>-1) {
304  root=task;
305  file_lgroup=sion_tmpintfield1[task];
306  }
307  }
308  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after scan one file: root for this file is %d file_lgroup=%d\n",root,file_lgroup));
309  }
310  if (sion_gendata->grank == 0) helpint32=root;
311  sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); root=helpint32;
312  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "root for this file is %d\n",root));
313 
314  if(root>=0) {
315  /* file found */
316 
317  if(!masterfile_found) {
318  /* get avail info from file: lrank/lsize for local tasks */
319  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY[%d] befre read globalranks file=%d of %d, lrank=%d of %d\n",
320  b, filenumber, numfiles, lrank, lsize));
321  _sion_generic_buddy_get_and_distribute_info_from_one_file(sion_gendata, nfname, root, &filenumber, &numfiles, &lrank, &lsize);
322  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY[%d] after read globalranks file=%d of %d, lrank=%d of %d\n",
323  b, filenumber, numfiles, lrank, lsize));
324  }
325 
326  /* bcast lgroup */
327  if (sion_gendata->grank == 0) helpint32=file_lgroup;
328  sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); file_lgroup=helpint32;
329  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "file_lgroup for this file is %d\n",file_lgroup));
330 
331  /* determine my role for this file */
332  myrole=SION_ROLE_NONE; /* not acting on that file */
333  if((filenumber==file) && (!datafound)) {
334  datafound=1; /* fantastic ... */
335  if(sion_gendata->grank==root) myrole=SION_ROLE_COLLECTOR_READER; /* collector and reader */
336  else myrole=SION_ROLE_READER; /* reader */
337  } else {
338  if(sion_gendata->grank==root) myrole=SION_ROLE_COLLECTOR; /* collector */
339  else if(file_lgroup==lgroup) myrole=SION_ROLE_NOREADER; /* task on the same lgroup but not collector */
340  }
341 
342  /* collect on rank 0 info about each task acting on this file */
343  helpint32=myrole;
344  sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
345 
346  /* check if at least one tasks needs data */
347  if(sion_gendata->grank == 0) {
348  numreaderinthisfile=0;
349  if (sion_tmpintfield1 == NULL) {
350  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_ABORT, DFUNCTION ": gatherr_cb returned sion_tmpintfield1 == NULL"));
351  }
352  for(task=0; ( (task<sion_gendata->gsize) ) ;task++)
353  if ( (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) || (sion_tmpintfield1[task]==SION_ROLE_READER) ) numreaderinthisfile++;
354  }
355  sion_gendata->apidesc->bcastr_cb(&numreaderinthisfile, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
356 
357  if (numreaderinthisfile>0) {
358 
359  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "myrole for file %s is %s\n",nfname,_sion_buddy_role_to_str(myrole)));
360 
361  /* check for step in which the group can be created */
362  if (sion_gendata->grank == 0) {
363  /* check in existing steps if group can created there */
364  fit=0;
365  for(step=0; ( (step<numsteps) && (fit==0) ) ;step++) {
366  stepvector=stepvectorlist[step];
367  fit=1;
368  for(task=0; ( (task<sion_gendata->gsize) && (fit==1) ) ;task++) {
369  if(sion_tmpintfield1[task]!=SION_ROLE_NONE) {
370  if(stepvector[task]!=SION_ROLE_NONE) fit=0;
371  }
372  }
373  }
374  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign: after scanning %d steps fit=%d\n",numsteps,fit));
375  if(fit==0) {
376  /* no space in available steps, create a new one */
377  stepvector = (int *) malloc(sion_gendata->gsize * sizeof(int));
378  if (stepvector == NULL) {
379  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (stepvector), aborting ...\n",
380  (unsigned long) sion_gendata->gsize * sizeof(int)));
381  }
382  for(task=0; (task<sion_gendata->gsize);task++) stepvector[task]=SION_ROLE_NONE;
383  numsteps++;step++;
384  stepvectorlist[numsteps-1]=stepvector;
385  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign: allocating new step %d\n",numsteps-1));
386  }
387  /* store roles in stepvector and analyse group */
388  stepcount_coll=stepcount_collreader=stepcount_reader=stepcount_noreader=0;
389  for(task=0; (task<sion_gendata->gsize);task++) {
390  if(sion_tmpintfield1[task]!=SION_ROLE_NONE) {
391  stepvector[task]=sion_tmpintfield1[task];
392  if (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR) stepcount_coll++;
393  if (sion_tmpintfield1[task]==SION_ROLE_READER) {readercount++;stepcount_reader++;}
394  if (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) {readercount++;stepcount_collreader++;}
395  if (sion_tmpintfield1[task]==SION_ROLE_NOREADER) stepcount_noreader++;
396  }
397  }
398  /* DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign: stepcount_coll=%d stepcount_collreader=%d stepcount_reader=%d stepcount_noreader=%d\n", */
399  /* stepcount_coll,stepcount_collreader, stepcount_reader, stepcount_noreader)); */
400  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign: group assigned to step %d (tasks with no data: %d)\n",numsteps-1,sion_gendata->gsize-readercount));
401  if(sion_gendata->gsize-readercount==0) readyflag=1;
402 
403  /* calculate ranks */
404  stepcount_noreader_rank = stepcount_coll;
405  stepcount_reader_rank = stepcount_noreader_rank+stepcount_noreader;
406  for(task=0; (task<sion_gendata->gsize);task++) {
407  /*DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign1: task %d stepcount_reader_rank=%d stepcount_noreader_rank=%d %s\n",
408  task, stepcount_reader_rank, stepcount_noreader_rank,_sion_buddy_role_to_str(sion_tmpintfield1[task])));*/
409  if(sion_tmpintfield1[task]==SION_ROLE_NONE) newrank=-1;
410  if(sion_tmpintfield1[task]==SION_ROLE_COLLECTOR) newrank=0;
411  if(sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) newrank=stepcount_reader_rank++;
412  if(sion_tmpintfield1[task]==SION_ROLE_NOREADER) newrank=stepcount_noreader_rank++;
413  if(sion_tmpintfield1[task]==SION_ROLE_READER) newrank=stepcount_reader_rank++;
414  sion_tmpintfield1[task]=newrank;
415  /* DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDY STEP assign2: rank %d to task %d stepcount_reader_rank=%d stepcount_noreader_rank=%d\n",
416  sion_tmpintfield1[task],task, stepcount_reader_rank, stepcount_noreader_rank));*/
417  }
418  } /* if(rank==0) --> stepvector handling */
419 
420  /* scatter rank info to tasks */
421  sion_gendata->apidesc->scatterr_cb(sion_tmpintfield1, &steprank, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
422 
423 
424  /* bcast info ), ready flag, size, collsize, to_index */
425  if (sion_gendata->grank == 0) {
426 
427  helpint7[0]=step-1; /* step number */
428  helpint7[1]=readyflag; /* readyflag */
429  helpint7[2]=stepcount_coll+stepcount_collreader+stepcount_reader+stepcount_noreader; /* size */
430  helpint7[3]=stepcount_collreader+stepcount_reader; /* collsize */
431  helpint7[4]=stepcount_coll+stepcount_noreader; /* from_index */
432  helpint7[5]=stepcount_coll+stepcount_noreader+stepcount_collreader+stepcount_reader-1; /* to_index */
433  helpint7[6]=groupcounter++; /* unique Id of group */
434  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "define group #%d (step=%d, readyflag=%d, size=%d, collsize=%d, from=%d, to=%d) \n",
435  helpint7[6], helpint7[0], helpint7[1], helpint7[2], helpint7[3], helpint7[4], helpint7[5]));
436  }
437  sion_gendata->apidesc->bcastr_cb(helpint7, sion_gendata->comm_data_global, _SION_INT32, 7, 0);
438  readyflag=helpint7[1];
439 
440  /* create new data structure for storing info about group if task is acting in this group */
441  if(myrole!=SION_ROLE_NONE) {
442  buddy_info = (_sion_generic_buddy_info *) malloc(sizeof(_sion_generic_buddy_info));
443  if (buddy_info == NULL) {
444  free(buddies);
445  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (buddy_info), aborting ...\n",
446  (unsigned long) sizeof(_sion_generic_buddy_info)));
447  }
448  buddies->groups[buddies->numgroups]=buddy_info; buddies->numgroups++;
449  buddy_info->groupid=helpint7[6];
450  buddy_info->stepnum=helpint7[0];
451  buddy_info->rank=steprank;
452  buddy_info->size=helpint7[2];
453  buddy_info->commgroup=NULL; /* will be created later */
454  buddy_info->collsize=helpint7[3];
455  buddy_info->from_index=helpint7[4];
456  buddy_info->to_index=helpint7[5];
457  buddy_info->myrole=myrole;
458  buddy_info->bnum=b;
459  if( (myrole==SION_ROLE_READER) || (myrole==SION_ROLE_COLLECTOR_READER)) {
460  buddy_info->filelrank=lrank;
461  } else {
462  buddy_info->filelrank=-1;
463  }
464  buddy_info->filenum=file;
465  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "create group #%d (step=%d, rank=%d, size=%d, collsize=%d, from=%2d, to=%2d) (bnum=%2d file# %2d) with role %s \n",
466  buddy_info->groupid, buddy_info->stepnum, buddy_info->rank, buddy_info->size, buddy_info->collsize,
467  buddy_info->from_index, buddy_info->to_index, buddy_info->bnum, buddy_info->filenum, _sion_buddy_role_to_str(buddy_info->myrole)));
468 
469  } /* create group */
470  } /* (numreaderinthisfile>0) */
471  } /* if (file found on one of the lgroups) */
472  } /* end of loop over physical files of file set */
473  } /* end of loop over file sets (buddies) */
474 
475  if(!readyflag) {
476  /* some tasks could not find their data */
477  free(buddies);
478  free(buddy_fn);
479  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: cannot open file, not all files available"));
480  }
481 
482  /* bcast max. number of steps */
483  sion_gendata->apidesc->bcastr_cb(&numsteps, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
484  buddies->numsteps=numsteps;
485 
486  /* bcast max. number of groups */
487  if (sion_gendata->grank == 0) numgroups=groupcounter;
488  sion_gendata->apidesc->bcastr_cb(&numgroups, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
489 
490  /* build commgroups */
491  for(step=0;step<numsteps;step++) {
492  /* search for group with this step */
493  group=-1;
494  for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
495 
496  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "check group #%d of %d in stepnum=%d step=%d\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step));
497  if (buddies->groups[b]->stepnum==step) group=b;
498  }
499 
500  if(group!=-1) {
501  /* create group for step */
502  buddy_info=buddies->groups[group];
503  rc=sion_gendata->apidesc->create_lcg_cb(&buddy_info->commgroup,sion_gendata->comm_data_global,
504  sion_gendata->grank,sion_gendata->gsize,
505  buddy_info->rank,buddy_info->size,
506  buddy_info->groupid,numgroups);
507  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_created active [grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
508  sion_gendata->grank,sion_gendata->gsize, buddy_info->groupid,numgroups, buddy_info->rank,buddy_info->size));
509  } else {
510  /* nothing to do in this step, create dummy comm */
511  void *dummycommgroup=NULL;
512  rc=sion_gendata->apidesc->create_lcg_cb(&dummycommgroup,sion_gendata->comm_data_global,
513  sion_gendata->grank,sion_gendata->gsize,
514  0,1,
515  -1,numgroups);
516  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_created dummy [grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
517  sion_gendata->grank,sion_gendata->gsize,
518  -1,numgroups,
519  0,1 ));
520  }
521 
522 
523  if(group!=-1) {
524  /* open file */
525  buddy_info=buddies->groups[group];
526  buddies->currentgroup=group;
527 
528  if(buddy_info->bnum==-1) sprintf(buddy_fn,"%s", fname);
529  else sprintf(buddy_fn,"%s_BUDDY_%02d", fname,buddy_info->bnum);
530  nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:_sion_get_multi_filename)(buddy_fn,buddy_info->filenum);
531 
532  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "open file for group #%d %s [step=%d grank=%2d of %2d, lrank=%2d of %2d filelrank=%2d]\n",
533  buddy_info->groupid,nfname,buddy_info->stepnum,
534  sion_gendata->grank,sion_gendata->gsize, buddy_info->rank,buddy_info->size,buddy_info->filelrank));
535 
536  /* test if tsk is a reader, a task is only in one step a reader */
537  if( (buddy_info->myrole == SION_ROLE_READER)
538  || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
539  buddy_info->sid=sid; /* use already defined sid, this will be the master data structure */
540  rc=_sion_paropen_generic_one_file(buddy_info->sid, nfname, flags_store, prefix,
541  &sion_gendata->numfiles, &buddy_info->filenum,
542  chunksize, fsblksize,
543  buddy_info->rank, buddy_info->size, &sion_gendata->grank,
544  _SION_INTERNAL_FLAG_BUDDY_READ, NULL, sion_gendata,
545  buddies);
546 
547  /* store internal buddy data structure */
548  /* test sid and get internal data structure */
549  if ((rc<0) || (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
550  free(buddy_fn);
551  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: invalid sion_filedesc %d", sid));
552  }
553 
554  /* store additional data */
555  sion_filedesc->buddies=buddies;
556 
557  } else {
558  sion_int64 dummy_chunksize;
559  sion_int32 dummy_fsblksize;
560  buddy_info->sid = _sion_newvcd(NULL,SION_FILEDESCRIPTOR); /* create a new sid */
561  rc=_sion_paropen_generic_one_file(buddy_info->sid, nfname, flags_store, prefix,
562  &sion_gendata->numfiles, &buddy_info->filenum,
563  &dummy_chunksize, &dummy_fsblksize,
564  buddy_info->rank, buddy_info->size, &sion_gendata->grank,
565  _SION_INTERNAL_FLAG_BUDDY_READ, NULL, sion_gendata,
566  buddies);
567 
568  }
569 
570  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO afteropen [grank=%2d step=%d] rc=%d\n",sion_gendata->grank,step,rc));
571  free(nfname);
572 
573  } else {
574  /* nothing to do in this step */
575  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO no-op [grank=%2d]\n",sion_gendata->grank));
576  }
577 
578  /* barrier between steps */
579  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO beforegbarrier [grank=%2d]\n",sion_gendata->grank));
580  sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
581  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO aftergbarrier [grank=%2d]\n",sion_gendata->grank));
582 
583 
584  } /* for steps */
585 
586 
587  } else {
588  free(buddies);
589  free(buddy_fn);
590  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_paropen_generic_buddy: unknown file mode"));
591  }
592  /* 'buddies' is still used later and should not be freed here. */
593  /* free(buddies); */
594  free(buddy_fn);
595  sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
596 
597 
598  return(rc);
599 }
600 #undef DFUNCTION
601 
602 
614 #define DFUNCTION "_sion_parclose_generic_buddy"
615 int _sion_parclose_generic_buddy(int sid,
616  int rank,
617  int ntasks,
618  int mapping_size,
619  sion_int32 *mapping,
620  _sion_generic_gendata *sion_gendata) {
621  int rc=SION_SUCCESS;
622  _sion_filedesc *sion_filedesc;
623  /* _sion_generic_apidesc *sion_apidesc; */
624  _sion_generic_buddy *buddies, *buddyptr;
625  int b_mapping_size=0,group,step;
626  sion_int32 *b_mapping=NULL;
627  _sion_generic_buddy_info *buddy_info;
628 
629  int b, pass;
630 
631  /* obtain pointers to internal data structures */
632  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
633  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_parclose_generic_buddy: invalid sion_filedesc %d", sid));
634  }
635  /* sion_apidesc=sion_gendata->apidesc; */
636  buddies=sion_filedesc->buddies;
637 
638 
639  if (sion_filedesc->mode == SION_FILEMODE_WRITE) {
640 
641  /* close buddy files */
642  for(b=0;b<sion_filedesc->buddylevel;b++) {
643  buddyptr=&buddies[b];
644 
645 
646  /* collect mapping */
647  _sion_generic_collect_mapping_buddy(buddyptr, sion_gendata, &b_mapping_size, &b_mapping);
648 
649 
650  /* close files */
651  for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
652 
653  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYC pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
654  _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
655 
656  switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
657  case _SION_BW_SCHED_ACTIONA:
658  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYc call parallel close COLL [gendata: grank=%2d, group=%d of %d, lrank=%d of %d]\n",
659  sion_gendata->grank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
660  buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size));
661  _sion_parclose_generic(buddyptr->buddy_coll.sid, buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size,
662  b_mapping_size, b_mapping, _SION_INTERNAL_FLAG_BUDDY_COLL, sion_gendata, buddyptr );
663  break;
664  case _SION_BW_SCHED_ACTIONB:
665  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYc call parallel close SEND [gendata: grank=%2d, group=%d of %d, lrank=%d of %d]\n",
666  sion_gendata->grank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
667  buddyptr->buddy_send.rank, buddyptr->buddy_send.size));
668  _sion_parclose_generic(buddyptr->buddy_send.sid, buddyptr->buddy_send.rank, buddyptr->buddy_send.size,
669  0, NULL, _SION_INTERNAL_FLAG_BUDDY_SEND, sion_gendata, buddyptr );
670  break;
671  default:
672  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYC no-op [grank=%2d]\n",sion_gendata->grank));
673  break;
674  } /* switch */
675  } /* for pass */
676  } /* for buddies */
677 
678 
679  _SION_SAFE_FREE(b_mapping, NULL);
680  _SION_SAFE_FREE(buddies, NULL);
681 
682 
683  /* STEP: close normal files */
684  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "call parallel close of %d files, sid=%d\n",
685  sion_gendata->numfiles, sid));
686  rc = _sion_parclose_generic( sid, sion_filedesc->rank, sion_filedesc->ntasks, mapping_size, mapping, _SION_INTERNAL_FLAG_NORMAL,
687  sion_gendata, NULL );
688  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "leave parallel close of %d files in #tasks=%d globalrank=%d\n", sion_gendata->numfiles,
689  sion_gendata->lsize, sion_gendata->grank));
690 
691  } else if (sion_filedesc->mode == SION_FILEMODE_READ) {
692 
693  /* loop over all groups */
694  for(step=0;step<buddies->numsteps;step++) {
695  /* search for group with this step */
696  group=-1;
697  for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
698 
699  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "check group #%d of %d %d==%d %x\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step, buddies->groups[b]));
700  if (buddies->groups[b]->stepnum==step) group=b;
701  }
702  buddy_info=buddies->groups[group];
703 
704  if(group!=-1) {
705  /* call parclose for this group */
706  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "call parallel close of %d files, sid=%d\n",
707  sion_gendata->numfiles, sid));
708  buddies->currentgroup=group;
709  rc = _sion_parclose_generic( buddy_info->sid, buddy_info->rank,buddy_info->size, -1, NULL, _SION_INTERNAL_FLAG_BUDDY_READ,
710  sion_gendata, buddies );
711  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "leave parallel close of %d files in #tasks=%d globalrank=%d\n", sion_gendata->numfiles,
712  sion_gendata->lsize, sion_gendata->grank));
713  }
714  } /* step */
715 
716 
717  } else {
718  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_parclose_generic_buddy: unknown file mode"));
719  }
720 
721 
722  return(rc);
723 }
724 #undef DFUNCTION
725 
726 
735 #define DFUNCTION "_sion_coll_fwrite_buddy"
736 int _sion_coll_fwrite_buddy(const void *data,
737  size_t size,
738  size_t nitems,
739  int sid,
740  _sion_generic_gendata *sion_gendata ) {
741  int rc=SION_SUCCESS;
742  _sion_filedesc *sion_filedesc,*sion_filedesc_coll,*sion_filedesc_send;
743  _sion_generic_apidesc *sion_apidesc;
744  _sion_generic_buddy *buddies, *buddyptr;
745  sion_int64 spec[2], ownnewposition;
746  int b, pass, collector=0;
747 
748  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "enter parallel write buddy\n"));
749 
750  /* obtain pointers to internal data structures */
751  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
752  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", sid));
753  }
754  sion_apidesc=sion_gendata->apidesc;
755  buddies=sion_filedesc->buddies;
756 
757  /* buddy files */
758  for(b=0;b<sion_filedesc->buddylevel;b++) {
759  buddyptr=&buddies[b];
760 
761  /* get pointer to data structures */
762  if ( (buddyptr->buddy_coll.sid<0) || (_sion_vcdtype(buddyptr->buddy_coll.sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc_coll = _sion_vcdtovcon(buddyptr->buddy_coll.sid))) {
763  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", buddyptr->buddy_coll.sid));
764  }
765  if ( (buddyptr->buddy_send.sid<0) || (_sion_vcdtype(buddyptr->buddy_send.sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc_send = _sion_vcdtovcon(buddyptr->buddy_send.sid))) {
766  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", buddyptr->buddy_send.sid));
767  }
768 
769 
770  /* open files */
771  for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
772 
773  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
774  _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
775 
776  switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
777  case _SION_BW_SCHED_ACTIONA:
778  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw call parallel write COLL [gendata: grank=%2d, group=%d of %d, lrank=%d of %d] %d..%d\n",
779  sion_gendata->grank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
780  buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size,buddyptr->buddy_coll.from_index,buddyptr->buddy_coll.to_index));
781  spec[0]=0;spec[1]=0;
782  if(sion_filedesc_coll->rank == collector) {
783  ownnewposition=sion_filedesc_coll->currentpos;
784  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw call parallel write COLL currentpos=%d\n",(int) sion_filedesc_coll->currentpos));
785  }
786  rc = sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc_coll->fsblksize,
787  buddyptr->buddy_coll.commgroup,collector,buddyptr->buddy_coll.from_index,buddyptr->buddy_coll.to_index,
788  buddyptr->buddy_coll.sid, _sion_generic_collective_process_write);
789  if(sion_filedesc_coll->rank == collector) {
790  _sion_file_flush(sion_filedesc_coll->fileptr);
791  _sion_file_set_position(sion_filedesc_coll->fileptr,ownnewposition);sion_filedesc_coll->currentpos=ownnewposition;
792  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw call parallel write COLL currentpos=%d\n",(int) sion_filedesc_coll->currentpos));
793  }
794  break;
795  case _SION_BW_SCHED_ACTIONB:
796  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw call parallel write SEND [gendata: grank=%2d, group=%d of %d, lrank=%d of %d] %d..%d\n",
797  sion_gendata->grank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
798  buddyptr->buddy_send.rank, buddyptr->buddy_send.size,buddyptr->buddy_send.from_index,buddyptr->buddy_send.to_index));
799 
800  /* ensure free space for this block */
801  if(sion_ensure_free_space(buddyptr->buddy_send.sid,size*nitems) != SION_SUCCESS) {
802  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
803  spec[0]=spec[1]=-1; /* signaling error */
804  } else {
805  spec[0]=sion_filedesc_send->currentpos;
806  spec[1]=size*nitems;
807  }
808  rc = sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc_send->fsblksize,
809  buddyptr->buddy_send.commgroup,collector,buddyptr->buddy_send.from_index,buddyptr->buddy_send.to_index,
810  buddyptr->buddy_send.sid, _sion_generic_collective_process_write);
811  sion_filedesc_send->currentpos+=size*nitems;
812  break;
813  default:
814  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYw no-op [grank=%2d]\n",sion_gendata->grank));
815  break;
816  } /* switch */
817  } /* for pass */
818  } /* for buddies */
819 
820 
821  /* sion_apidesc->barrier_cb(sion_gendata->comm_data_global); */
822 
823  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "leave parallel write buddy\n"));
824 
825 
826 
827  return(rc);
828 }
829 #undef DFUNCTION
830 
831 
840 #define DFUNCTION "_sion_coll_fread_buddy"
841 int _sion_coll_fread_buddy( void *data, size_t size, size_t nitems, int sid) {
842  _sion_filedesc *sion_filedesc,*b_sion_filedesc;
843  _sion_generic_gendata *sion_gendata;
844  _sion_generic_apidesc *sion_apidesc;
845  sion_int64 bread=-1, spec[2], ownnewposition, items_read = 0;
846  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
847  int collector, firstsender, lastsender, step, b, group;
848  _sion_generic_buddy *buddies;
849  _sion_generic_buddy_info *buddy_info;
850 
851  /* obtain pointers to internal data structures */
852  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
853  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fread_buddy: invalid sion_filedesc %d", sid));
854  }
855  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
856  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
857 
858  sion_gendata=sion_filedesc->dataptr;
859  sion_apidesc=sion_gendata->apidesc;
860  buddies=sion_filedesc->buddies;
861 
862  /* needed for avoiding subsequent non-collective calls */
863  sion_filedesc->collcmdused=1;
864 
865  /* check collsize */
866  if (sion_filedesc->collsize<=0) {
867  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fread_buddy: collsize=%d <= 0, returning ...\n",
868  (int) sion_filedesc->collsize));
869  }
870 
871 
872  /* loop over all groups */
873  for(step=0;step<buddies->numsteps;step++) {
874  /* search for group with this step */
875  group=-1;
876  for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
877 
878  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "check group #%d of %d %d==%d %x\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step, buddies->groups[b]));
879  if (buddies->groups[b]->stepnum==step) group=b;
880  }
881 
882  if(group!=-1) {
883  /* activity needed for this group */
884 
885  buddy_info=buddies->groups[group];
886 
887  if ( (buddy_info->sid<0) || (_sion_vcdtype(buddy_info->sid) != SION_FILEDESCRIPTOR) || !(b_sion_filedesc = _sion_vcdtovcon(buddy_info->sid))) {
888  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_coll_fread_buddy: invalid sion_filedesc %d", buddy_info->sid));
889  }
890 
891  /* parameter of callback function */
892  collector=0;
893  firstsender=buddy_info->from_index;
894  lastsender=buddy_info->to_index;
895 
896 
897  /* check input parameter and position in file */
898  if( (buddy_info->myrole == SION_ROLE_READER)
899  || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
900 
901  /* ensure to be at the beginning of the right block */
902  if(size*nitems>0) {
903  if(sion_feof(buddy_info->sid)) {
904  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"early eof found for this block, returning %d ...\n", buddy_info->sid);
905  spec[0]=spec[1]=-1; /* signaling that no data is requested */
906  } else {
907  /* specification of location in file */
908  spec[0]=sion_filedesc->currentpos;
909  spec[1]=size*nitems;
910  }
911  } else {
912  /* signaling that no data is requested */
913  spec[0]=spec[1]=-1;
914  }
915 
916 
917  }
918 
919 
920  /* read own part */
921  if( (buddy_info->myrole == SION_ROLE_COLLECTOR_READER) ) {
922  rc_own=_sion_generic_collective_process_read(data,spec,buddy_info->sid);
923  firstsender++; /* must not read again */
924  }
925  /* save own position before reading at other positions */
926  if( (buddy_info->myrole == SION_ROLE_COLLECTOR)
927  || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
928  ownnewposition=sion_filedesc->currentpos;
929  }
930 
931 
932  /* read parts and scatter these to sender tasks via callback function */
933  if(!sion_apidesc->execute_scatter_cb ) {
934  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
935  "_sion_coll_fread_buddy: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
936  }
937  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "call execute_gather #%d (step=%d, rank=%d, size=%d, collsize=%d, from=%2d, to=%2d) (bnum=%2d file# %2d) with role %s \n",
938  buddy_info->groupid, buddy_info->stepnum, buddy_info->rank, buddy_info->size, buddy_info->collsize,
939  buddy_info->from_index, buddy_info->to_index, buddy_info->bnum, buddy_info->filenum, _sion_buddy_role_to_str(buddy_info->myrole)));
940 
941  rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, b_sion_filedesc->fsblksize,
942  buddy_info->commgroup,collector,firstsender,lastsender,buddy_info->sid,
943  _sion_generic_collective_process_read);
944 
945 
946 
947  /* set own position to end of own block read in this call */
948  if( (buddy_info->myrole == SION_ROLE_COLLECTOR)
949  || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
950  _sion_file_flush(b_sion_filedesc->fileptr);
951  _sion_file_set_position(b_sion_filedesc->fileptr,ownnewposition);b_sion_filedesc->currentpos=ownnewposition;
952  }
953 
954  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
955  if(buddy_info->myrole == SION_ROLE_READER) {
956  b_sion_filedesc->currentpos+=size*nitems;
957  if(b_sion_filedesc->fileptr_exported) {
958  _sion_file_set_position(b_sion_filedesc->fileptr,b_sion_filedesc->currentpos);
959  }
960  }
961 
962  if( (buddy_info->myrole == SION_ROLE_READER)
963  || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
964  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
965  bread=size*nitems;
966  } else {
967  bread=0;
968  }
969  items_read = size ? bread / size : 0;
970  }
971 
972  } /* froup != -1 */
973 
974  /* barrier between steps */
975  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO beforegbarrier [grank=%2d]\n",sion_gendata->grank));
976  sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
977  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "BUDDYO aftergbarrier [grank=%2d]\n",sion_gendata->grank));
978 
979  } /* for step */
980 
981  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
982  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_read));
983 
984  return items_read;
985 
986 }
987 #undef DFUNCTION
988 
989 #define DFUNCTION "_sion_buddy_map"
990 int _sion_buddy_map(
991  _sion_generic_gendata *sion_gendata,
992  int capability,
993  int buddy_idx,
994  _sion_generic_buddy_info *buddy_send,
995  _sion_generic_buddy_info *buddy_coll ) {
996  int rc=SION_SUCCESS;
997  int tmpsize,t,g, g_map, p, p_map, n, n_map, orig, distance,distance_1step,pass;
998  int *size_per_group = NULL, *group_map_orig_to_new, *group_map_new_to_orig, mynewgroupnr;
999  sion_int32 helpint2[2];
1000  sion_int32 *tasktogroup = NULL;
1001  sion_int32 *tmpintfield1 = NULL;
1002  int comm_send_rank, comm_send_size, comm_send_grpnum;
1003  int comm_coll_rank, comm_coll_size, comm_coll_grpnum;
1004  void *dummycommgroup=NULL;
1005 
1006  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "buddy_idx=%d capability=%d [gendata: grank=%d of %d, group=%d of %d, lrank=%d of %d]\n",
1007  buddy_idx, capability,
1008  sion_gendata->grank, sion_gendata->gsize,
1009  sion_gendata->filenumber, sion_gendata->numfiles,
1010  sion_gendata->lrank, sion_gendata->lsize));
1011 
1012  distance=buddy_idx; /* higher level buddy CP: groups will be remapped to d=1 buddy CP */
1013  distance_1step=1;
1014 
1015  /* allocate some fields */
1016  if(sion_gendata->grank==0) {
1017 
1018  tmpsize=1*sion_gendata->gsize;
1019  tasktogroup = (sion_int32 *) malloc(tmpsize * sizeof(sion_int32));
1020  if (tasktogroup == NULL) {
1021  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_buddy_map: cannot allocate temporary memory of size %lu (tasktogroup), aborting ...\n",
1022  (unsigned long) tmpsize * sizeof(sion_int32)));
1023  }
1024 
1025  tmpsize=2*sion_gendata->gsize;
1026  tmpintfield1 = (sion_int32 *) malloc(tmpsize * sizeof(sion_int32));
1027  if (tmpintfield1 == NULL) {
1028  free(tasktogroup);
1029  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_buddy_map: cannot allocate temporary memory of size %lu (tmpintfield1), aborting ...\n",
1030  (unsigned long) tmpsize * sizeof(sion_int32)));
1031  }
1032 
1033  size_per_group = (int *) malloc(sion_gendata->numfiles * sizeof(int));
1034  if (size_per_group == NULL) {
1035  free(tasktogroup);
1036  free(tmpintfield1);
1037  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_buddy_map: cannot allocate temporary memory of size %lu (size_per_group), aborting ...\n",
1038  (unsigned long) sion_gendata->numfiles * sizeof(int)));
1039  }
1040 
1041  }
1042 
1043  /* on all tasks */
1044  group_map_orig_to_new = (int *) malloc(sion_gendata->numfiles * sizeof(int));
1045  if (group_map_orig_to_new == NULL) {
1046  free(tasktogroup);
1047  _SION_SAFE_FREE(size_per_group, NULL);
1048  free(tmpintfield1);
1049  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_buddy_map: cannot allocate temporary memory of size %lu (group_map_orig_to_new), aborting ...\n",
1050  (unsigned long) sion_gendata->numfiles * sizeof(int)));
1051  }
1052 
1053  group_map_new_to_orig = (int *) malloc(sion_gendata->numfiles * sizeof(int));
1054  if (group_map_new_to_orig == NULL) {
1055  _SION_SAFE_FREE(size_per_group, NULL);
1056  free(group_map_orig_to_new);
1057  free(tasktogroup);
1058  free(tmpintfield1);
1059  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_buddy_map: cannot allocate temporary memory of size %lu (group_map_new_to_orig), aborting ...\n",
1060  (unsigned long) sion_gendata->numfiles * sizeof(int)));
1061  }
1062 
1063  /* Step1: gather group number */
1064  helpint2[0]=sion_gendata->filenumber;
1065  sion_gendata->apidesc->gatherr_cb(helpint2, tasktogroup, sion_gendata->comm_data_global, _SION_INT32,1,0);
1066 
1067 
1068  /* Step2: gather local size */
1069  helpint2[0]=sion_gendata->lsize;
1070  helpint2[1]=capability;
1071  sion_gendata->apidesc->gatherr_cb(helpint2, tmpintfield1, sion_gendata->comm_data_global, _SION_INT32,2,0);
1072  if(sion_gendata->grank==0) {
1073  for(t=0;t<sion_gendata->gsize;t++) {
1074  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "task-map --> t=%2d grpnr=%2d lsize=%2d capability=%d\n",
1075  t,tasktogroup[t],tmpintfield1[2*t+0],tmpintfield1[2*t+1]));
1076  }
1077  /* --> tasktogroup[t] = groupnr, tmpintfield1[t][0] = lsize tmpintfield1[t][1] = capability */
1078  }
1079 
1080  /* Step2: build vector with size info per group and group map */
1081  if(sion_gendata->grank==0) {
1082  for(t=0;t<sion_gendata->gsize;t++) {
1083  size_per_group[tasktogroup[t]] = tmpintfield1[2*t+0];
1084  }
1085  }
1086 
1087  for(g=0;g<sion_gendata->numfiles;g++) {
1088  orig=(g*distance) % sion_gendata->numfiles;
1089  group_map_orig_to_new[orig]=g;
1090  group_map_new_to_orig[g]=orig;
1091  }
1092 
1093  for(g=0;g<sion_gendata->numfiles;g++) {
1094  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "grp-map --> g=%2d->%2d\n",g,group_map_orig_to_new[g]));
1095  }
1096 
1097 
1098  /* Step3: compute send group */
1099  if(sion_gendata->grank==0) {
1100  for(t=0;t<sion_gendata->gsize;t++) {
1101  g=tasktogroup[t];
1102  g_map=group_map_orig_to_new[g];
1103  p_map=(g_map-distance_1step + sion_gendata->numfiles) % sion_gendata->numfiles;
1104  p=group_map_new_to_orig[p_map];
1105  tmpintfield1[2*t+0] = g_map;
1106  tmpintfield1[2*t+1] = size_per_group[p];
1107  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "comm-send --> t=%2d grpnr=%2d grpmap=%2d prevgr=%2d prevgrmap=%2d prev_lsize=%2d\n",
1108  t, g, tmpintfield1[2*t+0], p, p_map, tmpintfield1[2*t+1]));
1109  }
1110  /* --> tmpintfield1[t][0] = own_groupnr, tmpintfield1[t][1] = lsize of prev_group */
1111  }
1112 
1113  /* Step4: scatter info about send group */
1114  sion_gendata->apidesc->scatterr_cb(tmpintfield1, helpint2, sion_gendata->comm_data_global, _SION_INT32,2,0);
1115 
1116  /* Step5: compute ranks for send group */
1117  mynewgroupnr=comm_send_grpnum=helpint2[0];
1118  buddy_send->rank=comm_send_rank=helpint2[1]+sion_gendata->lrank;
1119  buddy_send->size=comm_send_size=helpint2[1]+sion_gendata->lsize;
1120  buddy_send->collsize=sion_gendata->lsize;
1121  buddy_send->groupnum=mynewgroupnr;
1122  buddy_send->filenum=group_map_new_to_orig[mynewgroupnr];
1123  buddy_send->from_index=helpint2[1];
1124  buddy_send->to_index=helpint2[1]+sion_gendata->lsize-1;
1125  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_SEND d=%d grpnum=%2d lrank=%2d lsize=%2d mynewgroupnr=%2d collsize=%2d %d..%d fnum=%d\n",
1126  distance,comm_send_grpnum,comm_send_rank,comm_send_size,mynewgroupnr,
1127  buddy_send->collsize,buddy_send->from_index,buddy_send->to_index,
1128  buddy_send->filenum));
1129 
1130  /* Step6: compute coll group */
1131  if(sion_gendata->grank==0) {
1132  for(t=0;t<sion_gendata->gsize;t++) {
1133  g=tasktogroup[t];
1134  g_map=group_map_orig_to_new[g];
1135  n_map=(g_map+distance_1step) % sion_gendata->numfiles;
1136  n=group_map_new_to_orig[n_map];
1137  tmpintfield1[2*t+0] = n_map;
1138  tmpintfield1[2*t+1] = size_per_group[n];
1139  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "comm-coll --> t=%2d grpnr=%2d grpmap=%2d nextgr=%2d nextgrmap=%2d next_lsize=%2d\n",
1140  t, g, g_map, n, tmpintfield1[2*t+0], tmpintfield1[2*t+1]));
1141  }
1142  /* --> tmpintfield1[t][0] = next_groupnr_map, tmpintfield1[t][0] = lsize of next_group */
1143  }
1144 
1145  /* Step7: scatter info about coll group */
1146  sion_gendata->apidesc->scatterr_cb(tmpintfield1, helpint2, sion_gendata->comm_data_global, _SION_INT32,2,0);
1147 
1148  /* Step8: compute ranks for coll group */
1149  comm_coll_grpnum=helpint2[0];
1150  buddy_coll->rank=comm_coll_rank=sion_gendata->lrank;
1151  buddy_coll->size=comm_coll_size=sion_gendata->lsize+helpint2[1];
1152  buddy_coll->collsize=helpint2[1];
1153  buddy_coll->groupnum=comm_coll_grpnum;
1154  buddy_coll->filenum=group_map_new_to_orig[comm_coll_grpnum];
1155  buddy_coll->from_index=sion_gendata->lsize;
1156  buddy_coll->to_index=sion_gendata->lsize+helpint2[1]-1;
1157  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_COLL d=%d grpnum=%2d lrank=%2d lsize=%2d collsize=%2d %d..%d fnum=%d\n",
1158  distance,comm_coll_grpnum,comm_coll_rank,comm_coll_size,
1159  buddy_coll->collsize,buddy_coll->from_index,buddy_coll->to_index,
1160  buddy_coll->filenum));
1161 
1162  /* Step9: create local communicators for send/coll */
1163 
1164  for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
1165  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
1166  _sion_buddy_bwsched(mynewgroupnr, sion_gendata->numfiles, pass)));
1167 
1168  switch (_sion_buddy_bwsched(mynewgroupnr, sion_gendata->numfiles, pass)) {
1169  case _SION_BW_SCHED_ACTIONA:
1170  rc=sion_gendata->apidesc->create_lcg_cb(&buddy_coll->commgroup,sion_gendata->comm_data_global,
1171  sion_gendata->grank,sion_gendata->gsize,
1172  comm_coll_rank,comm_coll_size,
1173  comm_coll_grpnum,sion_gendata->numfiles);
1174  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_created comm_coll (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1175  distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1176  comm_coll_grpnum,sion_gendata->numfiles,
1177  comm_coll_rank,comm_coll_size ));
1178  break;
1179  case _SION_BW_SCHED_ACTIONB:
1180  rc=sion_gendata->apidesc->create_lcg_cb(&buddy_send->commgroup,sion_gendata->comm_data_global,
1181  sion_gendata->grank,sion_gendata->gsize,
1182  comm_send_rank,comm_send_size,
1183  comm_send_grpnum,sion_gendata->numfiles);
1184  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_created comm_send (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1185  distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1186  comm_send_grpnum,sion_gendata->numfiles,
1187  comm_send_rank,comm_send_size ));
1188  break;
1189  case _SION_BW_SCHED_NOACTION:
1190  rc=sion_gendata->apidesc->create_lcg_cb(&dummycommgroup,sion_gendata->comm_data_global,
1191  sion_gendata->grank,sion_gendata->gsize,
1192  0,1,
1193  -1,sion_gendata->numfiles);
1194  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, "COMM_created dummy (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1195  distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1196  -1,sion_gendata->numfiles,0,1 ));
1197  break;
1198 
1199  default: break;
1200  }
1201  }
1202 
1203  /* free fields */
1204  if(sion_gendata->grank==0) {
1205  free(tasktogroup);
1206  free(tmpintfield1);
1207  free(size_per_group);
1208  }
1209  free(group_map_new_to_orig);
1210  free(group_map_orig_to_new);
1211 
1212  return(rc);
1213 }
1214 #undef DFUNCTION
1215 
1216  /* pass=1..3 */
1217 #define DFUNCTION "_sion_buddy_bwsched"
1218 int _sion_buddy_bwsched(int groupnr, int numgroups, int pass) {
1219  int res=_SION_BW_SCHED_NOACTION;
1220  if(numgroups%2==0) { /* #groups even */
1221  if(groupnr%2==0) { /* groupnr even */
1222  if(pass==1) res=_SION_BW_SCHED_ACTIONA;
1223  if(pass==2) res=_SION_BW_SCHED_ACTIONB;
1224  } else { /* groupnr even */
1225  if(pass==1) res=_SION_BW_SCHED_ACTIONB;
1226  if(pass==2) res=_SION_BW_SCHED_ACTIONA;
1227  }
1228  } else {
1229  if(groupnr%2==0) { /* groupnr even */
1230  if((pass==1) && (groupnr != (numgroups-1))) res=_SION_BW_SCHED_ACTIONA;
1231  if((pass==2) && (groupnr != 0)) res=_SION_BW_SCHED_ACTIONB;
1232  if((pass==3) && (groupnr == 0)) res=_SION_BW_SCHED_ACTIONB;
1233  if((pass==3) && (groupnr == (numgroups-1))) res=_SION_BW_SCHED_ACTIONA;
1234  } else { /* groupnr even */
1235  if(pass==1) res=_SION_BW_SCHED_ACTIONB;
1236  if(pass==2) res=_SION_BW_SCHED_ACTIONA;
1237  }
1238  }
1239  return(res);
1240 }
1241 #undef DFUNCTION
1242 
1247 #define DFUNCTION "_sion_generic_collect_mapping"
1248 int _sion_generic_collect_mapping_buddy( _sion_generic_buddy *buddyptr,
1249  _sion_generic_gendata *sion_gendata,
1250  int *mapping_size,
1251  sion_int32 **mapping) {
1252  int rc=SION_SUCCESS;
1253  int t;
1254  _sion_generic_apidesc *sion_apidesc;
1255  sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1256 
1257  sion_apidesc=sion_gendata->apidesc;
1258 
1259  *mapping = NULL; *mapping_size = 0;
1260 
1261 
1262  /* mapping data will be collected by master of first physical file */
1263  if((buddyptr->buddy_coll.groupnum==0) && (buddyptr->buddy_coll.rank==0)) {
1264  /* allocate data */
1265  *mapping_size=sion_gendata->gsize;
1266  *mapping = (sion_int32 *) malloc(*mapping_size * 2 * sizeof(sion_int32));
1267  if (*mapping == NULL) {
1268  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collect_mapping_buddy: Cannot allocate memory for mapping"));
1269  }
1270  }
1271 
1272  /* gather info about send about global rank of master of first file on grank 0 */
1273  if(sion_gendata->grank==0) {
1274  receivemap = (sion_int32 *) malloc(sion_gendata->gsize * sizeof(sion_int32));
1275  if (receivemap == NULL) {
1276  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collect_mapping_buddy: Cannot allocate memory for receivemap"));
1277  }
1278  }
1279 
1280  if((buddyptr->buddy_coll.filenum==0) && (buddyptr->buddy_coll.rank==0)) iamreceiver=sion_gendata->grank;
1281  else iamreceiver=-1;
1282  sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1283  if(sion_gendata->grank==0) {
1284  for(t=0;t<sion_gendata->gsize;t++) {
1285  if(receivemap[t]>=0) {
1286  receiver=receivemap[t];
1287  break;
1288  }
1289  }
1290  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "receiver of mapping grank=%d\n", receiver));
1291  }
1292  sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1293 
1294  /* receive global rank of master of first file on grank 0 */
1295  lpos[0] = buddyptr->buddy_send.filenum;
1296  lpos[1] = buddyptr->buddy_send.rank;
1297  sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1298 
1299  if(receivemap!=NULL) free(receivemap);
1300 
1301  return(rc);
1302 }
1303 #undef DFUNCTION
1304 
1305 
1306 #define DFUNCTION "_sion_generic_buddy_get_and_distribute_info_from_file"
1307 int _sion_generic_buddy_get_and_distribute_info_from_file( _sion_generic_gendata *sion_gendata, char *fname, int root,
1308  sion_int32 *filenumber, sion_int32 *numfiles,
1309  sion_int32 *lrank, sion_int32 *lsize) {
1310 
1311  int sid = -1, ntasks, nfiles, t;
1312  int rc = SION_SUCCESS;
1313  FILE *fileptr;
1314  sion_int32 fsblksize;
1315  int *tasksinfile = NULL;
1316  int mapping_size = -1;
1317  sion_int32 *mapping = NULL;
1318  sion_int32 lpos[2];
1319  _sion_generic_apidesc *sion_apidesc;
1320 
1321  sion_apidesc=sion_gendata->apidesc;
1322 
1323  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "entering function sion_gendata->grank=%d\n",sion_gendata->grank));
1324 
1325  if(sion_gendata->grank == root) {
1326  /* open and get mapping of sion file */
1327  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "before open\n"));
1328  sid=_sion_open_read(fname,_SION_FMODE_READ|_SION_FMODE_ANSI,_SION_READ_MASTER_ONLY_OF_MULTI_FILES,
1329  &ntasks,&nfiles,NULL,&fsblksize,NULL,&fileptr);
1330  if(sid>=0) {
1331  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after open\n"));
1332  rc=sion_get_mapping(sid,&mapping_size,&mapping,numfiles);
1333  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "sion file %d files\n", *numfiles));
1334  } else {
1335  *numfiles=-1;
1336  }
1337  }
1338 
1339 
1340  /* each task has to know if more than one file was used in sion file */
1341  sion_apidesc->bcastr_cb(numfiles, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1342  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "%s: numfiles=%d\n",fname,*numfiles));
1343 
1344  if((sion_gendata->grank == root) && (*numfiles>1)) {
1345  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "%s: mapping_size=%d sion_gendata->gsize=%d\n",fname,*numfiles,mapping_size,sion_gendata->gsize));
1346  if(mapping_size!=sion_gendata->gsize) {
1347  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1348  "_sion_generic_buddy_get_and_distribute_info_from_file: Incorrect sum of ntasks of files %d <> %d\n",
1349  mapping_size, sion_gendata->gsize));
1350  }
1351  }
1352 
1353  if(*numfiles<0) {
1354  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1355  "_sion_generic_buddy_get_and_distribute_info_from_file: could not get numfiles from sion file\n"));
1356  }
1357 
1358  if(*numfiles>1) {
1359  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "before scatter\n"));
1360  if(sion_gendata->grank==root) {
1361  for(t=0;t<mapping_size;t++) {
1362  DPRINTFP((1, DFUNCTION, sion_gendata->grank, " %d -> (%d,%d)\n",t,mapping[t*2],mapping[t*2+1]));
1363  }
1364  }
1365 
1366  /* scatter mapping to all tasks */
1367  sion_apidesc->scatterr_cb(mapping, lpos, sion_gendata->comm_data_global, _SION_INT32, 2, root);
1368  *filenumber=lpos[0];
1369  *lrank =lpos[1];
1370  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after scatter filenum+lrank (%d,%d)\n",*filenumber,*lrank));
1371 
1372  /* compute and scatter number of tasks in each file */
1373  if(sion_gendata->grank==root) {
1374  tasksinfile = (int *) malloc(*numfiles * sizeof(int));
1375  if (tasksinfile == NULL) {
1376  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_get_and_distribute_info_from_file: Cannot allocate memory for tasksinfile counter vector"));
1377  }
1378  for(t=0;t<*numfiles;t++) tasksinfile[t]=0; /* init counter */
1379  for(t=0;t<mapping_size;t++) tasksinfile[ mapping[t*2] ]++; /* count tasks in file */
1380  for(t=0;t<mapping_size;t++) mapping[t*2+1]=tasksinfile[ mapping[t*2] ]; /* set 2nd value of mapping to lsize */
1381  }
1382  sion_apidesc->scatterr_cb(mapping, lpos, sion_gendata->comm_data_global, _SION_INT32, 2, root);
1383  *lsize =lpos[1];
1384  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after scatter lsize (%d, %d of %d)\n",*filenumber, *lrank, *lsize));
1385 
1386  if(sion_gendata->grank==root) {
1387  if(tasksinfile) free(tasksinfile);
1388  }
1389  /* WARNING: mapping file of sion file is now destroyed and should not be used until close */
1390 
1391  } else {
1392  *filenumber=0;
1393  *lrank = sion_gendata->grank;
1394  *lsize = sion_gendata->gsize;
1395  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "only one file -> filenumber=%d lRank=%d\n",*filenumber,*lrank));
1396  }
1397 
1398  if(sion_gendata->grank == root) {
1399  /* frees also mapping vector */
1400  if (sid>=0) _sion_close_sid(sid);
1401  }
1402 
1403  return(rc);
1404 }
1405 #undef DFUNCTION
1406 
1407 
1408 #define DFUNCTION "_sion_generic_buddy_get_and_distribute_info_from_one_file"
/*
 * Reads the header of ONE physical file (fname) on task `root` and tells
 * every task whether, and with which local rank, it is stored in that file.
 *
 * On root the fixed and the variable part of the header are read; number of
 * files, file number and number of tasks of the file are then broadcast to
 * all tasks. Root builds a vector indexed by global rank (-1 = task is not
 * in this file, otherwise the position of the task in the file's list of
 * global ranks) and scatters one entry to each task.
 *
 * filenumber/lrank/lsize are only written on tasks found in the file; on all
 * other tasks they keep the values they had on entry. *numfiles is an input
 * here: it is compared with the number of files stored in the header.
 *
 * All tasks have to call this function since it uses collective callbacks.
 * NOTE(review): the error returns inside the root-only section happen on
 * root alone, before the broadcasts the other tasks are waiting in.
 *
 * Returns rc (SION_SUCCESS unless a header read reported otherwise) or the
 * result of _sion_errorprint() on error.
 */
1409 int _sion_generic_buddy_get_and_distribute_info_from_one_file( _sion_generic_gendata *sion_gendata, char *fname, int root,
1410  sion_int32 *filenumber, sion_int32 *numfiles,
1411  sion_int32 *lrank, sion_int32 *lsize) {
1412 
1413  int rc = SION_SUCCESS;
1414  int t, mapping_size = 0, grank, *mapping = NULL;
1415  sion_int32 file_filenumber,file_numfiles,file_lrank,file_lsize;
1416  _sion_filedesc *sion_filedesc = NULL;
1417  _sion_fileptr *sion_fileptr;
1418  _sion_generic_apidesc *sion_apidesc;
1419 
1420  sion_apidesc=sion_gendata->apidesc;
1421 
1422  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "entering function sion_gendata->grank=%d\n",sion_gendata->grank));
1423 
1424  if(sion_gendata->grank == root) {
1425  /* open and get globalranks of sion file */
1426  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "before open\n"));
1427  /* allocate and initialise internal data structure with default values (NULL and -1) */
1428  sion_filedesc = _sion_alloc_filedesc();
1429  if (sion_filedesc == NULL) {
/* NOTE(review): sizeof(sion_filedesc) is the size of the pointer, not of the structure */
1430  return(_sion_errorprint(SION_ID_UNDEF,SION_ID_UNDEF,
1431  "sion_open: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
1432  (unsigned long) sizeof(sion_filedesc)));
1433  }
1434  _sion_init_filedesc(sion_filedesc);
1435 
1436  /* open file */
/* NOTE(review): the statement assigning sion_fileptr is not part of this
   listing (numbering jumps from 1436 to 1438) -- restore it from the
   original source before using this block */
1438  if (!sion_fileptr) {
/* NOTE(review): the error returns below do not release sion_filedesc or
   the file opened above -- confirm whether a cleanup is required */
1439  return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,"sion_open: cannot open %s for reading, aborting ...\n", fname));
1440  }
1441  sion_filedesc->fileptr = sion_fileptr;
1442 
1443  /* read part of header which does not depend on ntasks */
1444  rc = _sion_read_header_fix_part(sion_filedesc);
1445  if (rc!=SION_SUCCESS) {
1446  return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,"sion_open: cannot read header from file %s, aborting ...\n", fname));
1447  }
1448  sion_filedesc->rank = 0;
1449  sion_filedesc->state = SION_FILESTATE_SEROPEN;
1450  sion_filedesc->mode = SION_FILEMODE_READ;
1451 
1452  /* memory allocation for internal fields */
1453  _sion_alloc_filedesc_arrays(sion_filedesc);
1454 
1455  /* read part of header which depends on ntasks */
1456  rc = _sion_read_header_var_part(sion_filedesc);
1457  if (rc!=SION_SUCCESS) {
1458  return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,"sion_open: cannot read header (var part) from file %s, aborting ...\n", fname));
1459  }
1460 
/* values describing this physical file, broadcast to all tasks below */
1461  file_numfiles=sion_filedesc->nfiles;
1462  file_filenumber=sion_filedesc->filenumber;
1463  file_lsize=sion_filedesc->ntasks;
1464 
1465  /* allocate mapping vector */
/* space for 2 entries per global task is allocated; only the first
   mapping_size entries (one per task) are used below */
1466  mapping_size=sion_gendata->gsize;
1467  mapping = (sion_int32 *) malloc(mapping_size * 2 * sizeof(sion_int32));
1468  if (mapping == NULL) {
1469  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_buddy_get_and_distribute_info_from_one_file: cannot allocate temporary memory of size %lu (mapping), aborting ...\n",
1470  (unsigned long) mapping_size * 2 * sizeof(sion_int32)));
1471  }
1472 
1473  }
1474 
1475 
1476  /* each task has to know some number about local file */
1477  sion_apidesc->bcastr_cb(&file_numfiles, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1478  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "%s: numfiles=%d\n",fname,file_numfiles));
1479  sion_apidesc->bcastr_cb(&file_filenumber, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1480  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "%s: filenumber=%d\n",fname,file_filenumber));
1481  sion_apidesc->bcastr_cb(&file_lsize, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1482  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "%s: file_lsize=%d\n",fname,file_lsize));
1483 
1484 
/* evaluated on every task with the broadcast value; the file and the
   descriptor opened on root are not released on this path */
1485  if(file_numfiles!=*numfiles) {
1486  _SION_SAFE_FREE(mapping, NULL);
1487  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1488  "_sion_generic_buddy_get_and_distribute_info_from_one_file: Incorrect number of files %d <> %d\n",
1489  file_numfiles,*numfiles));
1490  }
1491 
1492  /* init mapping vector lrank */
1493  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "before scatter\n"));
1494  if(sion_gendata->grank==root) {
/* -1 marks "global rank not stored in this file" */
1495  for(t=0;t<mapping_size;t++) mapping[t]=-1;
1496  for(t=0;t<file_lsize;t++) {
1497  grank=sion_filedesc->all_globalranks[t];
/* NOTE(review): grank is not checked against mapping_size before it is used as index */
1498  if(grank>=0) {
1499  mapping[grank]=t;
1500  DPRINTFP((1, DFUNCTION, sion_gendata->grank, " file=%s mapping[%d] -> (%d)\n",fname,grank,mapping[grank]));
1501  }
1502  }
1503  }
1504 
1505  /* scatter mapping to all tasks */
1506  sion_apidesc->scatterr_cb(mapping, &file_lrank, sion_gendata->comm_data_global, _SION_INT32, 1, root);
/* outputs are only updated on tasks that are stored in this file */
1507  if(file_lrank!=-1) {
1508  *filenumber=file_filenumber;
1509  *lrank =file_lrank;
1510  *lsize =file_lsize;
1511  DPRINTFP((1, DFUNCTION, sion_gendata->grank, "after scatter filenum=%d lrank=%d lsize=%d\n",*filenumber,*lrank,*lsize));
1512  }
1513 
1514  if(sion_gendata->grank == root) {
1515  /* close current file */
/* NOTE(review): no call releasing the sion_filedesc structure itself is
   visible here -- confirm whether it is freed elsewhere */
1516  _sion_free_filedesc_arrays(sion_filedesc);
1517  _sion_file_close(sion_filedesc->fileptr);
1518  sion_filedesc->fileptr=NULL;
1519  free(mapping);
1520  }
1521 
1522  return(rc);
1523 }
1524 #undef DFUNCTION
SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_ANSI
Definition: sion_file.h:21
_sion_newvcd
int _sion_newvcd(void *data, int type)
Definition: sion_fd.c:42
_sion_file_close
int _sion_file_close(_sion_fileptr *sion_fileptr)
Closes the file and destroys the fileptr structure.
Definition: sion_file.c:139
SION_FILESTATE_SEROPEN
#define SION_FILESTATE_SEROPEN
Definition: sion_filedesc.h:28
_sion_filedesc_struct::usecoll
sion_int32 usecoll
Definition: sion_filedesc.h:180
_sion_filedesc_struct::filenumber
sion_int32 filenumber
Definition: sion_filedesc.h:128
_sion_generic_gendata_struct
Definition: sion_generic_apidesc.h:59
_sion_vcdtype
int _sion_vcdtype(int sid)
Definition: sion_fd.c:57
sion_filedesc.h
_sion_filedesc_struct::fileptr
_sion_fileptr * fileptr
Definition: sion_filedesc.h:81
sion_internal.h
sion_buffer.h
_sion_file_set_position
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:292
sion_ensure_free_space
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1053
sion_generic_internal.h
_sion_filedesc_struct::collsize
sion_int32 collsize
Definition: sion_filedesc.h:181
_sion_filedesc_struct::mode
sion_int32 mode
Definition: sion_filedesc.h:108
SION_FILE_FLAG_SIONFWD
#define SION_FILE_FLAG_SIONFWD
Definition: sion_file.h:27
_sion_filedesc_struct::rank
sion_int32 rank
Definition: sion_filedesc.h:94
_sion_generic_buddy_info_struct
Definition: sion_buddy_common.h:21
_sion_free_filedesc_arrays
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
Free memory for the internal sion arrays.
Definition: sion_filedesc.c:429
SION_FILEMODE_WRITE
#define SION_FILEMODE_WRITE
Definition: sion_filedesc.h:37
_sion_filedesc_struct::fsblksize
sion_int32 fsblksize
Definition: sion_filedesc.h:117
_sion_read_header_var_part
int _sion_read_header_var_part(_sion_filedesc *sion_filedesc)
Read the second part of SION Meta Block 1.
Definition: sion_metadata.c:502
_sion_filedesc_struct::collcmdused
sion_int32 collcmdused
Definition: sion_filedesc.h:185
sion_fd.h
_sion_alloc_filedesc_arrays
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
Definition: sion_filedesc.c:336
_sion_filedesc_struct::all_globalranks
sion_int64 * all_globalranks
Definition: sion_filedesc.h:144
sion.h
sion_debug.h
_sion_vcdtovcon
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:52
_sion_open_read
int _sion_open_read(const char *fname, sion_int64 file_mode_flags, int read_all, int *ntasks, int *nfiles, sion_int64 **chunksizes, sion_int32 *fsblksize, int **globalranks, FILE **fileptr)
Internal sion serial open function for reading from one or more files.
Definition: sion_internal.c:258
_sion_filedesc_struct::fileptr_exported
sion_int32 fileptr_exported
Definition: sion_filedesc.h:186
_sion_filedesc_struct
Sion File Descriptor Structure.
Definition: sion_filedesc.h:78
_sion_filedesc_struct::currentpos
sion_int64 currentpos
Definition: sion_filedesc.h:95
_sion_file_stat_file2
int _sion_file_stat_file2(const char *fname, unsigned int apiflag)
Check if file exists with appropriate low-level API.
Definition: sion_file.c:262
SION_FILEDESCRIPTOR
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:16
sion_generic_collective.h
_sion_read_header_fix_part
int _sion_read_header_fix_part(_sion_filedesc *sion_filedesc)
Read part of the SION Meta Block 1.
Definition: sion_metadata.c:316
_sion_filedesc_struct::buddies
void * buddies
Definition: sion_filedesc.h:194
_sion_file_open
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Definition: sion_file.c:45
_sion_filedesc_struct::ntasks
sion_int32 ntasks
Definition: sion_filedesc.h:106
_sion_generic_buddy_struct
Definition: sion_buddy_common.h:40
sion_feof
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:809
_sion_init_filedesc
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
Definition: sion_filedesc.c:37
_sion_file_flush
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:345
sion_file.h
sion_metadata.h
_sion_filedesc_struct::dataptr
void * dataptr
Definition: sion_filedesc.h:82
_sion_filedesc_struct::nfiles
sion_int32 nfiles
Definition: sion_filedesc.h:127
SION_FILE_FLAG_POSIX
#define SION_FILE_FLAG_POSIX
Definition: sion_file.h:23
_sion_generic_apidesc_struct
Definition: sion_generic_apidesc.h:24
_sion_parclose_generic
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
Definition: sion_generic_internal.c:701
_sion_filedesc_struct::state
sion_int32 state
Definition: sion_filedesc.h:107
sion_get_mapping
int sion_get_mapping(int sid, int *mapping_size, sion_int32 **mapping, int *numfiles)
Returns pointers to the internal field mapping.
Definition: sion_common.c:219
_sion_flags_store_struct
Definition: sion_flags.h:23
_sion_paropen_generic_one_file
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
Definition: sion_generic_internal.c:72
_sion_filedesc_struct::buddylevel
sion_int32 buddylevel
Definition: sion_filedesc.h:192
_sion_alloc_filedesc
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
Definition: sion_filedesc.c:168
_sion_filedesc_struct::collector
sion_int32 collector
Definition: sion_filedesc.h:182
_sion_get_multi_filename
char * _sion_get_multi_filename(const char *fname, int filenumber)
Generates the multi filename.
Definition: sion_internal.c:911
sion_flags.h
SION_FILE_FLAG_READ
#define SION_FILE_FLAG_READ
Definition: sion_file.h:26
SION_FILEMODE_READ
#define SION_FILEMODE_READ
Definition: sion_filedesc.h:36
sion_printts.h
Sion Time Stamp Header.
sion_keyvalue.h
_sion_fileptr_s
Definition: sion_file.h:29