14 #define _XOPEN_SOURCE 700
23 #include "sion_error_handler.h"
39 #include "sion_generic_buddy.h"
/*
 * _sion_paropen_generic_buddy - parallel open of a SION file set together
 * with its buddy files.
 *
 * NOTE(review): only part of this function is visible in this excerpt (most
 * of the parameter list, several declarations, call heads and closing braces
 * are missing). The comments below describe the visible statements only.
 *
 * Visible parameters:
 *   chunksize  pointer to the chunk size; forwarded to the open flagged
 *              _SION_INTERNAL_FLAG_BUDDY_NORMAL and copied into
 *              file_chunksize for the SEND buddy open
 *   fsblksize  pointer to the file system block size; forwarded to the opens
 *
 * Behaviour by flags_store->mask:
 *   _SION_FMODE_WRITE  one normal open, then for every buddy level a mapping
 *                      step followed by scheduled COLL and SEND opens
 *   _SION_FMODE_READ   scan the original file and the buddy files, assign a
 *                      role per task and file, sort the resulting groups into
 *                      steps and open the file of each group
 *   anything else      error: unknown file mode
 *
 * Error paths return the value of _sion_errorprint(); the regular return
 * statement is not visible here.
 */
63 #define DFUNCTION "_sion_paropen_generic_buddy"
64 int _sion_paropen_generic_buddy(
71 sion_int64 *chunksize,
72 sion_int32 *fsblksize,
79 int rc = SION_SUCCESS;
80 int b, capability, pass, buddylevel;
81 sion_int32 file_globalrank;
82 sion_int64 file_chunksize;
83 char *buddy_fn, *nfname=NULL;
/* The number of buddy levels is parsed from the value of the buddy flag;
   a result of 0 from atoi() is replaced by one level. */
88 buddylevel = atoi(_sion_flags_get(flags_store,
"buddy")->val);
89 if (buddylevel==0) buddylevel=1;
91 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"allocate memory for %d buddy levels\n", buddylevel));
94 if (buddies == NULL) {
95 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"cannot allocate buddies structure of size %lu (_sion_generic_buddy), aborting ...\n",
/* Zero-filled scratch buffer that receives the generated buddy file names. */
100 buddy_fn = calloc(SION_FILENAME_LENGTH+10,1);
101 if (buddy_fn == NULL) {
103 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (buddy_fn), aborting ...\n",
104 (
unsigned long) SION_FILENAME_LENGTH+10));
108 sion_apidesc=sion_gendata->apidesc;
/* ---- write mode ---- */
110 if (flags_store->mask&_SION_FMODE_WRITE) {
114 (fname,sion_gendata->filenumber);
115 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"call parallel open of %d files (current name %s)\n",
116 sion_gendata->numfiles, nfname));
118 &sion_gendata->numfiles, &sion_gendata->filenumber,
119 chunksize, fsblksize,
120 sion_gendata->lrank, sion_gendata->lsize, globalrank,
121 _SION_INTERNAL_FLAG_BUDDY_NORMAL, fileptr, sion_gendata, NULL);
122 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"leave parallel open of %d files in #tasks=%d sid=%d globalrank=%d\n", sion_gendata->numfiles,
123 sion_gendata->lsize, sid, sion_gendata->grank));
130 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: invalid sion_filedesc %d", sid));
/* The buddy array is attached to the file descriptor. */
134 sion_filedesc->
buddies=buddies;
/* Capability value reported by the API callback for the global communicator;
   it is handed to _sion_buddy_map below. */
136 capability=sion_apidesc->get_capability_cb(sion_gendata->comm_data_global);
/* Per buddy level b: clear the communication groups, compute the send/coll
   mapping and open the buddy files. */
140 buddyptr=&buddies[b];
143 buddyptr->buddy_send.commgroup=NULL; buddyptr->buddy_coll.commgroup=NULL;
/* _sion_buddy_map receives b+1 (not b) as its buddy index. */
144 rc=_sion_buddy_map(sion_gendata,capability,b+1,&buddyptr->buddy_send,&buddyptr->buddy_coll);
/* Buddy base name: fname followed by _BUDDY_ and the two-digit level.
   NOTE(review): sprintf is unbounded; the +10 in the allocation of buddy_fn
   presumably covers this suffix plus the terminator, which assumes fname fits
   into SION_FILENAME_LENGTH and b stays below 100 - confirm. */
147 sprintf(buddy_fn,
"%s_BUDDY_%02d", fname,b);
/* The opens are spread over _SION_BW_SCHED_NUM_PASSES passes;
   _sion_buddy_bwsched() selects the action of this task for each pass from
   its send group number and the number of files. */
150 for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
152 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO pass=%d [grank=%2d] op=%d\n",pass,*globalrank,
153 _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
156 switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
/* Action A: open on the COLL side (flag _SION_INTERNAL_FLAG_BUDDY_COLL). */
157 case _SION_BW_SCHED_ACTIONA:
/* The rank passed to the open is -(globalrank+1), i.e. negative for every
   non-negative global rank. */
159 file_globalrank=-1*(*globalrank+1);
162 (buddy_fn,buddyptr->buddy_coll.filenum);
163 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO call open_one_file COLL [gendata: b=%d grank=%2d, group=%d of %d, lrank=%d of %d] fn=%s (%d)\n",
164 b,*globalrank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
165 buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size,nfname,buddyptr->buddy_coll.filenum));
168 &sion_gendata->numfiles, &buddyptr->buddy_coll.filenum,
169 &file_chunksize, fsblksize,
170 buddyptr->buddy_coll.rank,buddyptr->buddy_coll.size, &file_globalrank,
171 _SION_INTERNAL_FLAG_BUDDY_COLL, NULL, sion_gendata,
/* Action B: open on the SEND side with the real global rank and the chunk
   size supplied by the caller. */
175 case _SION_BW_SCHED_ACTIONB:
177 file_globalrank=*globalrank;
178 file_chunksize=*chunksize;
180 (buddy_fn,buddyptr->buddy_send.filenum);
181 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO call open_one_file SEND [gendata: b=%d grank=%2d, group=%d of %d, lrank=%d of %d] fn=%s \n",
182 b,*globalrank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
183 buddyptr->buddy_send.rank, buddyptr->buddy_send.size,nfname));
186 &sion_gendata->numfiles, &buddyptr->buddy_send.filenum,
187 &file_chunksize, fsblksize,
188 buddyptr->buddy_send.rank,buddyptr->buddy_send.size, &file_globalrank,
189 _SION_INTERNAL_FLAG_BUDDY_SEND, NULL, sion_gendata,
/* Remaining schedule values (presumably the default branch, its label is not
   visible): nothing is opened in this pass. */
194 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO no-op [grank=%2d]\n",*globalrank));
200 }
else if (flags_store->mask&_SION_FMODE_READ) {
/* ---- read mode ---- */
/* NOTE(review): lgroup is initialised by dereferencing filenumber, while the
   next line declares a sion_int32 of the same name. The initialiser therefore
   has to refer to an outer pointer (presumably a function parameter that is
   not visible here), which the new local shadows from then on - confirm. */
201 int filefound, lgroup=*filenumber, root=0, task, tmpsize, group, masterfile_found;
202 sion_int32 filenumber, numfiles, lrank, lsize, file, file_lgroup, myrole, steprank;
203 sion_int32 helpint32=0, numsteps=0, numgroups=0, numreaderinthisfile;
204 sion_int32 *sion_tmpintfield1 = NULL;
/* One role vector per read step, indexed by global task number; at most
   MAXREADSTEPS vectors can be stored. */
207 int *stepvectorlist[MAXREADSTEPS];
208 int *stepvector, readercount=0, readyflag=0, datafound=0;
209 int stepcount_coll,stepcount_collreader,stepcount_reader,stepcount_reader_rank,stepcount_noreader,stepcount_noreader_rank, groupcounter;
210 int step,fit,newrank;
/* Group description broadcast from global rank 0; the meaning of the slots
   follows from the assignments further down. */
212 sion_int32 helpint7[7];
214 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"enter buddy open in read mode\n"));
/* Global rank 0 allocates the gather/scatter scratch array with one
   sion_int32 per global task. */
216 if(sion_gendata->grank==0) {
217 tmpsize=1*sion_gendata->gsize;
218 sion_tmpintfield1 = (sion_int32 *) malloc(tmpsize *
sizeof(sion_int32));
219 if (sion_tmpintfield1 == NULL) {
222 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
223 (
unsigned long) tmpsize *
sizeof(sion_int32)));
/* Walk over the original file set (b==-1) and then the buddy levels
   0..buddylevel-1, stopping as soon as readyflag is set. */
229 for(b=-1; ( (b<buddylevel) && (!readyflag));b++) {
/* NOTE(review): unbounded sprintf into buddy_fn, see the size assumption of
   its allocation (SION_FILENAME_LENGTH+10) - confirm. */
232 if(b==-1) sprintf(buddy_fn,
"%s", fname);
233 else sprintf(buddy_fn,
"%s_BUDDY_%02d", fname,b);
/* Name of file number 0 of this level; the API callback is used when set,
   otherwise _sion_get_multi_filename. */
234 nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:
_sion_get_multi_filename)(buddy_fn,0);
235 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY[%2d] start checking buddy level (file=%s)\n",b,nfname));
237 unsigned int apiflag;
239 #if defined(_SION_SIONFWD)
240 else if (flags_store->mask&_SION_FMODE_SIONFWD) apiflag=SION_FILE_FLAG_SIONFWD;
/* The existence check is guarded by lrank==0, i.e. done by local rank 0 only
   (the check itself is not visible in this excerpt). */
245 if(sion_gendata->lrank==0) {
248 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"check for existence of file %s --> file found\n",nfname));
/* Collect one value per task on global rank 0. */
254 sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
255 if (sion_gendata->grank == 0) {
257 if (sion_tmpintfield1 == NULL) {
260 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_ABORT, DFUNCTION
": gatherr_cb returned sion_tmpintfield1 == NULL"));
/* A task that reported 1 becomes root. As far as visible the scan does not
   stop early, so the highest such task wins. */
262 for(task=0;task<sion_gendata->gsize;task++) {
263 if(sion_tmpintfield1[task]==1) root=task;
266 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after scan master: root for this file is %d #tasks=%d\n",root,sion_gendata->gsize));
/* Publish root to all tasks. */
268 sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); root=helpint32;
269 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"root for this file is %d\n",root));
/* Judging by its name and the debug output, this reads the mapping on root
   and distributes file number, number of files and local rank/size. */
273 _sion_generic_buddy_get_and_distribute_info_from_file(sion_gendata,nfname,root, &filenumber, &numfiles, &lrank, &lsize);
274 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY[%d] after read mapping file=%d of %d, lrank=%d of %d\n",b,filenumber, numfiles, lrank, lsize));
/* Alternative path (its condition is not visible): file number and local
   rank/size are marked as unknown. */
277 filenumber=lrank=lsize=-1;
284 DPRINTFP((32, DFUNCTION, sion_gendata->grank,
"Info avail: b=%d numfiles=%d root=%d lgroup=%d\n",b,numfiles,root,lgroup));
/* Check every physical file of this level until readyflag is set. */
287 for(file=0;( (file<numfiles) && (!readyflag) );file++) {
290 nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:
_sion_get_multi_filename)(buddy_fn,file);
292 if(sion_gendata->lrank==0) {
295 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"check for existence of file %s --> file found\n",nfname));
301 sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* On rank 0: a gathered value above -1 is taken as the local group of the
   file. */
302 if (sion_gendata->grank == 0) {
304 for(task=0;task<sion_gendata->gsize;task++) {
305 if(sion_tmpintfield1[task]>-1) {
307 file_lgroup=sion_tmpintfield1[task];
310 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after scan one file: root for this file is %d file_lgroup=%d\n",root,file_lgroup));
/* Publish the root of this file. */
312 if (sion_gendata->grank == 0) helpint32=root;
313 sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); root=helpint32;
314 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"root for this file is %d\n",root));
/* Without a master file the information is fetched from this single file. */
319 if(!masterfile_found) {
321 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY[%d] befre read globalranks file=%d of %d, lrank=%d of %d\n",
322 b, filenumber, numfiles, lrank, lsize));
323 _sion_generic_buddy_get_and_distribute_info_from_one_file(sion_gendata, nfname, root, &filenumber, &numfiles, &lrank, &lsize);
324 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY[%d] after read globalranks file=%d of %d, lrank=%d of %d\n",
325 b, filenumber, numfiles, lrank, lsize));
/* Publish the local group of this file. */
329 if (sion_gendata->grank == 0) helpint32=file_lgroup;
330 sion_gendata->apidesc->bcastr_cb(&helpint32, sion_gendata->comm_data_global, _SION_INT32, 1, 0); file_lgroup=helpint32;
331 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"file_lgroup for this file is %d\n",file_lgroup));
/* Role of this task for the current file. Visible assignments: if the data of
   the task is in this file and was not found before, root becomes
   COLLECTOR_READER and the others READER; in the second pair of lines root
   becomes COLLECTOR and tasks whose lgroup equals file_lgroup NOREADER. The
   else that presumably separates the two pairs is not visible. */
334 myrole=SION_ROLE_NONE;
335 if((filenumber==file) && (!datafound)) {
337 if(sion_gendata->grank==root) myrole=SION_ROLE_COLLECTOR_READER;
338 else myrole=SION_ROLE_READER;
340 if(sion_gendata->grank==root) myrole=SION_ROLE_COLLECTOR;
341 else if(file_lgroup==lgroup) myrole=SION_ROLE_NOREADER;
/* Gather one value per task on rank 0 (presumably the roles, given how the
   result is scanned below). */
346 sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield1, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* Rank 0 counts the tasks that read from this file and publishes the count. */
349 if(sion_gendata->grank == 0) {
350 numreaderinthisfile=0;
351 if (sion_tmpintfield1 == NULL) {
352 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_ABORT, DFUNCTION
": gatherr_cb returned sion_tmpintfield1 == NULL"));
354 for(task=0; ( (task<sion_gendata->gsize) ) ;task++)
355 if ( (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) || (sion_tmpintfield1[task]==SION_ROLE_READER) ) numreaderinthisfile++;
357 sion_gendata->apidesc->bcastr_cb(&numreaderinthisfile, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* Only files with at least one reader are turned into a group. */
359 if (numreaderinthisfile>0) {
361 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"myrole for file %s is %s\n",nfname,_sion_buddy_role_to_str(myrole)));
/* Rank 0 searches the existing steps for one that fits: a step fits when no
   task that has a role in this file already has a role in that step. */
364 if (sion_gendata->grank == 0) {
367 for(step=0; ( (step<numsteps) && (fit==0) ) ;step++) {
368 stepvector=stepvectorlist[step];
370 for(task=0; ( (task<sion_gendata->gsize) && (fit==1) ) ;task++) {
371 if(sion_tmpintfield1[task]!=SION_ROLE_NONE) {
372 if(stepvector[task]!=SION_ROLE_NONE) fit=0;
376 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY STEP assign: after scanning %d steps fit=%d\n",numsteps,fit));
/* Allocate and clear a new step vector (presumably only when no existing step
   fits; the guarding condition is not visible). */
379 stepvector = (
int *) malloc(sion_gendata->gsize *
sizeof(
int));
380 if (stepvector == NULL) {
381 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (stepvector), aborting ...\n",
382 (
unsigned long) sion_gendata->gsize *
sizeof(
int)));
384 for(task=0; (task<sion_gendata->gsize);task++) stepvector[task]=SION_ROLE_NONE;
386 stepvectorlist[numsteps-1]=stepvector;
387 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY STEP assign: allocating new step %d\n",numsteps-1));
/* Enter the roles of this file into the step vector and count them per kind;
   readercount accumulates readers over all files. */
390 stepcount_coll=stepcount_collreader=stepcount_reader=stepcount_noreader=0;
391 for(task=0; (task<sion_gendata->gsize);task++) {
392 if(sion_tmpintfield1[task]!=SION_ROLE_NONE) {
393 stepvector[task]=sion_tmpintfield1[task];
394 if (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR) stepcount_coll++;
395 if (sion_tmpintfield1[task]==SION_ROLE_READER) {readercount++;stepcount_reader++;}
396 if (sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) {readercount++;stepcount_collreader++;}
397 if (sion_tmpintfield1[task]==SION_ROLE_NOREADER) stepcount_noreader++;
402 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDY STEP assign: group assigned to step %d (tasks with no data: %d)\n",numsteps-1,sion_gendata->gsize-readercount));
/* Every task has been counted as a reader: the search can stop. */
403 if(sion_gendata->gsize-readercount==0) readyflag=1;
/* Renumber the tasks inside the group: a COLLECTOR gets 0, NOREADER tasks
   follow starting at stepcount_coll, and READER as well as COLLECTOR_READER
   tasks come last. Tasks without a role get -1. The gathered roles are
   overwritten with these ranks. */
406 stepcount_noreader_rank = stepcount_coll;
407 stepcount_reader_rank = stepcount_noreader_rank+stepcount_noreader;
408 for(task=0; (task<sion_gendata->gsize);task++) {
411 if(sion_tmpintfield1[task]==SION_ROLE_NONE) newrank=-1;
412 if(sion_tmpintfield1[task]==SION_ROLE_COLLECTOR) newrank=0;
413 if(sion_tmpintfield1[task]==SION_ROLE_COLLECTOR_READER) newrank=stepcount_reader_rank++;
414 if(sion_tmpintfield1[task]==SION_ROLE_NOREADER) newrank=stepcount_noreader_rank++;
415 if(sion_tmpintfield1[task]==SION_ROLE_READER) newrank=stepcount_reader_rank++;
416 sion_tmpintfield1[task]=newrank;
/* Hand every task its rank within the group. */
423 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield1, &steprank, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* Rank 0 fills the group description: [1] ready flag, [2] group size,
   [3] number of reading tasks, [4] rank of the first reading task,
   [5] last rank of the group, [6] running group id. Slot [0] is set on a line
   that is not visible; it is stored as stepnum further down. */
427 if (sion_gendata->grank == 0) {
430 helpint7[1]=readyflag;
431 helpint7[2]=stepcount_coll+stepcount_collreader+stepcount_reader+stepcount_noreader;
432 helpint7[3]=stepcount_collreader+stepcount_reader;
433 helpint7[4]=stepcount_coll+stepcount_noreader;
434 helpint7[5]=stepcount_coll+stepcount_noreader+stepcount_collreader+stepcount_reader-1;
435 helpint7[6]=groupcounter++;
436 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"define group #%d (step=%d, readyflag=%d, size=%d, collsize=%d, from=%d, to=%d) \n",
437 helpint7[6], helpint7[0], helpint7[1], helpint7[2], helpint7[3], helpint7[4], helpint7[5]));
439 sion_gendata->apidesc->bcastr_cb(helpint7, sion_gendata->comm_data_global, _SION_INT32, 7, 0);
440 readyflag=helpint7[1];
/* Tasks with a role record the group in buddies->groups. */
443 if(myrole!=SION_ROLE_NONE) {
445 if (buddy_info == NULL) {
447 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: cannot allocate temporary memory of size %lu (buddy_info), aborting ...\n",
450 buddies->groups[buddies->numgroups]=buddy_info; buddies->numgroups++;
451 buddy_info->groupid=helpint7[6];
452 buddy_info->stepnum=helpint7[0];
453 buddy_info->rank=steprank;
454 buddy_info->size=helpint7[2];
455 buddy_info->commgroup=NULL;
456 buddy_info->collsize=helpint7[3];
457 buddy_info->from_index=helpint7[4];
458 buddy_info->to_index=helpint7[5];
459 buddy_info->myrole=myrole;
/* Only reading roles keep their local rank in the file; the second assignment
   (-1) presumably belongs to an else branch that is not visible. */
461 if( (myrole==SION_ROLE_READER) || (myrole==SION_ROLE_COLLECTOR_READER)) {
462 buddy_info->filelrank=lrank;
464 buddy_info->filelrank=-1;
466 buddy_info->filenum=file;
467 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"create group #%d (step=%d, rank=%d, size=%d, collsize=%d, from=%2d, to=%2d) (bnum=%2d file# %2d) with role %s \n",
468 buddy_info->groupid, buddy_info->stepnum, buddy_info->rank, buddy_info->size, buddy_info->collsize,
469 buddy_info->from_index, buddy_info->to_index, buddy_info->bnum, buddy_info->filenum, _sion_buddy_role_to_str(buddy_info->myrole)));
481 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: cannot open file, not all files available"));
/* Publish the number of steps and the number of groups to all tasks. */
485 sion_gendata->apidesc->bcastr_cb(&numsteps, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
486 buddies->numsteps=numsteps;
489 if (sion_gendata->grank == 0) numgroups=groupcounter;
490 sion_gendata->apidesc->bcastr_cb(&numgroups, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* For each step: find the first group of this task whose stepnum matches. */
493 for(step=0;step<numsteps;step++) {
496 for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
498 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"check group #%d of %d in stepnum=%d step=%d\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step));
499 if (buddies->groups[b]->stepnum==step) group=b;
/* Task is member of a group in this step: create the communication group of
   that group and store it in buddy_info->commgroup. */
504 buddy_info=buddies->groups[group];
505 rc=sion_gendata->apidesc->create_lcg_cb(&buddy_info->commgroup,sion_gendata->comm_data_global,
506 sion_gendata->grank,sion_gendata->gsize,
507 buddy_info->rank,buddy_info->size,
508 buddy_info->groupid,numgroups);
509 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_created active [grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
510 sion_gendata->grank,sion_gendata->gsize, buddy_info->groupid,numgroups, buddy_info->rank,buddy_info->size));
/* Otherwise create_lcg_cb is still called, with a dummy handle (presumably
   because the callback is collective over the global communicator - confirm). */
513 void *dummycommgroup=NULL;
514 rc=sion_gendata->apidesc->create_lcg_cb(&dummycommgroup,sion_gendata->comm_data_global,
515 sion_gendata->grank,sion_gendata->gsize,
518 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_created dummy [grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
519 sion_gendata->grank,sion_gendata->gsize,
/* Open the file of the selected group; bnum==-1 denotes the original file
   name, any other value the buddy file of that level. */
527 buddy_info=buddies->groups[group];
528 buddies->currentgroup=group;
530 if(buddy_info->bnum==-1) sprintf(buddy_fn,
"%s", fname);
531 else sprintf(buddy_fn,
"%s_BUDDY_%02d", fname,buddy_info->bnum);
532 nfname=(sion_apidesc->get_multi_filename_cb?sion_apidesc->get_multi_filename_cb:
_sion_get_multi_filename)(buddy_fn,buddy_info->filenum);
534 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"open file for group #%d %s [step=%d grank=%2d of %2d, lrank=%2d of %2d filelrank=%2d]\n",
535 buddy_info->groupid,nfname,buddy_info->stepnum,
536 sion_gendata->grank,sion_gendata->gsize, buddy_info->rank,buddy_info->size,buddy_info->filelrank));
/* Reading roles open with the chunksize and fsblksize pointers of the caller
   and attach buddies to the resulting file descriptor. */
539 if( (buddy_info->myrole == SION_ROLE_READER)
540 || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
543 &sion_gendata->numfiles, &buddy_info->filenum,
544 chunksize, fsblksize,
545 buddy_info->rank, buddy_info->size, &sion_gendata->grank,
546 _SION_INTERNAL_FLAG_BUDDY_READ, NULL, sion_gendata,
553 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: invalid sion_filedesc %d", sid));
557 sion_filedesc->
buddies=buddies;
/* The other open uses local throw-away size variables instead. */
560 sion_int64 dummy_chunksize;
561 sion_int32 dummy_fsblksize;
564 &sion_gendata->numfiles, &buddy_info->filenum,
565 &dummy_chunksize, &dummy_fsblksize,
566 buddy_info->rank, buddy_info->size, &sion_gendata->grank,
567 _SION_INTERNAL_FLAG_BUDDY_READ, NULL, sion_gendata,
572 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO afteropen [grank=%2d step=%d] rc=%d\n",sion_gendata->grank,step,rc));
577 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO no-op [grank=%2d]\n",sion_gendata->grank));
/* Global barrier, bracketed by debug output. */
581 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO beforegbarrier [grank=%2d]\n",sion_gendata->grank));
582 sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
583 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO aftergbarrier [grank=%2d]\n",sion_gendata->grank));
/* Neither write nor read mode. */
592 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_buddy: unknown file mode"));
/* Final global barrier. */
597 sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
/*
 * _sion_parclose_generic_buddy - parallel close of a file set that was opened
 * with buddy files.
 *
 * NOTE(review): only part of this function is visible in this excerpt; the
 * mode tests that select between the two halves below are among the missing
 * lines.
 *
 * Visible parameter:
 *   sid  SION file id; reported in the error message when no valid file
 *        descriptor is found for it
 *
 * Visible behaviour:
 *   - per buddy: collect the mapping, then close the COLL and SEND files in
 *     scheduled passes; afterwards free the mapping and the buddy array and
 *     close the regular files
 *   - per step: find the group of this task for the step and close its file
 *     with _SION_INTERNAL_FLAG_BUDDY_READ
 *   - otherwise: error, unknown file mode
 */
616 #define DFUNCTION "_sion_parclose_generic_buddy"
617 int _sion_parclose_generic_buddy(
int sid,
627 int b_mapping_size=0,group,step;
628 sion_int32 *b_mapping=NULL;
635 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_parclose_generic_buddy: invalid sion_filedesc %d", sid));
/* The buddy data is taken from the file descriptor. */
638 buddies=sion_filedesc->
buddies;
645 buddyptr=&buddies[b];
/* Fills b_mapping and b_mapping_size for this buddy; both are passed to the
   COLL close below. */
649 _sion_generic_collect_mapping_buddy(buddyptr, sion_gendata, &b_mapping_size, &b_mapping);
/* Same pass scheduling as used for the other buddy operations in this file:
   _sion_buddy_bwsched() picks the action per pass from the send group number. */
653 for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
655 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYC pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
656 _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
658 switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
/* Action A: close on the COLL side, passing the collected mapping. */
659 case _SION_BW_SCHED_ACTIONA:
660 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYc call parallel close COLL [gendata: grank=%2d, group=%d of %d, lrank=%d of %d]\n",
661 sion_gendata->grank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
662 buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size));
664 b_mapping_size, b_mapping, _SION_INTERNAL_FLAG_BUDDY_COLL, sion_gendata, buddyptr );
/* Action B: close on the SEND side, without a mapping (0, NULL). */
666 case _SION_BW_SCHED_ACTIONB:
667 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYc call parallel close SEND [gendata: grank=%2d, group=%d of %d, lrank=%d of %d]\n",
668 sion_gendata->grank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
669 buddyptr->buddy_send.rank, buddyptr->buddy_send.size));
671 0, NULL, _SION_INTERNAL_FLAG_BUDDY_SEND, sion_gendata, buddyptr );
/* Remaining schedule values: nothing to close in this pass. */
674 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYC no-op [grank=%2d]\n",sion_gendata->grank));
/* Release the mapping and the buddy array.
   NOTE(review): buddies is dereferenced again further down; presumably that
   code sits in a different mode branch than this free - confirm. */
681 _SION_SAFE_FREE(b_mapping, NULL);
682 _SION_SAFE_FREE(buddies, NULL);
686 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"call parallel close of %d files, sid=%d\n",
687 sion_gendata->numfiles, sid));
689 sion_gendata, NULL );
690 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"leave parallel close of %d files in #tasks=%d globalrank=%d\n", sion_gendata->numfiles,
691 sion_gendata->lsize, sion_gendata->grank));
/* Step-wise close: for each step, find the first group of this task whose
   stepnum matches. */
696 for(step=0;step<buddies->numsteps;step++) {
699 for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
/* NOTE(review): the last argument is a pointer but is printed with %x; %p
   would be the matching conversion. */
701 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"check group #%d of %d %d==%d %x\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step, buddies->groups[b]));
702 if (buddies->groups[b]->stepnum==step) group=b;
704 buddy_info=buddies->groups[group];
708 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"call parallel close of %d files, sid=%d\n",
709 sion_gendata->numfiles, sid));
/* Close the file of that group using its own sid, rank and size. */
710 buddies->currentgroup=group;
711 rc =
_sion_parclose_generic( buddy_info->sid, buddy_info->rank,buddy_info->size, -1, NULL, _SION_INTERNAL_FLAG_BUDDY_READ,
712 sion_gendata, buddies );
713 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"leave parallel close of %d files in #tasks=%d globalrank=%d\n", sion_gendata->numfiles,
714 sion_gendata->lsize, sion_gendata->grank));
/* No known file mode matched. */
720 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_parclose_generic_buddy: unknown file mode"));
/*
 * _sion_coll_fwrite_buddy - collective write of the data of the caller into
 * the buddy files.
 *
 * NOTE(review): only part of this function is visible in this excerpt (most
 * parameters, the setup of spec and the return statement are missing).
 *
 * Visible parameter:
 *   data  buffer handed unchanged to gather_execute_cb
 *
 * Visible behaviour: for a buddy entry the descriptors of the COLL and SEND
 * files are validated, then the write is carried out in scheduled passes.
 * Depending on the action chosen by _sion_buddy_bwsched() the task takes part
 * in the gather on the COLL communication group or on the SEND communication
 * group; in both cases _sion_generic_collective_process_write is passed as
 * the processing callback.
 */
737 #define DFUNCTION "_sion_coll_fwrite_buddy"
738 int _sion_coll_fwrite_buddy(
const void *data,
744 _sion_filedesc *sion_filedesc,*sion_filedesc_coll,*sion_filedesc_send;
747 sion_int64 spec[2], ownnewposition;
/* The collector is always rank 0 of the respective communication group. */
748 int b, pass, collector=0;
750 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"enter parallel write buddy\n"));
754 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", sid));
756 sion_apidesc=sion_gendata->apidesc;
757 buddies=sion_filedesc->
buddies;
761 buddyptr=&buddies[b];
/* Both buddy file descriptors (COLL and SEND) must be valid. */
765 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", buddyptr->buddy_coll.sid));
768 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fwrite_buddy: invalid sion_filedesc %d", buddyptr->buddy_send.sid));
/* Scheduled passes; the action per pass depends on the send group number. */
773 for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
775 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
776 _sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)));
778 switch (_sion_buddy_bwsched(buddyptr->buddy_send.groupnum, sion_gendata->numfiles, pass)) {
/* Action A: take part in the gather on the COLL communication group. */
779 case _SION_BW_SCHED_ACTIONA:
780 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw call parallel write COLL [gendata: grank=%2d, group=%d of %d, lrank=%d of %d] %d..%d\n",
781 sion_gendata->grank, buddyptr->buddy_coll.groupnum,sion_gendata->numfiles,
782 buddyptr->buddy_coll.rank, buddyptr->buddy_coll.size,buddyptr->buddy_coll.from_index,buddyptr->buddy_coll.to_index));
/* The collector remembers its current file position before the gather; the
   position is cast to int for the debug output only. */
784 if(sion_filedesc_coll->
rank == collector) {
785 ownnewposition=sion_filedesc_coll->
currentpos;
786 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw call parallel write COLL currentpos=%d\n",(
int) sion_filedesc_coll->
currentpos));
788 rc = sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc_coll->
fsblksize,
789 buddyptr->buddy_coll.commgroup,collector,buddyptr->buddy_coll.from_index,buddyptr->buddy_coll.to_index,
790 buddyptr->buddy_coll.sid, _sion_generic_collective_process_write);
/* After the gather the collector reports its position again (the statements
   that use ownnewposition are not visible in this excerpt). */
791 if(sion_filedesc_coll->
rank == collector) {
794 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw call parallel write COLL currentpos=%d\n",(
int) sion_filedesc_coll->
currentpos));
/* Action B: take part in the gather on the SEND communication group. */
797 case _SION_BW_SCHED_ACTIONB:
798 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw call parallel write SEND [gendata: grank=%2d, group=%d of %d, lrank=%d of %d] %d..%d\n",
799 sion_gendata->grank, buddyptr->buddy_send.groupnum,sion_gendata->numfiles,
800 buddyptr->buddy_send.rank, buddyptr->buddy_send.size,buddyptr->buddy_send.from_index,buddyptr->buddy_send.to_index));
/* NOTE(review): the result of this _sion_errorprint call is not returned on
   this line; whether a return follows is not visible here - confirm. */
804 _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"could not ensure free space for this block, returning %d ...\n", sid);
810 rc = sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc_send->
fsblksize,
811 buddyptr->buddy_send.commgroup,collector,buddyptr->buddy_send.from_index,buddyptr->buddy_send.to_index,
812 buddyptr->buddy_send.sid, _sion_generic_collective_process_write);
/* Remaining schedule values: nothing to write in this pass. */
816 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYw no-op [grank=%2d]\n",sion_gendata->grank));
825 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"leave parallel write buddy\n"));
/*
 * _sion_coll_fread_buddy - collective read for a file set that was opened in
 * buddy read mode.
 *
 * NOTE(review): only part of this function is visible in this excerpt; many
 * statements between the visible conditions are missing, including the
 * return statement.
 *
 * Parameters:
 *   data    buffer passed to the read callback and to execute_scatter_cb
 *   size    size of one item; used as divisor when converting the number of
 *           bytes read into items (a size of 0 yields 0 items)
 *   nitems  number of items (its use is not visible in this excerpt)
 *   sid     SION file id
 *
 * Visible behaviour: the function walks over all steps stored in buddies,
 * selects the group of this task for each step and acts according to the
 * role of the task in that group; each step is closed with a global barrier.
 */
842 #define DFUNCTION "_sion_coll_fread_buddy"
843 int _sion_coll_fread_buddy(
void *data,
size_t size,
size_t nitems,
int sid) {
847 sion_int64 bread=-1, spec[2], ownnewposition, items_read = 0;
848 int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
849 int collector, firstsender, lastsender, step, b, group;
/* NOTE(review): the sibling functions in this file report an invalid file
   descriptor with SION_ID_NOT_VALID, here SION_SIZE_NOT_VALID is used -
   presumably because this function returns an item count; confirm. */
855 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fread_buddy: invalid sion_filedesc %d", sid));
857 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
/* Generic data, callback table and buddy data all hang off the descriptor. */
860 sion_gendata=sion_filedesc->
dataptr;
861 sion_apidesc=sion_gendata->apidesc;
862 buddies=sion_filedesc->
buddies;
869 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fread_buddy: collsize=%d <= 0, returning ...\n",
/* For each step: find the first group of this task whose stepnum matches. */
875 for(step=0;step<buddies->numsteps;step++) {
878 for(b=0; ( (b<buddies->numgroups) && (group==-1) ); b++ ) {
/* NOTE(review): the last argument is a pointer but is printed with %x; %p
   would be the matching conversion. */
880 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"check group #%d of %d %d==%d %x\n",b, buddies->numgroups, buddies->groups[b]->stepnum,step, buddies->groups[b]));
881 if (buddies->groups[b]->stepnum==step) group=b;
887 buddy_info=buddies->groups[group];
890 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_coll_fread_buddy: invalid sion_filedesc %d", buddy_info->sid));
/* Rank range of the group that takes part in the scatter. */
895 firstsender=buddy_info->from_index;
896 lastsender=buddy_info->to_index;
/* Tasks that read data from this file. */
900 if( (buddy_info->myrole == SION_ROLE_READER)
901 || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
/* NOTE(review): the result of this _sion_errorprint call is not returned on
   this line; whether a return follows is not visible here - confirm. */
906 _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"early eof found for this block, returning %d ...\n", buddy_info->sid);
/* A collector that is also a reader processes its own part directly. */
923 if( (buddy_info->myrole == SION_ROLE_COLLECTOR_READER) ) {
924 rc_own=_sion_generic_collective_process_read(data,spec,buddy_info->sid);
928 if( (buddy_info->myrole == SION_ROLE_COLLECTOR)
929 || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
/* The scatter callback is optional in the API table; without it collective
   reading is not possible. */
935 if(!sion_apidesc->execute_scatter_cb ) {
936 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
937 "_sion_coll_fread_buddy: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
939 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"call execute_gather #%d (step=%d, rank=%d, size=%d, collsize=%d, from=%2d, to=%2d) (bnum=%2d file# %2d) with role %s \n",
940 buddy_info->groupid, buddy_info->stepnum, buddy_info->rank, buddy_info->size, buddy_info->collsize,
941 buddy_info->from_index, buddy_info->to_index, buddy_info->bnum, buddy_info->filenum, _sion_buddy_role_to_str(buddy_info->myrole)));
/* Distribute the data within the communication group of this buddy group;
   _sion_generic_collective_process_read is the processing callback. */
943 rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, b_sion_filedesc->
fsblksize,
944 buddy_info->commgroup,collector,firstsender,lastsender,buddy_info->sid,
945 _sion_generic_collective_process_read);
950 if( (buddy_info->myrole == SION_ROLE_COLLECTOR)
951 || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
957 if(buddy_info->myrole == SION_ROLE_READER) {
/* Reading tasks convert the byte count into items, but only if both the own
   processing and the callback succeeded. */
964 if( (buddy_info->myrole == SION_ROLE_READER)
965 || (buddy_info->myrole == SION_ROLE_COLLECTOR_READER)) {
966 if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
/* Guard against division by zero for size==0. */
971 items_read = size ? bread / size : 0;
/* Global barrier at the end of the step, bracketed by debug output. */
977 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO beforegbarrier [grank=%2d]\n",sion_gendata->grank));
978 sion_apidesc->barrier_cb(sion_gendata->comm_data_global);
979 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"BUDDYO aftergbarrier [grank=%2d]\n",sion_gendata->grank));
983 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
991 #define DFUNCTION "_sion_buddy_map"
999 int tmpsize,t,g, g_map, p, p_map, n, n_map, orig, distance,distance_1step,pass;
1000 int *size_per_group = NULL, *group_map_orig_to_new, *group_map_new_to_orig, mynewgroupnr;
1001 sion_int32 helpint2[2];
1002 sion_int32 *tasktogroup = NULL;
1003 sion_int32 *tmpintfield1 = NULL;
1004 int comm_send_rank, comm_send_size, comm_send_grpnum;
1005 int comm_coll_rank, comm_coll_size, comm_coll_grpnum;
1006 void *dummycommgroup=NULL;
1008 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"buddy_idx=%d capability=%d [gendata: grank=%d of %d, group=%d of %d, lrank=%d of %d]\n",
1009 buddy_idx, capability,
1010 sion_gendata->grank, sion_gendata->gsize,
1011 sion_gendata->filenumber, sion_gendata->numfiles,
1012 sion_gendata->lrank, sion_gendata->lsize));
1018 if(sion_gendata->grank==0) {
1020 tmpsize=1*sion_gendata->gsize;
1021 tasktogroup = (sion_int32 *) malloc(tmpsize *
sizeof(sion_int32));
1022 if (tasktogroup == NULL) {
1023 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_buddy_map: cannot allocate temporary memory of size %lu (tasktogroup), aborting ...\n",
1024 (
unsigned long) tmpsize *
sizeof(sion_int32)));
1027 tmpsize=2*sion_gendata->gsize;
1028 tmpintfield1 = (sion_int32 *) malloc(tmpsize *
sizeof(sion_int32));
1029 if (tmpintfield1 == NULL) {
1031 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_buddy_map: cannot allocate temporary memory of size %lu (tmpintfield1), aborting ...\n",
1032 (
unsigned long) tmpsize *
sizeof(sion_int32)));
1035 size_per_group = (
int *) malloc(sion_gendata->numfiles *
sizeof(
int));
1036 if (size_per_group == NULL) {
1039 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_buddy_map: cannot allocate temporary memory of size %lu (size_per_group), aborting ...\n",
1040 (
unsigned long) sion_gendata->numfiles *
sizeof(
int)));
1046 group_map_orig_to_new = (
int *) malloc(sion_gendata->numfiles *
sizeof(
int));
1047 if (group_map_orig_to_new == NULL) {
1049 _SION_SAFE_FREE(size_per_group, NULL);
1051 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_buddy_map: cannot allocate temporary memory of size %lu (group_map_orig_to_new), aborting ...\n",
1052 (
unsigned long) sion_gendata->numfiles *
sizeof(
int)));
1055 group_map_new_to_orig = (
int *) malloc(sion_gendata->numfiles *
sizeof(
int));
1056 if (group_map_new_to_orig == NULL) {
1057 _SION_SAFE_FREE(size_per_group, NULL);
1058 free(group_map_orig_to_new);
1061 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_buddy_map: cannot allocate temporary memory of size %lu (group_map_new_to_orig), aborting ...\n",
1062 (
unsigned long) sion_gendata->numfiles *
sizeof(
int)));
1066 helpint2[0]=sion_gendata->filenumber;
1067 sion_gendata->apidesc->gatherr_cb(helpint2, tasktogroup, sion_gendata->comm_data_global, _SION_INT32,1,0);
1071 helpint2[0]=sion_gendata->lsize;
1072 helpint2[1]=capability;
1073 sion_gendata->apidesc->gatherr_cb(helpint2, tmpintfield1, sion_gendata->comm_data_global, _SION_INT32,2,0);
1074 if(sion_gendata->grank==0) {
1075 for(t=0;t<sion_gendata->gsize;t++) {
1076 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"task-map --> t=%2d grpnr=%2d lsize=%2d capability=%d\n",
1077 t,tasktogroup[t],tmpintfield1[2*t+0],tmpintfield1[2*t+1]));
1083 if(sion_gendata->grank==0) {
1084 for(t=0;t<sion_gendata->gsize;t++) {
1085 size_per_group[tasktogroup[t]] = tmpintfield1[2*t+0];
1089 for(g=0;g<sion_gendata->numfiles;g++) {
1090 orig=(g*distance) % sion_gendata->numfiles;
1091 group_map_orig_to_new[orig]=g;
1092 group_map_new_to_orig[g]=orig;
1095 for(g=0;g<sion_gendata->numfiles;g++) {
1096 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"grp-map --> g=%2d->%2d\n",g,group_map_orig_to_new[g]));
1101 if(sion_gendata->grank==0) {
1102 for(t=0;t<sion_gendata->gsize;t++) {
1104 g_map=group_map_orig_to_new[g];
1105 p_map=(g_map-distance_1step + sion_gendata->numfiles) % sion_gendata->numfiles;
1106 p=group_map_new_to_orig[p_map];
1107 tmpintfield1[2*t+0] = g_map;
1108 tmpintfield1[2*t+1] = size_per_group[p];
1109 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"comm-send --> t=%2d grpnr=%2d grpmap=%2d prevgr=%2d prevgrmap=%2d prev_lsize=%2d\n",
1110 t, g, tmpintfield1[2*t+0], p, p_map, tmpintfield1[2*t+1]));
1116 sion_gendata->apidesc->scatterr_cb(tmpintfield1, helpint2, sion_gendata->comm_data_global, _SION_INT32,2,0);
1119 mynewgroupnr=comm_send_grpnum=helpint2[0];
1120 buddy_send->rank=comm_send_rank=helpint2[1]+sion_gendata->lrank;
1121 buddy_send->size=comm_send_size=helpint2[1]+sion_gendata->lsize;
1122 buddy_send->collsize=sion_gendata->lsize;
1123 buddy_send->groupnum=mynewgroupnr;
1124 buddy_send->filenum=group_map_new_to_orig[mynewgroupnr];
1125 buddy_send->from_index=helpint2[1];
1126 buddy_send->to_index=helpint2[1]+sion_gendata->lsize-1;
1127 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_SEND d=%d grpnum=%2d lrank=%2d lsize=%2d mynewgroupnr=%2d collsize=%2d %d..%d fnum=%d\n",
1128 distance,comm_send_grpnum,comm_send_rank,comm_send_size,mynewgroupnr,
1129 buddy_send->collsize,buddy_send->from_index,buddy_send->to_index,
1130 buddy_send->filenum));
1133 if(sion_gendata->grank==0) {
1134 for(t=0;t<sion_gendata->gsize;t++) {
1136 g_map=group_map_orig_to_new[g];
1137 n_map=(g_map+distance_1step) % sion_gendata->numfiles;
1138 n=group_map_new_to_orig[n_map];
1139 tmpintfield1[2*t+0] = n_map;
1140 tmpintfield1[2*t+1] = size_per_group[n];
1141 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"comm-coll --> t=%2d grpnr=%2d grpmap=%2d nextgr=%2d nextgrmap=%2d next_lsize=%2d\n",
1142 t, g, g_map, n, tmpintfield1[2*t+0], tmpintfield1[2*t+1]));
1148 sion_gendata->apidesc->scatterr_cb(tmpintfield1, helpint2, sion_gendata->comm_data_global, _SION_INT32,2,0);
1151 comm_coll_grpnum=helpint2[0];
1152 buddy_coll->rank=comm_coll_rank=sion_gendata->lrank;
1153 buddy_coll->size=comm_coll_size=sion_gendata->lsize+helpint2[1];
1154 buddy_coll->collsize=helpint2[1];
1155 buddy_coll->groupnum=comm_coll_grpnum;
1156 buddy_coll->filenum=group_map_new_to_orig[comm_coll_grpnum];
1157 buddy_coll->from_index=sion_gendata->lsize;
1158 buddy_coll->to_index=sion_gendata->lsize+helpint2[1]-1;
1159 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_COLL d=%d grpnum=%2d lrank=%2d lsize=%2d collsize=%2d %d..%d fnum=%d\n",
1160 distance,comm_coll_grpnum,comm_coll_rank,comm_coll_size,
1161 buddy_coll->collsize,buddy_coll->from_index,buddy_coll->to_index,
1162 buddy_coll->filenum));
1166 for(pass=1;pass<=_SION_BW_SCHED_NUM_PASSES;pass++) {
1167 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"pass=%d [grank=%2d] op=%d\n",pass,sion_gendata->grank,
1168 _sion_buddy_bwsched(mynewgroupnr, sion_gendata->numfiles, pass)));
1170 switch (_sion_buddy_bwsched(mynewgroupnr, sion_gendata->numfiles, pass)) {
1171 case _SION_BW_SCHED_ACTIONA:
1172 rc=sion_gendata->apidesc->create_lcg_cb(&buddy_coll->commgroup,sion_gendata->comm_data_global,
1173 sion_gendata->grank,sion_gendata->gsize,
1174 comm_coll_rank,comm_coll_size,
1175 comm_coll_grpnum,sion_gendata->numfiles);
1176 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_created comm_coll (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1177 distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1178 comm_coll_grpnum,sion_gendata->numfiles,
1179 comm_coll_rank,comm_coll_size ));
1181 case _SION_BW_SCHED_ACTIONB:
1182 rc=sion_gendata->apidesc->create_lcg_cb(&buddy_send->commgroup,sion_gendata->comm_data_global,
1183 sion_gendata->grank,sion_gendata->gsize,
1184 comm_send_rank,comm_send_size,
1185 comm_send_grpnum,sion_gendata->numfiles);
1186 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_created comm_send (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1187 distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1188 comm_send_grpnum,sion_gendata->numfiles,
1189 comm_send_rank,comm_send_size ));
1191 case _SION_BW_SCHED_NOACTION:
1192 rc=sion_gendata->apidesc->create_lcg_cb(&dummycommgroup,sion_gendata->comm_data_global,
1193 sion_gendata->grank,sion_gendata->gsize,
1195 -1,sion_gendata->numfiles);
1196 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
"COMM_created dummy (d=%1d, p=%1d) [gendata: mygrp=%2d grank=%2d of %2d, group=%2d of %2d, lrank=%2d of %2d]\n",
1197 distance,pass,mynewgroupnr,sion_gendata->grank,sion_gendata->gsize,
1198 -1,sion_gendata->numfiles,0,1 ));
1206 if(sion_gendata->grank==0) {
1209 free(size_per_group);
1211 free(group_map_new_to_orig);
1212 free(group_map_orig_to_new);
/*
 * _sion_buddy_bwsched - choose the action a group performs in a given pass.
 *
 * groupnr:   group number of the caller (the call site visible earlier in
 *            this file passes mynewgroupnr)
 * numgroups: total number of groups (the call site passes
 *            sion_gendata->numfiles)
 * pass:      pass number; only the values 1, 2 and 3 are tested below
 *
 * The result variable starts as _SION_BW_SCHED_NOACTION and is overwritten
 * with _SION_BW_SCHED_ACTIONA or _SION_BW_SCHED_ACTIONB when one of the
 * tests matches. The call site switches on the result: ACTIONA creates the
 * buddy_coll communicator, ACTIONB the buddy_send communicator, and
 * NOACTION a dummy communicator.
 *
 * NOTE(review): this listing drops source lines (the embedded numbers jump
 * 1222->1224, 1225->1227, 1228->1232, 1235->1237 and stop at 1238), so the
 * inner branch conditions, the else lines, the closing braces and the
 * return statement are not visible. Presumably the inner branches test the
 * parity of groupnr and the function returns res -- confirm against the
 * complete file.
 */
1219 #define DFUNCTION "_sion_buddy_bwsched"
1220 int _sion_buddy_bwsched(
int groupnr,
int numgroups,
int pass) {
1221 int res=_SION_BW_SCHED_NOACTION;
/* Even number of groups: the visible assignments only cover passes 1 and 2,
 * once in the order A,B and once in the order B,A. */
1222 if(numgroups%2==0) {
1224 if(pass==1) res=_SION_BW_SCHED_ACTIONA;
1225 if(pass==2) res=_SION_BW_SCHED_ACTIONB;
1227 if(pass==1) res=_SION_BW_SCHED_ACTIONB;
1228 if(pass==2) res=_SION_BW_SCHED_ACTIONA;
/* Presumably the odd-numgroups branch starts here (the else line is not
 * visible). Pass 1 gives ACTIONA unless groupnr is the last group, pass 2
 * gives ACTIONB unless groupnr is 0, and pass 3 serves the two groups left
 * out: group 0 gets ACTIONB, the last group gets ACTIONA. */
1232 if((pass==1) && (groupnr != (numgroups-1))) res=_SION_BW_SCHED_ACTIONA;
1233 if((pass==2) && (groupnr != 0)) res=_SION_BW_SCHED_ACTIONB;
1234 if((pass==3) && (groupnr == 0)) res=_SION_BW_SCHED_ACTIONB;
1235 if((pass==3) && (groupnr == (numgroups-1))) res=_SION_BW_SCHED_ACTIONA;
/* Remaining branch: passes 1 and 2 in the order B,A; pass 3 is not assigned. */
1237 if(pass==1) res=_SION_BW_SCHED_ACTIONB;
1238 if(pass==2) res=_SION_BW_SCHED_ACTIONA;
/*
 * _sion_generic_collect_mapping (name taken from the DFUNCTION string; the
 * first lines of the signature are not visible in this listing).
 *
 * Gathers one (buddy_send.filenum, buddy_send.rank) pair per task into
 * *mapping on a single receiving task:
 *   1. *mapping / *mapping_size are reset to NULL / 0.
 *   2. A task whose buddy_coll has groupnum 0 and rank 0 allocates
 *      2 * gsize sion_int32 entries and sets *mapping_size to gsize.
 *   3. Every task reports its grank, or -1, to global rank 0, depending on
 *      whether its buddy_coll has filenum 0 and rank 0; rank 0 picks a
 *      receiver from the non-negative entries and broadcasts it.
 *   4. The pairs are gathered with that receiver as root.
 *
 * NOTE(review): step 2 tests buddy_coll.groupnum while step 3 tests
 * buddy_coll.filenum. If the two can differ, the elected receiver may not
 * be the task that allocated *mapping -- confirm they always agree.
 * NOTE(review): sion_gendata, buddyptr, sion_apidesc, t and the first
 * parameters are declared on lines this listing omits; the closing braces
 * and the final return are not visible either.
 */
1249 #define DFUNCTION "_sion_generic_collect_mapping"
1253 sion_int32 **mapping) {
1254 int rc=SION_SUCCESS;
1257 sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1259 sion_apidesc=sion_gendata->apidesc;
1261 *mapping = NULL; *mapping_size = 0;
/* Only the collecting task allocates the result: two entries per global task. */
1265 if((buddyptr->buddy_coll.groupnum==0) && (buddyptr->buddy_coll.rank==0)) {
1267 *mapping_size=sion_gendata->gsize;
1268 *mapping = (sion_int32 *) malloc(*mapping_size * 2 *
sizeof(sion_int32));
1269 if (*mapping == NULL) {
/* NOTE(review): this return is taken by one task only, before the calls
 * to gatherr_cb/bcastr_cb below; if those are collective operations the
 * other tasks would still enter them -- confirm. */
1270 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collect_mapping_buddy: Cannot allocate memory for mapping"));
/* Global rank 0 needs a scratch vector with one entry per task for the
 * receiver election. */
1275 if(sion_gendata->grank==0) {
1276 receivemap = (sion_int32 *) malloc(sion_gendata->gsize *
sizeof(sion_int32));
1277 if (receivemap == NULL) {
/* NOTE(review): *mapping is not released on this path if this task also
 * allocated it above. */
1278 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collect_mapping_buddy: Cannot allocate memory for receivemap"));
/* Each task announces itself as receiver (its grank) or -1. */
1282 if((buddyptr->buddy_coll.filenum==0) && (buddyptr->buddy_coll.rank==0)) iamreceiver=sion_gendata->grank;
1283 else iamreceiver=-1;
1284 sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
/* Rank 0 scans the announcements. Whether the loop stops at the first hit
 * is not visible here; receiver stays -1 if no task announced itself. */
1285 if(sion_gendata->grank==0) {
1286 for(t=0;t<sion_gendata->gsize;t++) {
1287 if(receivemap[t]>=0) {
1288 receiver=receivemap[t];
1292 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"receiver of mapping grank=%d\n", receiver));
/* Tell every task who the receiver is, then gather the pairs on it. */
1294 sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1297 lpos[0] = buddyptr->buddy_send.filenum;
1298 lpos[1] = buddyptr->buddy_send.rank;
1299 sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1301 if(receivemap!=NULL) free(receivemap);
/*
 * _sion_generic_buddy_get_and_distribute_info_from_file
 *
 * The task with grank == root opens fname read-only through
 * _sion_open_read() and the number of files is broadcast from root.
 * With more than one file, root scatters the mapping twice, two
 * sion_int32 values per task: first the stored pairs (the first value
 * becomes *filenumber), then the same pairs with the second value replaced
 * by the number of tasks in that task's file. With a single file each task
 * uses its global rank and the global size as *lrank and *lsize. Root
 * closes the file at the end if the open returned a valid sid.
 *
 * sion_gendata: generic data of the calling task (grank, gsize, apidesc,
 *               comm_data_global are read here)
 * fname:        name of the file opened by root
 * root:         global rank of the task that reads the file and acts as
 *               root of the broadcast/scatter calls
 * filenumber, numfiles, lrank, lsize: output values, see above
 *
 * NOTE(review): the listing drops many source lines. Not visible: the
 * declarations of sion_apidesc, fileptr and lpos, the code that fills
 * mapping and mapping_size on root (presumably sion_get_mapping), the
 * assignments of *numfiles, *lrank and *lsize in the multi-file path, the
 * conditions guarding some of the returns, the closing braces and the
 * final return. Confirm against the complete file.
 */
1308 #define DFUNCTION "_sion_generic_buddy_get_and_distribute_info_from_file"
1309 int _sion_generic_buddy_get_and_distribute_info_from_file(
_sion_generic_gendata *sion_gendata,
char *fname,
int root,
1310 sion_int32 *filenumber, sion_int32 *numfiles,
1311 sion_int32 *lrank, sion_int32 *lsize) {
1313 int sid = -1, ntasks, nfiles, t;
1314 int rc = SION_SUCCESS;
1316 sion_int32 fsblksize;
1317 int *tasksinfile = NULL;
1318 int mapping_size = -1;
1319 sion_int32 *mapping = NULL;
1323 sion_apidesc=sion_gendata->apidesc;
1325 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"entering function sion_gendata->grank=%d\n",sion_gendata->grank));
/* Only root touches the file; the other tasks receive everything through
 * the broadcast and scatter calls below. */
1327 if(sion_gendata->grank == root) {
1329 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"before open\n"));
1330 sid=
_sion_open_read(fname,_SION_FMODE_READ|_SION_FMODE_ANSI,_SION_READ_MASTER_ONLY_OF_MULTI_FILES,
1331 &ntasks,&nfiles,NULL,&fsblksize,NULL,&fileptr);
1333 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after open\n"));
1335 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"sion file %d files\n", *numfiles));
1343 sion_apidesc->bcastr_cb(numfiles, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1344 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"%s: numfiles=%d\n",fname,*numfiles));
/* Sanity check on root: the mapping must hold one entry per global task. */
1346 if((sion_gendata->grank == root) && (*numfiles>1)) {
/* NOTE(review): the format string has three conversions but four arguments
 * follow it, so *numfiles is printed under the "mapping_size=" label and
 * mapping_size under "gsize=". */
1347 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"%s: mapping_size=%d sion_gendata->gsize=%d\n",fname,*numfiles,mapping_size,sion_gendata->gsize));
1348 if(mapping_size!=sion_gendata->gsize) {
/* NOTE(review): only root can take this return, and it comes before the
 * scatterr_cb calls below; if those are collective the other tasks would
 * still enter them -- confirm. The file opened above is not closed here. */
1349 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1350 "_sion_generic_buddy_get_and_distribute_info_from_file: Incorrect sum of ntasks of files %d <> %d\n",
1351 mapping_size, sion_gendata->gsize));
/* The condition guarding this error return is on a line the listing omits. */
1356 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1357 "_sion_generic_buddy_get_and_distribute_info_from_file: could not get numfiles from sion file\n"));
1361 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"before scatter\n"));
1362 if(sion_gendata->grank==root) {
1363 for(t=0;t<mapping_size;t++) {
1364 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
" %d -> (%d,%d)\n",t,mapping[t*2],mapping[t*2+1]));
/* First scatter: each task receives its pair; element 0 is the file number
 * (element 1 presumably becomes *lrank on an omitted line). */
1369 sion_apidesc->scatterr_cb(mapping, lpos, sion_gendata->comm_data_global, _SION_INT32, 2, root);
1370 *filenumber=lpos[0];
1372 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after scatter filenum+lrank (%d,%d)\n",*filenumber,*lrank));
/* Root counts the tasks of every file and overwrites the second element of
 * each pair with the size of that task's file. */
1375 if(sion_gendata->grank==root) {
1376 tasksinfile = (
int *) malloc(*numfiles *
sizeof(
int));
1377 if (tasksinfile == NULL) {
1378 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_get_and_distribute_info_from_file: Cannot allocate memory for tasksinfile counter vector"));
1380 for(t=0;t<*numfiles;t++) tasksinfile[t]=0;
1381 for(t=0;t<mapping_size;t++) tasksinfile[ mapping[t*2] ]++;
1382 for(t=0;t<mapping_size;t++) mapping[t*2+1]=tasksinfile[ mapping[t*2] ];
/* Second scatter: element 1 now carries the per-file task count
 * (presumably stored into *lsize on an omitted line). */
1384 sion_apidesc->scatterr_cb(mapping, lpos, sion_gendata->comm_data_global, _SION_INT32, 2, root);
1386 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after scatter lsize (%d, %d of %d)\n",*filenumber, *lrank, *lsize));
1388 if(sion_gendata->grank==root) {
1389 if(tasksinfile) free(tasksinfile);
/* Single-file case: local rank and size equal the global ones. */
1395 *lrank = sion_gendata->grank;
1396 *lsize = sion_gendata->gsize;
1397 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"only one file -> filenumber=%d lRank=%d\n",*filenumber,*lrank));
/* Root releases the file handle obtained above, if the open succeeded. */
1400 if(sion_gendata->grank == root) {
1402 if (sid>=0) _sion_close_sid(sid);
1410 #define DFUNCTION "_sion_generic_buddy_get_and_distribute_info_from_one_file"
1411 int _sion_generic_buddy_get_and_distribute_info_from_one_file(
_sion_generic_gendata *sion_gendata,
char *fname,
int root,
1412 sion_int32 *filenumber, sion_int32 *numfiles,
1413 sion_int32 *lrank, sion_int32 *lsize) {
1415 int rc = SION_SUCCESS;
1416 int t, mapping_size = 0, grank, *mapping = NULL;
1417 sion_int32 file_filenumber,file_numfiles,file_lrank,file_lsize;
1422 sion_apidesc=sion_gendata->apidesc;
1424 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"entering function sion_gendata->grank=%d\n",sion_gendata->grank));
1426 if(sion_gendata->grank == root) {
1428 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"before open\n"));
1431 if (sion_filedesc == NULL) {
1432 return(_sion_errorprint(SION_ID_UNDEF,SION_ID_UNDEF,
1433 "sion_open: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
1434 (
unsigned long)
sizeof(sion_filedesc)));
1440 if (!sion_fileptr) {
1441 return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,
"sion_open: cannot open %s for reading, aborting ...\n", fname));
1443 sion_filedesc->
fileptr = sion_fileptr;
1447 if (rc!=SION_SUCCESS) {
1448 return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,
"sion_open: cannot read header from file %s, aborting ...\n", fname));
1450 sion_filedesc->
rank = 0;
1459 if (rc!=SION_SUCCESS) {
1460 return(_sion_errorprint(SION_ID_UNDEF,_SION_ERROR_RETURN,
"sion_open: cannot read header (var part) from file %s, aborting ...\n", fname));
1463 file_numfiles=sion_filedesc->
nfiles;
1465 file_lsize=sion_filedesc->
ntasks;
1468 mapping_size=sion_gendata->gsize;
1469 mapping = (sion_int32 *) malloc(mapping_size * 2 *
sizeof(sion_int32));
1470 if (mapping == NULL) {
1471 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_buddy_get_and_distribute_info_from_one_file: cannot allocate temporary memory of size %lu (mapping), aborting ...\n",
1472 (
unsigned long) mapping_size * 2 *
sizeof(sion_int32)));
1479 sion_apidesc->bcastr_cb(&file_numfiles, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1480 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"%s: numfiles=%d\n",fname,file_numfiles));
1481 sion_apidesc->bcastr_cb(&file_filenumber, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1482 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"%s: filenumber=%d\n",fname,file_filenumber));
1483 sion_apidesc->bcastr_cb(&file_lsize, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1484 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"%s: file_lsize=%d\n",fname,file_lsize));
1487 if(file_numfiles!=*numfiles) {
1488 _SION_SAFE_FREE(mapping, NULL);
1489 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
1490 "_sion_generic_buddy_get_and_distribute_info_from_one_file: Incorrect number of files %d <> %d\n",
1491 file_numfiles,*numfiles));
1495 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"before scatter\n"));
1496 if(sion_gendata->grank==root) {
1497 for(t=0;t<mapping_size;t++) mapping[t]=-1;
1498 for(t=0;t<file_lsize;t++) {
1502 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
" file=%s mapping[%d] -> (%d)\n",fname,grank,mapping[grank]));
1508 sion_apidesc->scatterr_cb(mapping, &file_lrank, sion_gendata->comm_data_global, _SION_INT32, 1, root);
1509 if(file_lrank!=-1) {
1510 *filenumber=file_filenumber;
1513 DPRINTFP((1, DFUNCTION, sion_gendata->grank,
"after scatter filenum=%d lrank=%d lsize=%d\n",*filenumber,*lrank,*lsize));
1516 if(sion_gendata->grank == root) {
int sion_get_mapping(int sid, int *mapping_size, sion_int32 **mapping, int *numfiles)
Returns pointers to the internal field mapping.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
void * _sion_vcdtovcon(int sid)
int _sion_vcdtype(int sid)
int _sion_newvcd(void *data, int type)
#define SION_FILEDESCRIPTOR
int _sion_file_stat_file2(const char *fname, unsigned int apiflag)
Check if file exists with appropriate low-level API.
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
#define SION_FILE_FLAG_READ
#define SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_POSIX
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
Free memory for the internal sion arrays.
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
#define SION_FILEMODE_WRITE
#define SION_FILEMODE_READ
#define SION_FILESTATE_SEROPEN
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
char * _sion_get_multi_filename(const char *fname, int filenumber)
generates the multi filename
int _sion_open_read(const char *fname, sion_int64 file_mode_flags, int read_all, int *ntasks, int *nfiles, sion_int64 **chunksizes, sion_int32 *fsblksize, int **globalranks, FILE **fileptr)
internal sion serial open function for reading on one or more files
Sion File Descriptor Structure.
sion_int32 fileptr_exported
sion_int64 * all_globalranks