10 #define _XOPEN_SOURCE 700 21 #include "sion_const.h" 22 #include "sion_debug.h" 23 #include "sion_enums.h" 24 #include "sion_error_handler.h" 25 #include "sion_flags.h" 26 #include "sion_generic.h" 27 #include "sion_generic_internal.h" 28 #include "sion_internal.h" 30 #include "sion_mpi_cb_gen.h" 31 #include "sion_mpi_internal_gen.h" 33 int _sion_mpi_api_aid = -1;
35 int sion_paropen_mpi(
const char *fname,
const char *file_mode,
int *numFiles, MPI_Comm gComm,
const MPI_Comm *lComm,
36 int64_t *chunksize, int32_t *fsblksize,
int *globalrank, FILE **fileptr,
char **newfname)
40 _sion_flags_store *flags_store = _sion_parse_flags(file_mode);
42 return _sion_errorprint_mpi(
43 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
45 sid = _sion_paropen_mpi(fname, flags_store, numFiles, gComm, lComm, chunksize, fsblksize, globalrank, fileptr, newfname);
46 _sion_flags_destroy_store(&flags_store);
50 int _sion_paropen_mpi(
const char *fname,
const _sion_flags_store *flags_store,
int *numFiles, MPI_Comm gComm,
51 const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize,
int *globalrank, FILE **fileptr,
char **newfname)
53 int rc = SION_NOT_SUCCESS, sid = SION_ID_UNDEF;
54 int filenumber, gtasks, gRank, lRank, lSize;
56 _mpi_api_commdata *gen_gcomm;
58 MPI_Comm_size(gComm, >asks);
59 MPI_Comm_rank(gComm, &gRank);
61 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"enter parallel open of file %s\n", fname));
65 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: No lComm variable given");
67 if (numFiles == NULL) {
68 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: No numFiles variable given");
72 if (_sion_mpi_api_aid < 0) {
73 _sion_mpi_api_aid = _sion_register_callbacks_mpi();
76 if (flags_store->mask & _SION_FMODE_WRITE) {
82 rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
83 if (rc != SION_SUCCESS) {
84 return _sion_errorprint_mpi(
85 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
87 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"%d local communicators found\n", *numFiles));
92 rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
93 if (rc != SION_SUCCESS) {
94 return _sion_errorprint_mpi(
95 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi");
97 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"Global communicator divided in %d local communicators\n", *numFiles));
103 }
else if (flags_store->mask & _SION_FMODE_READ) {
112 if (!(flags_store->mask & _SION_FMODE_BUDDY)) {
116 rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
117 if (rc != SION_SUCCESS) {
118 return _sion_errorprint_mpi(
119 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
121 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"%d local communicators found\n", *numFiles));
125 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: unknown file mode");
129 gen_gcomm = malloc(
sizeof(_mpi_api_commdata));
130 if (gen_gcomm == NULL) {
131 return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
132 "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
133 (
unsigned long)
sizeof(_mpi_api_commdata));
135 gen_gcomm->comm = gComm;
136 gen_gcomm->commset = 1;
137 gen_gcomm->local = 0;
138 gen_gcomm->rank = gRank;
139 gen_gcomm->size = gtasks;
140 gen_gcomm->lcommgroup = NULL;
144 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"enter parallel open of %d files (current name %s)\n", *numFiles, fname));
145 sid = _sion_generic_paropen(_sion_mpi_api_aid, fname, flags_store, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber,
146 numFiles, &lRank, &lSize, fileptr, newfname);
149 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"leave parallel open of %d files #tasks=%d sid=%d\n", *numFiles, lSize, sid));
152 if (sid == SION_ID_NOT_VALID) {
153 return _sion_errorprint_mpi(
154 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: invalid return code from internal open %d", rc);
157 DPRINTFP((1,
"sion_paropen_mpi", gRank,
"leave parallel open of file %s sid=%d\n", fname, sid));
166 DPRINTFP((1,
"sion_parclose_mpi", _SION_DEFAULT_RANK,
"enter parallel close of sid %d\n", sid));
168 rc = sion_generic_parclose(sid);
170 DPRINTFP((1,
"sion_parclose_mpi", _SION_DEFAULT_RANK,
"leave parallel close of sid %d rc=%d\n", sid, rc));
175 int sion_parreinit_mpi(
int sid, int64_t chunksize)
179 DPRINTFP((1,
"sion_parreinit_mpi", _SION_DEFAULT_RANK,
"enter parallel reinit of sid %d\n", sid));
181 rc = sion_generic_parreinit(sid, chunksize);
183 DPRINTFP((1,
"sion_parreinit_mpi", _SION_DEFAULT_RANK,
"leave parallel reinit of sid %d rc=%d\n", sid, rc));
188 int sion_paropen_mapped_mpi(
char *fname,
const char *file_mode,
int *numFiles, MPI_Comm gComm,
int *nlocaltasks,
189 int **globalranks, int64_t **chunksizes,
int **mapping_filenrs,
int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
192 _sion_flags_store *flags_store = _sion_parse_flags(file_mode);
194 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
195 "sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
197 sid = _sion_paropen_mapped_mpi(fname, flags_store, numFiles, gComm, nlocaltasks, globalranks, chunksizes, mapping_filenrs,
198 mapping_lranks, fsblksize, fileptr);
199 _sion_flags_destroy_store(&flags_store);
203 int _sion_paropen_mapped_mpi(
char *fname,
const _sion_flags_store *flags_store,
int *numFiles, MPI_Comm gComm,
int *nlocaltasks,
204 int **globalranks, int64_t **chunksizes,
int **mapping_filenrs,
int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
206 int sid = SION_ID_UNDEF;
208 _mpi_api_commdata *gen_gcomm;
210 MPI_Comm_size(gComm, >asks);
211 MPI_Comm_rank(gComm, &gRank);
213 DPRINTFP((1,
"sion_paropen_mapped_mpi", gRank,
"enter parallel open of file %s\n", fname));
216 if (numFiles == NULL) {
217 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mapped_mpi: No numFiles variable given");
221 if (_sion_mpi_api_aid < 0) {
222 _sion_mpi_api_aid = _sion_register_callbacks_mpi();
225 if (flags_store->mask & _SION_FMODE_WRITE) {
228 if (*numFiles <= 0) {
229 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
230 "sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode");
232 }
else if (flags_store->mask & _SION_FMODE_READ) {
237 return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mapped_mpi: unknown file mode");
241 gen_gcomm = malloc(
sizeof(_mpi_api_commdata));
242 if (gen_gcomm == NULL) {
243 return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
244 "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
245 (
unsigned long)
sizeof(_mpi_api_commdata));
247 gen_gcomm->comm = gComm;
248 gen_gcomm->commset = 1;
249 gen_gcomm->local = 0;
250 gen_gcomm->rank = gRank;
251 gen_gcomm->size = gtasks;
252 gen_gcomm->lcommgroup = NULL;
257 (1,
"sion_paropen_mapped_mpi", gRank,
"enter parallel open of %d files (current name %s) (sid=%d)\n", *numFiles, fname, sid));
258 sid = _sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, flags_store, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks,
259 globalranks, chunksizes, mapping_filenrs, mapping_lranks, fsblksize, fileptr);
263 (1,
"sion_paropen_mapped_mpi", gRank,
"leave parallel open of %d files #tasks=%d sid=%d\n", *numFiles, *nlocaltasks, sid));
266 if (sid == SION_ID_NOT_VALID) {
267 return _sion_errorprint_mpi(
268 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mapped_mpi: invalid return code from internal open %d", sid);
271 DPRINTFP((1,
"sion_paropen_mapped_mpi", gRank,
"leave parallel open of file %s sid=%d\n", fname, sid));
276 int sion_parclose_mapped_mpi(
int sid)
280 DPRINTFP((1,
"sion_parclose_mapped_mpi", _SION_DEFAULT_RANK,
"enter parallel close of sid %d\n", sid));
282 rc = sion_generic_parclose_mapped(sid);
284 DPRINTFP((1,
"sion_parclose_mapped_mpi", _SION_DEFAULT_RANK,
"leave parallel close of sid %d rc=%d\n", sid, rc));
293 options.chunksize = 1024 * 1024;
294 options.fsblksize = -1;
296 options.multifile_mode = _SION_MULTIFILE_SINGLE;
298 options.keyval_mode = SION_KEYVAL_MODE_NONE;
300 options.buddylevel = 0;
302 options.collsize = 0;
303 options.collective_merge =
false;
308 void sion_mpi_options_set_chunksize(
sion_mpi_options *options, int64_t chunksize)
311 options->chunksize = chunksize;
314 void sion_mpi_options_set_fsblksize(
sion_mpi_options *options, int32_t fsblksize)
318 options->fsblksize = fsblksize;
321 void sion_mpi_options_set_multifile_number(
sion_mpi_options *options,
int multifile_number)
324 options->multifile_mode = _SION_MULTIFILE_NUMBER;
325 options->multifile.number = multifile_number;
328 void sion_mpi_options_set_multifile_communicator(
sion_mpi_options *options, MPI_Comm multifile_communicator)
330 options->multifile_mode = _SION_MULTIFILE_COMMUNICATOR;
331 options->multifile.communicator = multifile_communicator;
334 void sion_mpi_options_set_keyval_mode(
sion_mpi_options *options, sion_keyval_mode keyval_mode)
336 options->keyval_mode = keyval_mode;
341 sion_mpi_options_set_buddylevel(options, 1);
344 void sion_mpi_options_set_buddylevel(
sion_mpi_options *options, int32_t buddylevel)
346 options->buddylevel = buddylevel;
351 options->collsize = -1;
354 void sion_mpi_options_set_collective_size(
sion_mpi_options *options, int32_t size)
356 options->collsize = size;
361 if (options->collsize == 0) {
362 options->collsize = -1;
364 options->collective_merge =
true;
367 _sion_flags_store *_sion_mpi_options_into_flags_store(sion_open_mode mode,
const sion_mpi_options *options)
369 _sion_flags_store *flags_store = _sion_flags_create_store();
372 fprintf(stderr,
"could not create flags store.\n");
378 _sion_flags_add(flags_store,
"br",
"");
380 switch (options->keyval_mode) {
381 case SION_KEYVAL_MODE_NONE:
384 case SION_KEYVAL_MODE_DEFAULT:
386 case SION_KEYVAL_MODE_INLINE:
387 _sion_flags_add(flags_store,
"keyval",
"inline");
389 case SION_KEYVAL_MODE_META:
390 _sion_flags_add(flags_store,
"keyval",
"meta");
392 case SION_KEYVAL_MODE_HASH:
393 _sion_flags_add(flags_store,
"keyval",
"hash");
395 case SION_KEYVAL_MODE_UNKNOWN:
396 _sion_flags_add(flags_store,
"keyval",
"unknown");
400 fprintf(stderr,
"wrong keyval mode in read mode.\n");
401 _sion_flags_destroy_store(&flags_store);
406 case SION_OPEN_WRITE:
407 _sion_flags_add(flags_store,
"bw",
"");
409 switch (options->keyval_mode) {
410 case SION_KEYVAL_MODE_NONE:
413 case SION_KEYVAL_MODE_DEFAULT:
415 case SION_KEYVAL_MODE_INLINE:
416 _sion_flags_add(flags_store,
"keyval",
"inline");
418 case SION_KEYVAL_MODE_META:
419 _sion_flags_add(flags_store,
"keyval",
"meta");
421 case SION_KEYVAL_MODE_HASH:
422 _sion_flags_add(flags_store,
"keyval",
"hash");
426 fprintf(stderr,
"wrong keyval mode in read mode.\n");
427 _sion_flags_destroy_store(&flags_store);
434 if (options->buddylevel > 0) {
435 char buddylevel_str[12];
436 int written = snprintf(buddylevel_str, 12,
"%" PRId32, (int32_t)options->buddylevel);
439 _sion_flags_add(flags_store,
"buddy", buddylevel_str);
440 fprintf(stderr,
"added buddy level %s.\n", buddylevel_str);
443 if (options->collsize) {
444 _sion_flags_add(flags_store,
"collective",
"");
445 char collsize_str[12];
446 int written = snprintf(collsize_str, 12,
"%" PRId32, (int32_t)options->collsize);
449 _sion_flags_add(flags_store,
"collsize", collsize_str);
450 if (options->collective_merge) {
451 _sion_flags_add(flags_store,
"cmerge",
"");
455 _sion_flags_update_mask(flags_store);
460 int sion_paropen_mpi_with_options(
461 const char *filename, sion_open_mode mode, MPI_Comm communicator,
const sion_mpi_options *options_)
464 if (options_ == NULL) {
465 options = sion_mpi_options_new();
474 switch (options.multifile_mode) {
475 case _SION_MULTIFILE_SINGLE:
477 lcomm = communicator;
479 case _SION_MULTIFILE_NUMBER:
480 num_files = options.multifile.number;
481 lcomm = communicator;
483 case _SION_MULTIFILE_COMMUNICATOR:
485 lcomm = options.multifile.communicator;
489 _sion_flags_store *flags_store = _sion_mpi_options_into_flags_store(mode, &options);
494 int sid = _sion_paropen_mpi(
495 filename, flags_store, &num_files, communicator, &lcomm, &options.chunksize, &options.fsblksize, &rank_, NULL, NULL);
497 _sion_flags_destroy_store(&flags_store);
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.