19 #define _XOPEN_SOURCE 700
22 #if defined(_SION_MSA_HOSTNAME_REGEX)
32 #include "sion_error_handler.h"
45 #include "sion_hints.h"
48 #include "sion_generic_buddy.h"
80 sion_int64 *chunksize,
81 sion_int32 *fsblksize,
98 sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
99 sion_int64 *sion_tmpintfield = NULL;
100 sion_int32 *sion_tmpintfield_map = NULL, helpint32;
101 sion_int32 *sion_tmpintfield_buddy32 = NULL;
102 sion_int64 *sion_tmpintfield_buddy64 = NULL;
103 void *comm_group=NULL;
112 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"enter parallel open of file %s in mode %d #tasks=%d\n", fname, (
int) flags_store->mask, ntasks));
113 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n",
sizeof(
int),
sizeof(
long),
114 sizeof(
long long),
sizeof(sion_int32),
sizeof(sion_int64)));
118 filenum = *filenumber;
121 if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
122 if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
123 if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
124 if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
125 if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
129 if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
130 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
131 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
133 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"do_open_file=%d\n", do_open_file));
136 if (sion_filedesc == NULL) {
137 _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,
"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
138 (
unsigned long)
sizeof(sion_filedesc));
141 sion_filedesc->
fname = strdup(fname);
144 sion_filedesc->
sid=sid;
151 if (flags_store->mask&_SION_FMODE_WRITE) {
154 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" starting open for write #tasks=%d\n", ntasks));
158 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (
int) ntasks));
162 if ((chunksize != NULL) && (*chunksize<0)) {
163 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
167 if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
168 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
176 sion_filedesc->
rank = rank;
178 sion_filedesc->
ntasks = ntasks;
179 sion_filedesc->
nfiles = nfiles;
181 sion_filedesc->
prefix = strdup(prefix);
182 sion_filedesc->
compress = flags_store->mask&_SION_FMODE_COMPRESS;
183 sion_filedesc->
usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
184 sion_filedesc->
collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
185 sion_filedesc->
collmsa = !!_sion_flags_get(flags_store,
"collmsa");
186 sion_filedesc->
usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
188 sion_filedesc->
buddylevel = atoi(_sion_flags_get(flags_store,
"buddy")->val);
196 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
201 if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
205 sion_gendata->apidesc->barrier_cb(comm_group);
210 sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
211 *fsblksize=new_fsblocksize;
214 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"setting fsblksize to %lld\n", new_fsblocksize));
223 _sion_keyval_check_env(sion_filedesc, flags_store->mask);
225 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
keyvalmode, comm_group, _SION_INT32, 1, 0);
229 if ((flags_entry = _sion_flags_get(flags_store,
"collsize"))) {
230 sion_filedesc->
collsize = atoi(flags_entry->val);
232 _sion_coll_check_env(sion_filedesc);
236 ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
237 || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
238 || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
248 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usecoll, comm_group, _SION_INT32, 1, 0);
249 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
250 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collmergemode, comm_group, _SION_INT32, 1, 0);
254 if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
255 _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,
"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
262 _sion_hints_check_env(sion_filedesc);
264 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usehints, comm_group, _SION_INT32, 1, 0);
265 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
hinttype, comm_group, _SION_INT32, 1, 0);
267 DPRINTFTS(rank,
"before alloc");
273 DPRINTFTS(rank,
"after alloc");
276 lchunksize = (sion_int64) *chunksize;
277 lglobalrank = (sion_int64) *globalrank;
279 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
281 DPRINTFTS2(rank,
"before gather");
282 sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->
all_chunksizes, comm_group, _SION_INT64, 1, 0);
283 sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->
all_globalranks, comm_group, _SION_INT64, 1, 0);
287 sion_filedesc->
coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
291 DPRINTFTS2(rank,
"after gather");
293 DPRINTFTS(rank,
"before calculate");
294 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"chunksizes[%d - 1]=%ld\n", ntasks,(
long) sion_filedesc->
all_chunksizes[ntasks - 1]));
295 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
297 if (sion_filedesc->
collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
298 else if (sion_filedesc->
collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
299 else _sion_calculate_startpointers_collective(sion_filedesc);
301 DPRINTFTS(rank,
"after calculate");
304 DPRINTFTS(rank,
"before open");
311 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
314 sion_filedesc->
fileptr = sion_fileptr;
316 sion_gendata->apidesc->barrier_cb(comm_group);
317 DPRINTFTS(rank,
"after open");
324 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
326 DPRINTFTS(rank,
"before writeh");
328 DPRINTFTS(rank,
"after writeh");
336 DPRINTFTS(rank,
"before setp(0)");
339 DPRINTFTS(rank,
"after setp(0)");
344 DPRINTFTS(rank,
"before scatter");
345 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
346 DPRINTFTS(rank,
"after scatter");
349 DPRINTFTS(rank,
"before scatter");
350 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_chunksizes, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
351 DPRINTFTS(rank,
"after scatter");
355 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
362 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
364 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
370 DPRINTFTS(rank,
"before setp");
371 sion_gendata->apidesc->barrier_cb(comm_group);
380 sion_gendata->apidesc->barrier_cb(comm_group);
383 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
385 DPRINTFTS(rank,
"after setp");
386 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" ending open for write #tasks=%d filepos=%lld\n", ntasks,
_sion_file_get_position(sion_filedesc->
fileptr)));
389 else if (flags_store->mask&_SION_FMODE_READ) {
392 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" starting open for read #tasks=%d\n", ntasks));
396 DPRINTFTS(rank,
"before openR");
398 DPRINTFTS(rank,
"after openR");
400 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" cannot open %s for reading, aborting ...\n", fname));
401 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
406 sion_gendata->apidesc->barrier_cb(comm_group);
409 sion_filedesc->
fileptr = sion_fileptr;
410 sion_filedesc->
rank = rank;
411 sion_filedesc->
ntasks = ntasks;
414 sion_filedesc->
nfiles = nfiles;
415 sion_filedesc->
collmsa = !!_sion_flags_get(flags_store,
"collmsa");
416 sion_filedesc->
usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
418 sion_filedesc->
buddylevel = atoi(_sion_flags_get(flags_store,
"buddy")->val);
423 if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
431 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->
ntasks));
434 sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks *
sizeof(sion_int32));
435 if (sion_tmpintfield_buddy32 == NULL) {
436 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
437 (
unsigned long) ntasks *
sizeof(sion_int32)));
439 for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
440 sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks *
sizeof(sion_int64));
441 if (sion_tmpintfield_buddy64 == NULL) {
442 free(sion_tmpintfield_buddy32);
443 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
444 (
unsigned long) ntasks *
sizeof(sion_int64)));
446 for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
447 sion_tmpintfield_map = (sion_int32 *) malloc(ntasks *
sizeof(sion_int32));
448 if (sion_tmpintfield_map == NULL) {
449 free(sion_tmpintfield_buddy32);
450 free(sion_tmpintfield_buddy64);
451 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
452 (
unsigned long) ntasks *
sizeof(sion_int32)));
454 for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
457 helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
458 sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
462 for (j = 0; j < ntasks; j++)
463 DPRINTFP((64,
"_sion_paropen_generic_one_file", rank,
" buddy map[%d]=%d\n", j, (
int) sion_tmpintfield_map[j]));
470 if (rc!=SION_SUCCESS) {
/* Header read failed: release every temporary buddy buffer before leaving.
 * The mapping buffer was previously leaked on this path; free(NULL) is a
 * no-op, so this is also safe when buddy-read mode did not allocate them. */
471 free(sion_tmpintfield_buddy32);
472 free(sion_tmpintfield_buddy64);
free(sion_tmpintfield_map);
473 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
475 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
476 " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->
endianness, sion_filedesc->
fsblksize, sion_filedesc->
ntasks));
478 DPRINTFTS(rank,
"before alloc");
481 DPRINTFTS(rank,
"after alloc");
484 if (rc!=SION_SUCCESS) {
/* Second header-read failure path: free all temporary buddy buffers,
 * including the mapping buffer that was previously leaked here.
 * free(NULL) is a no-op, so unallocated buffers are harmless. */
485 free(sion_tmpintfield_buddy32);
486 free(sion_tmpintfield_buddy64);
free(sion_tmpintfield_map);
487 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
490 if ((flags_entry = _sion_flags_get(flags_store,
"collsize"))) {
491 sion_filedesc->
collsize = atoi(flags_entry->val);
493 _sion_coll_check_env(sion_filedesc);
497 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
499 if (sion_filedesc->
collmergemode) _sion_calculate_startpointers_collective_merge(sion_filedesc);
500 else if (sion_filedesc->
collmsa) _sion_calculate_startpointers_collective_msa(sion_filedesc);
501 else _sion_calculate_startpointers_collective(sion_filedesc);
503 DPRINTFTS(rank,
"after calculate");
506 _sion_keyval_check_env(sion_filedesc, flags_store->mask);
511 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
keyvalmode, comm_group, _SION_INT32, 1, 0);
514 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usecoll, comm_group, _SION_INT32, 1, 0);
515 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
518 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" usecoll=%d\n", sion_filedesc->
usecoll));
522 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
523 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
528 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->
all_coll_collsize[sion_tmpintfield_map[j]];
529 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
531 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->
all_coll_collector[sion_tmpintfield_map[j]];
532 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->
collector, comm_group, _SION_INT32, 1, 0);
540 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
543 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
endianness, comm_group, _SION_INT32, 1, 0);
544 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
swapbytes, comm_group, _SION_INT32, 1, 0);
545 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
fsblksize, comm_group, _SION_INT32, 1, 0);
546 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
ntasks, comm_group, _SION_INT32, 1, 0);
547 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
fileversion, comm_group, _SION_INT32, 1, 0);
548 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
nfiles, comm_group, _SION_INT32, 1, 0);
549 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
filenumber, comm_group, _SION_INT32, 1, 0);
550 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
flag1, comm_group, _SION_INT32, 1, 0);
551 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
flag2, comm_group, _SION_INT32, 1, 0);
552 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
maxusedchunks, comm_group, _SION_INT32, 1, 0);
554 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
557 DPRINTFTS(rank,
"after bcast");
560 DPRINTFTS(rank,
"before scatter");
561 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
562 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_chunksizes, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
563 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
564 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->
globalrank=(sion_int32) helpint64;
567 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_chunksizes[sion_tmpintfield_map[j]];
568 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
570 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_startpointers[sion_tmpintfield_map[j]];
571 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
573 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_globalranks[sion_tmpintfield_map[j]];
574 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->
globalrank=(sion_int32) helpint64;
577 DPRINTFTS(rank,
"after scatter");
581 sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->
ntasks *
sizeof(sion_int64));
582 if (sion_tmpintfield == NULL) {
/* Allocation of the scratch field failed: release all temporary buddy
 * buffers, not only the 32-bit one, so nothing allocated earlier in the
 * read path is leaked. free(NULL) is a no-op for buffers never allocated. */
583 free(sion_tmpintfield_buddy32);
free(sion_tmpintfield_buddy64);
free(sion_tmpintfield_map);
584 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
585 (
unsigned long) ntasks *
sizeof(sion_int64)));
589 for (j = 0; j < sion_filedesc->
ntasks; j++)
590 DPRINTFP((2048,
"_sion_paropen_generic_one_file", rank,
" read, blockcount on task %02d is %10ld\n", j, (
long) sion_tmpintfield[j]));
594 DPRINTFTS(rank,
"before scatter");
595 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
596 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
599 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
600 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
602 DPRINTFTS(rank,
"after scatter");
604 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" lastchunknr on task %02d is %10ld\n", rank, (
long) sion_filedesc->
lastchunknr));
608 DPRINTFTS(rank,
"before scatter");
609 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
610 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
613 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
614 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
616 DPRINTFTS(rank,
"after scatter");
627 DPRINTFTS(rank,
"before setp");
628 sion_gendata->apidesc->barrier_cb(comm_group);
642 if(sion_tmpintfield) free(sion_tmpintfield);
643 if(sion_tmpintfield_map) free(sion_tmpintfield_map);
644 if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
645 if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
647 sion_gendata->apidesc->barrier_cb(comm_group);
648 DPRINTFTS(rank,
"after setp");
651 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_multi_mpi: unknown file mode"));
657 *fileptr=sion_filedesc->
fileptr->fileptr;
665 if(fileptr!=NULL) *fileptr=NULL;
676 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
679 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (
long) flags_store->mask, ntasks));
712 int rc = SION_SUCCESS;
714 sion_int64 helpint64;
715 sion_int64 *sion_tmpintfield = NULL;
717 void *comm_group=NULL;
721 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
725 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
728 DPRINTFP((2,
"_sion_parclose_generic", rank,
"enter parallel close sid=%d\n", sid));
731 if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
732 if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
733 if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
734 if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
738 if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
739 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
740 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
747 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (read mode) sid=%d, call fclose on file\n", sid));
771 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (write mode) sid=%d, call fclose on file\n", sid));
780 sion_gendata->apidesc->barrier_cb(comm_group);
782 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->
lastchunknr,
784 for (blknum = 0; blknum <= sion_filedesc->
lastchunknr; blknum++) {
785 DPRINTFP((1024,
"_sion_parclose_generic", rank,
" parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
790 sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->
ntasks *
sizeof(sion_int64));
791 if (sion_tmpintfield == NULL) {
792 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
793 (
unsigned long) sion_filedesc->
ntasks *
sizeof(sion_int64)));
798 DPRINTFTS2(rank,
"before gather");
800 sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
804 for (blknum = 0; blknum < sion_filedesc->
ntasks; blknum++)
806 sion_filedesc->
maxusedchunks = (int) sion_tmpintfield[blknum];
808 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
maxusedchunks, comm_group, _SION_INT32, 1, 0);
809 DPRINTFTS2(rank,
"after gather");
823 for (blknum = 0; blknum < sion_filedesc->
maxusedchunks; blknum++) {
824 if (blknum <= sion_filedesc->lastchunknr) {
825 helpint64 = sion_filedesc->
blocksizes[blknum];
831 DPRINTFTS2(rank,
"before gather");
832 sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
833 DPRINTFTS2(rank,
"after gather");
836 for (lrank = 0; lrank < ntasks; lrank++)
837 DPRINTFP((2048,
"_sion_parclose_generic", rank,
" parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
838 sion_tmpintfield[lrank], lrank));
846 if (mapping != NULL) {
852 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (write mode) sid=%d, call fclose on file\n", sid));
860 if(sion_tmpintfield) free(sion_tmpintfield);
865 _sion_free_filedesc(sion_filedesc);
866 sion_filedesc = NULL;
869 DPRINTFP((2,
"_sion_parclose_generic", rank,
"leave parallel close sid=%d\n", sid));
880 sion_int64 chunksize,
886 int rc = SION_SUCCESS;
888 sion_int64 lchunksize, lstartpointer, lglobalrank;
889 void *comm_group=NULL;
892 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
896 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
899 DPRINTFP((2,
"_sion_parreinit_generic", sion_filedesc->
rank,
"enter parallel reinit sid=%d\n", sid));
901 comm_group=sion_gendata->comm_data_local;
904 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
907 DPRINTFTS(sion_filedesc->
rank,
"before alloc");
908 if (sion_filedesc->
rank == 0) {
912 DPRINTFTS(sion_filedesc->
rank,
"after alloc");
915 lchunksize = (sion_int64) chunksize;
916 lglobalrank = (sion_int64) sion_filedesc->
globalrank;
918 DPRINTFTS2(sion_filedesc->
rank,
"before gather");
919 sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->
all_chunksizes, comm_group, _SION_INT64, 1, 0);
920 sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->
all_globalranks, comm_group, _SION_INT64, 1, 0);
922 DPRINTFTS2(sion_filedesc->
rank,
"after gather");
926 if (sion_filedesc->
rank == 0) {
927 DPRINTFTS(sion_filedesc->
rank,
"before calculate");
928 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
929 else _sion_calculate_startpointers_collective(sion_filedesc);
930 DPRINTFTS(sion_filedesc->
rank,
"after calculate");
934 if (sion_filedesc->
rank == 0) {
937 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
943 DPRINTFTS(sion_filedesc->
rank,
"before writeh");
945 DPRINTFTS(sion_filedesc->
rank,
"after writeh");
954 DPRINTFTS(sion_filedesc->
rank,
"before setp(0)");
957 DPRINTFTS(sion_filedesc->
rank,
"after setp(0)");
962 DPRINTFTS(sion_filedesc->
rank,
"before scatter");
963 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
964 DPRINTFTS(sion_filedesc->
rank,
"after scatter");
968 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
975 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
977 DPRINTFP((32,
"_sion_parreinit_generic", sion_filedesc->
rank,
" start position is %10lld %10.4f MB\n",
981 DPRINTFTS(sion_filedesc->
rank,
"before setp");
982 sion_gendata->apidesc->barrier_cb(comm_group);
986 sion_filedesc->
chunksize = (sion_int64) chunksize;
987 sion_gendata->apidesc->barrier_cb(comm_group);
990 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
992 if (sion_filedesc->
rank == 0) {
997 DPRINTFTS(sion_filedesc->
rank,
"after setp");
998 DPRINTFP((32,
"_sion_parreinit_generic", sion_filedesc->
rank,
" ending open for write #tasks=%d filepos=%lld\n",
1001 DPRINTFP((2,
"_sion_parreinit_generic", sion_filedesc->
rank,
"leave parallel reinit of file %s in #tasks=%d\n",
1014 #define DFUNCTION "_sion_generic_collect_mapping"
1015 int _sion_generic_collect_mapping(
_sion_filedesc *sion_filedesc,
1017 sion_int32 **mapping ) {
1018 int rc=SION_SUCCESS;
1022 sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1025 sion_gendata=sion_filedesc->
dataptr;
1026 sion_apidesc=sion_gendata->apidesc;
1028 *mapping = NULL; *mapping_size = 0;
1036 *mapping_size=sion_gendata->gsize;
1037 *mapping = (sion_int32 *) malloc(*mapping_size * 2 *
sizeof(sion_int32));
1038 if (*mapping == NULL) {
1039 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_generic_parclose: Cannot allocate memory for mapping"));
1044 if(sion_gendata->grank==0) {
1045 receivemap = (sion_int32 *) malloc(sion_gendata->gsize *
sizeof(sion_int32));
1046 if (receivemap == NULL) {
1047 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_generic_parclose: Cannot allocate memory for receivemap"));
1051 if((sion_filedesc->
filenumber==0) && (sion_filedesc->
rank==0)) iamreceiver=sion_gendata->grank;
1052 else iamreceiver=-1;
1053 sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1054 if(sion_gendata->grank==0) {
1055 for(t=0;t<sion_gendata->gsize;t++) {
1056 if(receivemap[t]>=0) {
1057 receiver=receivemap[t];
1061 DPRINTFP((1,
DFUNCTION, sion_gendata->grank,
"receiver of mapping grank=%d\n", receiver));
1063 sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1067 lpos[1] = sion_filedesc->
rank;
1068 sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1071 if(receivemap!=NULL) free(receivemap);
1087 int grank = comm->grank;
1088 int gsize = comm->gsize;
1090 DPRINTFP((2, __func__, grank,
"enter\n"));
1092 if (0 != strcmp(comm->apidesc->name,
"SIONlib_MPI_API")) {
1093 return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN,
"MSA Collectives: not supported for generic API \"%s\"\n", comm->apidesc->name);
1101 fd.
collsize = atoi(flags_entry->val);
1103 _sion_coll_check_env(&fd);
1106 return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN,
"MSA Collectives: size of collective groups should be 2 or more, but is %d\n", collsize);
1108 sion_int32 n_groups = gsize / collsize + ((gsize % collsize) ? 1 : 0);
1111 return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN,
"MSA Collectives: usecoll == false\n");
1114 int is_candidate = _sion_generic_is_candidate(comm);
1115 int n_candidates = 0;
1116 int candidates_before = 0;
1119 int *candidates = NULL;
1120 if (0 == comm->grank) {
1121 candidates = calloc(gsize,
sizeof(
int));
1123 _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT,
"MSA Collectives: malloc returned NULL\n");
1127 comm->apidesc->gatherr_cb(&is_candidate, candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1129 if (0 == comm->grank) {
1130 for (
size_t i = 0; i < gsize; i++) {
1131 int tmp = candidates[i];
1132 candidates[i] = n_candidates;
1133 n_candidates += tmp;
1137 comm->apidesc->bcastr_cb(&n_candidates, comm->comm_data_global, _SION_INT32, 1, 0);
1138 comm->apidesc->scatterr_cb(candidates, &candidates_before, comm->comm_data_global, _SION_INT32, 1, 0);
1140 if (0 == comm->grank) {
1145 if ((n_candidates < n_groups) || (n_candidates < comm->numfiles)) {
/* The message has three %d conversions (candidates, groups, files); all
 * three values must be passed in that order. The candidate count was
 * previously missing, shifting the other values and leaving the last
 * conversion without an argument (undefined behaviour). */
1146 return _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN,
"MSA Collectives: insufficient number of candidates %d, number of groups %d, number of files %d\n", n_candidates, n_groups, comm->numfiles);
1150 int groups_per_file = n_groups / comm->numfiles;
1151 int is_collector = is_candidate && (candidates_before < n_groups);
1153 int rank_collector = candidates_before;
1154 comm->filenumber = rank_collector % comm->numfiles;
1155 comm->lrank = (rank_collector / comm->numfiles) * collsize;
1157 int collectors_before = (candidates_before < n_groups) ? candidates_before : n_groups;
1158 int rank_sender = grank - collectors_before;
1159 int group_number = rank_sender / (collsize - 1);
1160 comm->filenumber = group_number % comm->numfiles;
1161 comm->lrank = (group_number / comm->numfiles) * collsize + rank_sender % (collsize - 1) + 1;
1163 comm->lsize = collsize * (groups_per_file + ((comm->filenumber < n_groups % comm->numfiles) ? 1 : 0));
1164 if (comm->filenumber == comm->numfiles - 1) {
1165 comm->lsize += gsize - n_groups * collsize;
1168 DPRINTFP((32, __func__, grank,
"MSA Collectives: global rank %d of %d, is candidate %d, is collector %d, file no %d, local rank %d, local size %d\n", grank, gsize, is_candidate, is_collector, comm->filenumber, comm->lrank, comm->lsize));
1169 DPRINTFP((2, __func__, grank,
"exit\n"));
1170 return SION_SUCCESS;
1174 #if defined(_SION_MSA_DEEP_EST_SDV)
1175 char hostname[1024];
1176 if (0 == gethostname(hostname, 1024)) {
1177 if (0 == strncmp(
"knl", hostname, 3)) {
1185 #elif defined(_SION_MSA_HOSTNAME_REGEX)
1189 if ((regex =
_sion_getenv(
"SION_MSA_COLLECTOR_HOSTNAME_EREGEX"))) {
1190 compile_error = regcomp(&compiled, regex, REG_EXTENDED|REG_ICASE|REG_NOSUB);
1191 }
else if ((regex =
_sion_getenv(
"SION_MSA_COLLECTOR_HOSTNAME_REGEX"))) {
1192 compile_error = regcomp(&compiled, regex, REG_ICASE|REG_NOSUB);
1196 if (compile_error) {
1197 char error_msg[1024];
1198 size_t error_size = regerror(compile_error, &compiled, error_msg, 1024);
1200 _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT,
"MSA Collectives: error compiling regex \"%s\": %s%s\n", regex, error_msg, (error_size > 1024) ?
"..." :
"");
1203 char hostname[1024];
1204 int hostname_error = gethostname(hostname, 1023);
1205 hostname[1023] =
'\0';
1206 if (hostname_error) {
1208 _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT,
"MSA Collectives: error getting host name\n");
1211 int exec_status = regexec(&compiled, hostname, 0, NULL, 0);
1212 if (exec_status == 0) {
1215 }
else if (exec_status == REG_NOMATCH) {
1219 char error_msg[1024];
1220 size_t error_size = regerror(exec_status, &compiled, error_msg, 1024);
1222 _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_ABORT,
"MSA Collectives: error matching regex \"%s\": %s%s\n", regex, error_msg, (error_size > 1024) ?
"..." :
"");
1225 #elif defined(_SION_MSA_TEST)
1226 return comm->grank %2;