19 #define _XOPEN_SOURCE 700 28 #include "sion_error_handler.h" 40 #include "sion_hints.h" 43 #include "sion_generic_buddy.h" 75 sion_int64 *chunksize,
76 sion_int32 *fsblksize,
93 sion_int64 lchunksize, lstartpointer, lglobalrank, new_fsblocksize, helpint64, apiflag;
94 sion_int64 *sion_tmpintfield = NULL;
95 sion_int32 *sion_tmpintfield_map = NULL, helpint32;
96 sion_int32 *sion_tmpintfield_buddy32 = NULL;
97 sion_int64 *sion_tmpintfield_buddy64 = NULL;
98 void *comm_group=NULL;
106 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"enter parallel open of file %s in mode %d #tasks=%d\n", fname, (
int) flags_store->mask, ntasks));
107 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"sizeof: int=%d long=%d longlong=%d sion_int32=%d sion_int64=%d\n",
sizeof(
int),
sizeof(
long),
108 sizeof(
long long),
sizeof(sion_int32),
sizeof(sion_int64)));
112 filenum = *filenumber;
115 if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
116 if(flag& _SION_INTERNAL_FLAG_BUDDY_NORMAL ) comm_group=sion_gendata->comm_data_local;
117 if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
118 if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
119 if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
123 if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_open_file=0;
124 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_open_file=0;
125 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_open_file=0;
127 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"do_open_file=%d\n", do_open_file));
130 if (sion_filedesc == NULL) {
131 _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_ABORT,
"_sion_paropen_omp: cannot allocate filedescriptor structure of size %lu (sion_filedesc), aborting ...\n",
132 (
unsigned long)
sizeof(sion_filedesc));
135 sion_filedesc->
fname = strdup(fname);
138 sion_filedesc->
sid=sid;
145 if (flags_store->mask&_SION_FMODE_WRITE) {
148 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" starting open for write #tasks=%d\n", ntasks));
152 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: wrong number of tasks specific: ntasks=%d (<0), returning ...\n", (
int) ntasks));
156 if ((chunksize != NULL) && (*chunksize<0)) {
157 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: ((chunksize != NULL) && (*chunksize<0)), returning ...\n"));
161 if ((flag & _SION_INTERNAL_FLAG_NORMAL ) && (globalrank != NULL) && (*globalrank<0)) {
162 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen: ((globalrank != NULL) && (*globalrank<0)), returning ...\n"));
170 sion_filedesc->
rank = rank;
172 sion_filedesc->
ntasks = ntasks;
173 sion_filedesc->
nfiles = nfiles;
175 sion_filedesc->
prefix = strdup(prefix);
176 sion_filedesc->
compress = flags_store->mask&_SION_FMODE_COMPRESS;
177 sion_filedesc->
usecoll = (flags_store->mask&_SION_FMODE_COLLECTIVE)>0;
178 sion_filedesc->
collmergemode = (flags_store->mask&_SION_FMODE_COLLECTIVE_MERGE)>0;
179 sion_filedesc->
usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
181 sion_filedesc->
buddylevel = atoi(_sion_flags_get(flags_store,
"buddy")->val);
189 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
194 if((new_fsblocksize<0) || (new_fsblocksize>SION_MAX_FSBLOCKSIZE)) new_fsblocksize=SION_DEFAULT_FSBLOCKSIZE;
198 sion_gendata->apidesc->barrier_cb(comm_group);
203 sion_gendata->apidesc->bcastr_cb(&new_fsblocksize, comm_group, _SION_INT64, 1, 0);
204 *fsblksize=new_fsblocksize;
207 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"setting fsblksize to %lld\n", new_fsblocksize));
216 _sion_keyval_check_env(sion_filedesc, flags_store->mask);
218 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
keyvalmode, comm_group, _SION_INT32, 1, 0);
222 if ((flags_entry = _sion_flags_get(flags_store,
"collsize"))) {
223 sion_filedesc->
collsize = atoi(flags_entry->val);
225 _sion_coll_check_env(sion_filedesc);
229 ( flag&_SION_INTERNAL_FLAG_BUDDY_NORMAL )
230 || ( flag&_SION_INTERNAL_FLAG_BUDDY_SEND )
231 || ( flag&_SION_INTERNAL_FLAG_BUDDY_COLL )
241 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usecoll, comm_group, _SION_INT32, 1, 0);
242 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
243 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collmergemode, comm_group, _SION_INT32, 1, 0);
247 if(sion_gendata->apidesc->level!=SION_GENERIC_API_LEVEL_FULL) {
248 _sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_WARN,
"sion_paropen_generic: requested coalescing I/O but API does not support this mode, falling back to individual mode ...\n");
255 _sion_hints_check_env(sion_filedesc);
257 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usehints, comm_group, _SION_INT32, 1, 0);
258 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
hinttype, comm_group, _SION_INT32, 1, 0);
260 DPRINTFTS(rank,
"before alloc");
266 DPRINTFTS(rank,
"after alloc");
269 lchunksize = (sion_int64) *chunksize;
270 lglobalrank = (sion_int64) *globalrank;
272 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"lchunksize=%lld lglobalrank=%lld\n", lchunksize,lglobalrank));
274 DPRINTFTS2(rank,
"before gather");
275 sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->
all_chunksizes, comm_group, _SION_INT64, 1, 0);
276 sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->
all_globalranks, comm_group, _SION_INT64, 1, 0);
280 sion_filedesc->
coll_capability=sion_gendata->apidesc->get_capability_cb(comm_group);
284 DPRINTFTS2(rank,
"after gather");
286 DPRINTFTS(rank,
"before calculate");
287 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
"chunksizes[%d - 1]=%ld\n", ntasks,(
long) sion_filedesc->
all_chunksizes[ntasks - 1]));
288 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
290 if (!sion_filedesc->
collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
291 else _sion_calculate_startpointers_collective_merge(sion_filedesc);
293 DPRINTFTS(rank,
"after calculate");
296 DPRINTFTS(rank,
"before open");
303 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_generic: cannot open %s for writing, aborting ...\n", fname));
306 sion_filedesc->
fileptr = sion_fileptr;
308 sion_gendata->apidesc->barrier_cb(comm_group);
309 DPRINTFTS(rank,
"after open");
316 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
318 DPRINTFTS(rank,
"before writeh");
320 DPRINTFTS(rank,
"after writeh");
328 DPRINTFTS(rank,
"before setp(0)");
331 DPRINTFTS(rank,
"after setp(0)");
336 DPRINTFTS(rank,
"before scatter");
337 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
338 DPRINTFTS(rank,
"after scatter");
341 DPRINTFTS(rank,
"before scatter");
342 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_chunksizes, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
343 DPRINTFTS(rank,
"after scatter");
347 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
354 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
356 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" start position is %10lld %10.4f MB chunksize=%10lld %10.4f MB\n",
362 DPRINTFTS(rank,
"before setp");
363 sion_gendata->apidesc->barrier_cb(comm_group);
372 sion_gendata->apidesc->barrier_cb(comm_group);
375 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
377 DPRINTFTS(rank,
"after setp");
378 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" ending open for write #tasks=%d filepos=%lld\n", ntasks,
_sion_file_get_position(sion_filedesc->
fileptr)));
381 else if (flags_store->mask&_SION_FMODE_READ) {
384 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" starting open for read #tasks=%d\n", ntasks));
388 DPRINTFTS(rank,
"before openR");
390 DPRINTFTS(rank,
"after openR");
392 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" cannot open %s for reading, aborting ...\n", fname));
393 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot open %s for reading, aborting ...\n", fname));
398 sion_gendata->apidesc->barrier_cb(comm_group);
401 sion_filedesc->
fileptr = sion_fileptr;
402 sion_filedesc->
rank = rank;
403 sion_filedesc->
ntasks = ntasks;
406 sion_filedesc->
nfiles = nfiles;
407 sion_filedesc->
usebuddy = (flags_store->mask&_SION_FMODE_BUDDY)>0;
409 sion_filedesc->
buddylevel = atoi(_sion_flags_get(flags_store,
"buddy")->val);
414 if ( flag&_SION_INTERNAL_FLAG_BUDDY_READ ) {
422 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" create buddy mapping ntasks=%d filentasks=%d\n",ntasks,sion_filedesc->
ntasks));
425 sion_tmpintfield_buddy32 = (sion_int32 *) malloc(ntasks *
sizeof(sion_int32));
426 if (sion_tmpintfield_buddy32 == NULL) {
427 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
428 (
unsigned long) ntasks *
sizeof(sion_int32)));
430 for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy32[j]=-1;
431 sion_tmpintfield_buddy64 = (sion_int64 *) malloc(ntasks *
sizeof(sion_int64));
432 if (sion_tmpintfield_buddy64 == NULL) {
433 free(sion_tmpintfield_buddy32);
434 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_buddy), aborting ...\n",
435 (
unsigned long) ntasks *
sizeof(sion_int64)));
437 for (j = 0; j < ntasks; j++) sion_tmpintfield_buddy64[j]=-1;
438 sion_tmpintfield_map = (sion_int32 *) malloc(ntasks *
sizeof(sion_int32));
439 if (sion_tmpintfield_map == NULL) {
440 free(sion_tmpintfield_buddy32);
441 free(sion_tmpintfield_buddy64);
442 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield_map), aborting ...\n",
443 (
unsigned long) ntasks *
sizeof(sion_int32)));
445 for (j = 0; j < ntasks; j++) sion_tmpintfield_map[j]=-1;
448 helpint32=buddy_data->groups[buddy_data->currentgroup]->filelrank;
449 sion_gendata->apidesc->gatherr_cb(&helpint32, sion_tmpintfield_map, comm_group, _SION_INT32, 1, 0);
453 for (j = 0; j < ntasks; j++)
454 DPRINTFP((64,
"_sion_paropen_generic_one_file", rank,
" buddy map[%d]=%d\n", j, (
int) sion_tmpintfield_map[j]));
461 if (rc!=SION_SUCCESS) {
462 free(sion_tmpintfield_buddy32);
463 free(sion_tmpintfield_buddy64);
464 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
466 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
467 " read, after read of fix header part endianness=0x%x blksize=%d ntasks=%d\n", sion_filedesc->
endianness, sion_filedesc->
fsblksize, sion_filedesc->
ntasks));
469 DPRINTFTS(rank,
"before alloc");
472 DPRINTFTS(rank,
"after alloc");
475 if (rc!=SION_SUCCESS) {
476 free(sion_tmpintfield_buddy32);
477 free(sion_tmpintfield_buddy64);
478 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot read header from file %s, aborting ...\n", fname));
481 if ((flags_entry = _sion_flags_get(flags_store,
"collsize"))) {
482 sion_filedesc->
collsize = atoi(flags_entry->val);
484 _sion_coll_check_env(sion_filedesc);
488 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
490 if (!sion_filedesc->
collmergemode) _sion_calculate_startpointers_collective(sion_filedesc);
491 else _sion_calculate_startpointers_collective_merge(sion_filedesc);
493 DPRINTFTS(rank,
"after calculate");
496 _sion_keyval_check_env(sion_filedesc, flags_store->mask);
501 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
keyvalmode, comm_group, _SION_INT32, 1, 0);
504 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
usecoll, comm_group, _SION_INT32, 1, 0);
505 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
508 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" usecoll=%d\n", sion_filedesc->
usecoll));
512 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
513 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
518 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->
all_coll_collsize[sion_tmpintfield_map[j]];
519 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
521 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy32[j]=sion_filedesc->
all_coll_collector[sion_tmpintfield_map[j]];
522 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy32, &sion_filedesc->
collector, comm_group, _SION_INT32, 1, 0);
530 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
533 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
endianness, comm_group, _SION_INT32, 1, 0);
534 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
swapbytes, comm_group, _SION_INT32, 1, 0);
535 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
fsblksize, comm_group, _SION_INT32, 1, 0);
536 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
ntasks, comm_group, _SION_INT32, 1, 0);
537 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
fileversion, comm_group, _SION_INT32, 1, 0);
538 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
nfiles, comm_group, _SION_INT32, 1, 0);
539 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
filenumber, comm_group, _SION_INT32, 1, 0);
540 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
flag1, comm_group, _SION_INT32, 1, 0);
541 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
flag2, comm_group, _SION_INT32, 1, 0);
542 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
maxusedchunks, comm_group, _SION_INT32, 1, 0);
544 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
547 DPRINTFTS(rank,
"after bcast");
550 DPRINTFTS(rank,
"before scatter");
551 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
552 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_chunksizes, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
553 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
554 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_globalranks, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->
globalrank=(sion_int32) helpint64;
557 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_chunksizes[sion_tmpintfield_map[j]];
558 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->
chunksize, comm_group, _SION_INT64, 1, 0);
560 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_startpointers[sion_tmpintfield_map[j]];
561 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
563 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_filedesc->
all_globalranks[sion_tmpintfield_map[j]];
564 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);sion_filedesc->
globalrank=(sion_int32) helpint64;
567 DPRINTFTS(rank,
"after scatter");
571 sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->
ntasks *
sizeof(sion_int64));
572 if (sion_tmpintfield == NULL) {
573 free(sion_tmpintfield_buddy32);
574 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"_sion_paropen_generic_one_file: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
575 (
unsigned long) ntasks *
sizeof(sion_int64)));
579 for (j = 0; j < sion_filedesc->
ntasks; j++)
580 DPRINTFP((2048,
"_sion_paropen_generic_one_file", rank,
" read, blockcount on task %02d is %10ld\n", j, (
long) sion_tmpintfield[j]));
584 DPRINTFTS(rank,
"before scatter");
585 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
586 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
589 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
590 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
592 DPRINTFTS(rank,
"after scatter");
594 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" lastchunknr on task %02d is %10ld\n", rank, (
long) sion_filedesc->
lastchunknr));
598 DPRINTFTS(rank,
"before scatter");
599 if (! (flag&_SION_INTERNAL_FLAG_BUDDY_READ) ) {
600 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield, &helpint64, comm_group, _SION_INT64, 1, 0);
603 if(rank==0)
for (j = 0; j < ntasks; j++)
if(sion_tmpintfield_map[j]>=0) sion_tmpintfield_buddy64[j]=sion_tmpintfield[sion_tmpintfield_map[j]];
604 sion_gendata->apidesc->scatterr_cb(sion_tmpintfield_buddy64, &helpint64, comm_group, _SION_INT64, 1, 0);
606 DPRINTFTS(rank,
"after scatter");
617 DPRINTFTS(rank,
"before setp");
618 sion_gendata->apidesc->barrier_cb(comm_group);
632 if(sion_tmpintfield) free(sion_tmpintfield);
633 if(sion_tmpintfield_map) free(sion_tmpintfield_map);
634 if(sion_tmpintfield_buddy32) free(sion_tmpintfield_buddy32);
635 if(sion_tmpintfield_buddy64) free(sion_tmpintfield_buddy64);
637 sion_gendata->apidesc->barrier_cb(comm_group);
638 DPRINTFTS(rank,
"after setp");
641 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_multi_mpi: unknown file mode"));
647 *fileptr=sion_filedesc->
fileptr->fileptr;
655 if(fileptr!=NULL) *fileptr=NULL;
666 DPRINTFP((32,
"_sion_paropen_generic_one_file", rank,
" start position on task %02d is at end of sion_paropen_generic %10lld\n", rank,
669 DPRINTFP((2,
"_sion_paropen_generic_one_file", rank,
"leave parallel open of file %s in mode 0x%lx #tasks=%d\n", fname, (
long) flags_store->mask, ntasks));
702 int rc = SION_SUCCESS;
704 sion_int64 helpint64;
705 sion_int64 *sion_tmpintfield = NULL;
707 void *comm_group=NULL;
711 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: invalid sion_filedesc, aborting %d ...\n", sid));
715 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
718 DPRINTFP((2,
"_sion_parclose_generic", rank,
"enter parallel close sid=%d\n", sid));
721 if(flag& _SION_INTERNAL_FLAG_NORMAL ) comm_group=sion_gendata->comm_data_local;
722 if(flag& _SION_INTERNAL_FLAG_BUDDY_SEND ) comm_group=buddy_data->buddy_send.commgroup;
723 if(flag& _SION_INTERNAL_FLAG_BUDDY_COLL ) comm_group=buddy_data->buddy_coll.commgroup;
724 if(flag& _SION_INTERNAL_FLAG_BUDDY_READ ) comm_group=buddy_data->groups[buddy_data->currentgroup]->commgroup;
728 if (flag&_SION_INTERNAL_FLAG_BUDDY_SEND) do_close_file=0;
729 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_COLL) && (rank>0) ) do_close_file=0;
730 if ( (flag&_SION_INTERNAL_FLAG_BUDDY_READ) && (rank>0) ) do_close_file=0;
737 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (read mode) sid=%d, call fclose on file\n", sid));
761 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (write mode) sid=%d, call fclose on file\n", sid));
770 sion_gendata->apidesc->barrier_cb(comm_group);
772 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close sid=%d: lastchunknr=%d globalskip=%lld\n", sid, sion_filedesc->
lastchunknr,
774 for (blknum = 0; blknum <= sion_filedesc->
lastchunknr; blknum++) {
775 DPRINTFP((1024,
"_sion_parclose_generic", rank,
" parallel close sid=%d: local block %02d -> %10lld bytes\n", sid, blknum,
780 sion_tmpintfield = (sion_int64 *) malloc(sion_filedesc->
ntasks *
sizeof(sion_int64));
781 if (sion_tmpintfield == NULL) {
782 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parclose_generic: cannot allocate temporary memory of size %lu (sion_tmpintfield), aborting ...\n",
783 (
unsigned long) sion_filedesc->
ntasks *
sizeof(sion_int64)));
788 DPRINTFTS2(rank,
"before gather");
790 sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
794 for (blknum = 0; blknum < sion_filedesc->
ntasks; blknum++)
796 sion_filedesc->
maxusedchunks = (int) sion_tmpintfield[blknum];
798 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
maxusedchunks, comm_group, _SION_INT32, 1, 0);
799 DPRINTFTS2(rank,
"after gather");
813 for (blknum = 0; blknum < sion_filedesc->
maxusedchunks; blknum++) {
814 if (blknum <= sion_filedesc->lastchunknr) {
815 helpint64 = sion_filedesc->
blocksizes[blknum];
821 DPRINTFTS2(rank,
"before gather");
822 sion_gendata->apidesc->gatherr_cb(&helpint64, sion_tmpintfield, comm_group, _SION_INT64, 1, 0);
823 DPRINTFTS2(rank,
"after gather");
826 for (lrank = 0; lrank < ntasks; lrank++)
827 DPRINTFP((2048,
"_sion_parclose_generic", rank,
" parallel close sid=%d: write total chunksize for block %d: %2lld rank=%d\n", sid, blknum,
828 sion_tmpintfield[lrank], lrank));
836 if (mapping != NULL) {
842 DPRINTFP((32,
"_sion_parclose_generic", rank,
" parallel close (write mode) sid=%d, call fclose on file\n", sid));
850 if(sion_tmpintfield) free(sion_tmpintfield);
855 _sion_free_filedesc(sion_filedesc);
856 sion_filedesc = NULL;
859 DPRINTFP((2,
"_sion_parclose_generic", rank,
"leave parallel close sid=%d\n", sid));
870 sion_int64 chunksize,
876 int rc = SION_SUCCESS;
878 sion_int64 lchunksize, lstartpointer, lglobalrank;
879 void *comm_group=NULL;
882 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: invalid sion_filedesc, aborting %d ...\n", sid));
886 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: sion file with sid=%d was not opened by a sion_paropen\n", sid));
889 DPRINTFP((2,
"_sion_parreinit_generic", sion_filedesc->
rank,
"enter parallel reinit sid=%d\n", sid));
891 comm_group=sion_gendata->comm_data_local;
894 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_parreinit_generic: sion file with sid=%d only allowed for files openend for write\n", sid));
897 DPRINTFTS(sion_filedesc->
rank,
"before alloc");
898 if (sion_filedesc->
rank == 0) {
902 DPRINTFTS(sion_filedesc->
rank,
"after alloc");
905 lchunksize = (sion_int64) chunksize;
906 lglobalrank = (sion_int64) sion_filedesc->
globalrank;
908 DPRINTFTS2(sion_filedesc->
rank,
"before gather");
909 sion_gendata->apidesc->gatherr_cb(&lchunksize, sion_filedesc->
all_chunksizes, comm_group, _SION_INT64, 1, 0);
910 sion_gendata->apidesc->gatherr_cb(&lglobalrank, sion_filedesc->
all_globalranks, comm_group, _SION_INT64, 1, 0);
912 DPRINTFTS2(sion_filedesc->
rank,
"after gather");
916 if (sion_filedesc->
rank == 0) {
917 DPRINTFTS(sion_filedesc->
rank,
"before calculate");
918 if (!sion_filedesc->
usecoll) _sion_calculate_startpointers(sion_filedesc);
919 else _sion_calculate_startpointers_collective(sion_filedesc);
920 DPRINTFTS(sion_filedesc->
rank,
"after calculate");
924 if (sion_filedesc->
rank == 0) {
927 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_METADATABLOCK1);
933 DPRINTFTS(sion_filedesc->
rank,
"before writeh");
935 DPRINTFTS(sion_filedesc->
rank,
"after writeh");
944 DPRINTFTS(sion_filedesc->
rank,
"before setp(0)");
947 DPRINTFTS(sion_filedesc->
rank,
"after setp(0)");
952 DPRINTFTS(sion_filedesc->
rank,
"before scatter");
953 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_startpointers, &sion_filedesc->
startpos, comm_group, _SION_INT64, 1, 0);
954 DPRINTFTS(sion_filedesc->
rank,
"after scatter");
958 sion_gendata->apidesc->scatterr_cb(sion_filedesc->
all_coll_collsize, &sion_filedesc->
collsize, comm_group, _SION_INT32, 1, 0);
965 sion_gendata->apidesc->bcastr_cb(&sion_filedesc->
globalskip, comm_group, _SION_INT64, 1, 0);
967 DPRINTFP((32,
"_sion_parreinit_generic", sion_filedesc->
rank,
" start position is %10lld %10.4f MB\n",
971 DPRINTFTS(sion_filedesc->
rank,
"before setp");
972 sion_gendata->apidesc->barrier_cb(comm_group);
976 sion_filedesc->
chunksize = (sion_int64) chunksize;
977 sion_gendata->apidesc->barrier_cb(comm_group);
980 _sion_apply_hints(sion_filedesc,SION_HINTS_ACCESS_TYPE_CHUNK);
982 if (sion_filedesc->
rank == 0) {
987 DPRINTFTS(sion_filedesc->
rank,
"after setp");
988 DPRINTFP((32,
"_sion_parreinit_generic", sion_filedesc->
rank,
" ending open for write #tasks=%d filepos=%lld\n",
991 DPRINTFP((2,
"_sion_parreinit_generic", sion_filedesc->
rank,
"leave parallel reinit of file %s in #tasks=%d\n",
1004 #define DFUNCTION "_sion_generic_collect_mapping" 1005 int _sion_generic_collect_mapping(
_sion_filedesc *sion_filedesc,
1007 sion_int32 **mapping ) {
1008 int rc=SION_SUCCESS;
1012 sion_int32 lpos[2], *receivemap=NULL, iamreceiver, receiver = -1;
1015 sion_gendata=sion_filedesc->
dataptr;
1016 sion_apidesc=sion_gendata->apidesc;
1018 *mapping = NULL; *mapping_size = 0;
1026 *mapping_size=sion_gendata->gsize;
1027 *mapping = (sion_int32 *) malloc(*mapping_size * 2 *
sizeof(sion_int32));
1028 if (*mapping == NULL) {
1029 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_generic_parclose: Cannot allocate memory for mapping"));
1034 if(sion_gendata->grank==0) {
1035 receivemap = (sion_int32 *) malloc(sion_gendata->gsize *
sizeof(sion_int32));
1036 if (receivemap == NULL) {
1037 return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_generic_parclose: Cannot allocate memory for receivemap"));
1041 if((sion_filedesc->
filenumber==0) && (sion_filedesc->
rank==0)) iamreceiver=sion_gendata->grank;
1042 else iamreceiver=-1;
1043 sion_apidesc->gatherr_cb(&iamreceiver, receivemap, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1044 if(sion_gendata->grank==0) {
1045 for(t=0;t<sion_gendata->gsize;t++) {
1046 if(receivemap[t]>=0) {
1047 receiver=receivemap[t];
1051 DPRINTFP((1,
DFUNCTION, sion_gendata->grank,
"receiver of mapping grank=%d\n", receiver));
1053 sion_apidesc->bcastr_cb(&receiver, sion_gendata->comm_data_global, _SION_INT32, 1, 0);
1057 lpos[1] = sion_filedesc->
rank;
1058 sion_apidesc->gatherr_cb(&lpos, *mapping, sion_gendata->comm_data_global, _SION_INT32, 2, receiver);
1061 if(receivemap!=NULL) free(receivemap);
long _sion_file_get_opt_blksize(_sion_fileptr *sion_fileptr)
Get optional file system block size for a file.
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
int _sion_buffer_flush(_sion_filedesc *sion_filedesc)
Flush buffer.
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
int _sion_reassignvcd(int sid, void *data, int type)
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
Sion File Descriptor Structure.
int _sion_paropen_generic_one_file(int sid, char *fname, _sion_flags_store *flags_store, char *prefix, int *numFiles, int *filenumber, sion_int64 *chunksize, sion_int32 *fsblksize, int rank, int ntasks, int *globalrank, int flag, FILE **fileptr, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Generic parallel open of one direct access file.
int _sion_parclose_generic(int sid, int rank, int ntasks, int mapping_size, sion_int32 *mapping, int flag, _sion_generic_gendata *sion_gendata, _sion_generic_buddy *buddy_data)
Internal function to close parallel opened SION file.
#define SION_FILE_FLAG_WRITE
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
sion_int64 * all_globalranks
int _sion_alloc_filedesc_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_free_filedesc_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
#define SION_FILE_FLAG_READ
int _sion_realloc_filedesc_blocklist(_sion_filedesc *sion_filedesc, sion_int32 maxchunks)
Increase the memory used by the internal sion structure for the blocklist.
int _sion_vcdtype(int sid)
sion_int32 * all_coll_collector
sion_int32 fileptr_exported
sion_int32 _sion_get_endianness_with_flags(sion_int64 flags)
Return endianness including possible choice via flags.
#define SION_FILE_FLAG_POSIX
int _sion_parreinit_generic(int sid, sion_int64 chunksize, int rank, int ntasks, _sion_generic_gendata *sion_gendata)
change chunksize for an already opened SION file (write)
sion_int32 * all_coll_capability
void * _sion_vcdtovcon(int sid)
#define SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_CREATE
#define SION_FILEMODE_READ
sion_int64 * all_startpointers
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroys fileptr structure.
#define SION_FILESTATE_PAROPEN
int _sion_free_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
free memory for the internal sion arrays
int _sion_buffer_check_env(_sion_filedesc *sion_filedesc)
Checks if environment variables are set to use buffer.
sion_int32 coll_capability
sion_int32 currentblocknr
int _sion_cache_check_env(_sion_filedesc *sion_filedesc)
Check if environment variables are set to use cache.
#define SION_FILEDESCRIPTOR
sion_int64 * all_chunksizes
int _sion_print_filedesc(_sion_filedesc *sion_filedesc, int level, char *desc, int flag)
Print the initialized sion file description.
#define SION_FILESTATE_CLOSE
_sion_filedesc * _sion_alloc_filedesc(void)
Allocates memory for internal sion structure.
int _sion_alloc_filedesc_coll_arrays(_sion_filedesc *sion_filedesc)
Allocate memory for the internal sion arrays.
int _sion_init_filedesc(_sion_filedesc *sion_filedesc)
Initialize the sion file description.
#define SION_FILEMODE_WRITE
#define DFUNCTION
collect mapping information on rank 0 of first file, mapping=NULL for all others
sion_int32 * all_coll_collsize
sion_int64 start_of_varheader
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.