10 #define _XOPEN_SOURCE 700 17 #include "sion_common.h" 18 #include "sion_const.h" 19 #include "sion_debug.h" 20 #include "sion_enums.h" 21 #include "sion_error_handler.h" 27 #include "sion_generic.h" 28 #include "sion_ompi.h" 29 #include "sion_ompi_cb_gen.h" 30 #include "sion_ompi_internal_gen.h" 32 int _sion_ompi_api_aid = -1;
33 static omp_lock_t _sion_ompi_lock_data;
35 static void *__ompi_thread_sync_struct;
37 int _sion_ompi_user_lock(
void *data)
39 int rc = SION_SUCCESS;
40 omp_set_lock(&_sion_ompi_lock_data);
43 int _sion_ompi_user_unlock(
void *data)
45 int rc = SION_SUCCESS;
46 omp_unset_lock(&_sion_ompi_lock_data);
54 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"enter parallel close of sid %d\n", sid));
58 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"leave parallel close of sid %d rc=%d\n", sid, rc));
67 *options = SION_OMPI_OPTIONS_INIT;
89 options->multifile_mode = _SION_OMPI_MULTIFILE_NUMBER;
90 options->multifile.number = multifile_number;
95 options->multifile_mode = _SION_OMPI_MULTIFILE_COMMUNICATOR;
96 options->multifile.communicator = multifile_communicator;
143 int thread_num = omp_get_thread_num();
144 int num_threads = omp_get_num_threads();
147 __ompi_thread_sync *thread_sync;
150 _sion_debug_set_query_thread_num_function(omp_get_thread_num);
151 _sion_error_set_query_thread_num_function(omp_get_thread_num);
152 omp_init_lock(&_sion_ompi_lock_data);
155 MPI_Comm_size(gcomm, &gsize);
156 MPI_Comm_rank(gcomm, &grank);
158 thread_sync = malloc(
sizeof(__ompi_thread_sync));
160 _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
161 "sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...",
sizeof(__ompi_thread_sync));
164 thread_sync->grank_master_mpi = grank;
165 thread_sync->gsize_mpi = gsize;
166 thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(grank, num_threads, thread_num);
167 thread_sync->gsize_ompi = _sion_get_size_ompi(gsize, num_threads);
168 thread_sync->num_threads = num_threads;
169 __ompi_thread_sync_struct = thread_sync;
175 thread_sync = __ompi_thread_sync_struct;
177 (1, __func__, thread_sync->grank_master_ompi + thread_num,
"thread %d enters parallel open of file %s\n", thread_num, fname));
183 if (_sion_ompi_api_aid < 0) {
184 _sion_ompi_api_aid = _sion_register_callbacks_ompi();
189 _ompi_api_commdata *gen_gcomm = malloc(
sizeof(_ompi_api_commdata));
190 if (gen_gcomm != NULL) {
191 gen_gcomm->commset = 0;
192 gen_gcomm->local = 0;
193 gen_gcomm->rank = thread_sync->grank_master_ompi + thread_num;
194 gen_gcomm->size = thread_sync->gsize_ompi;
195 gen_gcomm->num_threads = thread_sync->num_threads;
196 gen_gcomm->thread_num = thread_num;
197 gen_gcomm->lcommgroup = NULL;
199 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
200 "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
201 (
unsigned long)
sizeof(_ompi_api_commdata));
207 gen_gcomm->comm = gcomm;
213 _ompi_api_commdata *gen_lcomm = NULL;
214 int filenumber, nfiles, lrank, lsize;
217 if (options.multifile_mode == _SION_OMPI_MULTIFILE_COMMUNICATOR) {
218 gen_lcomm = malloc(
sizeof(_ompi_api_commdata));
220 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
221 "cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
222 (
unsigned long)
sizeof(_ompi_api_commdata));
224 gen_lcomm->commset = 1;
225 gen_lcomm->commcreated = 0;
226 gen_lcomm->local = 1;
227 gen_lcomm->num_threads = gen_gcomm->num_threads;
228 gen_lcomm->thread_num = thread_num;
229 gen_gcomm->lcommgroup = gen_lcomm;
234 if (options.multifile_mode == _SION_OMPI_MULTIFILE_COMMUNICATOR) {
236 _sion_get_info_from_splitted_comm_ompi(gcomm, options.multifile.communicator, &nfiles, &filenumber, &lrank, &lsize);
237 if (rc != SION_SUCCESS) {
238 _sion_errorprint_ompi(
239 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
241 gen_lcomm->comm = options.multifile.communicator;
242 DPRINTFP((1, __func__, grank,
"%d local communicators found\n", nfiles));
245 int rc = _sion_gen_info_from_gcomm_ompi(options.multifile.number, gcomm, &filenumber, &lrank, &lsize);
246 if (rc != SION_SUCCESS) {
247 _sion_errorprint_ompi(
248 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
250 nfiles = options.multifile.number;
251 DPRINTFP((1, __func__, grank,
"Global communicator divided in %d local communicators\n", nfiles));
253 thread_sync->filenumber = filenumber;
254 thread_sync->numFiles = nfiles;
255 thread_sync->lrank_master_mpi = lrank;
256 thread_sync->lsize_mpi = lsize;
257 thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lrank, num_threads, thread_num);
258 thread_sync->lsize_ompi = _sion_get_size_ompi(lsize, num_threads);
263 grank = thread_sync->grank_master_ompi + thread_num;
264 gsize = thread_sync->gsize_ompi;
265 lrank = thread_sync->lrank_master_ompi + thread_num;
266 lsize = thread_sync->lsize_ompi;
267 filenumber = thread_sync->filenumber;
268 nfiles = thread_sync->numFiles;
271 gen_lcomm->rank = lrank;
272 gen_lcomm->size = lsize;
278 grank = thread_sync->grank_master_ompi + thread_num;
279 gsize = thread_sync->gsize_ompi;
286 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: unknown file mode");
290 DPRINTFP((1, __func__, grank,
"enter parallel open of %d files (current name %s)\n", nfiles, fname));
291 DPRINTFP((2, __func__, grank,
"enter parallel parameters: grank=%d gsize=%d fnum=%d nfiles=%d lrank=%d lsize=%d\n", grank,
292 gsize, filenumber, nfiles, lrank, lsize));
294 _sion_ompi_api_aid, fname, mode, gen_gcomm, grank, gsize, filenumber, nfiles, lrank, lsize, &options.generic_options);
297 DPRINTFP((1, __func__, grank,
"leave parallel open of %d files #tasks=%d sid=%d\n", nfiles, lsize, sid));
300 if (sid == SION_ID_NOT_VALID) {
301 return _sion_errorprint_ompi(
302 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: invalid return code from internal open %d", sid);
304 DPRINTFP((1, __func__, grank,
"leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid, grank));
sion_lowlevel_api
specifies a low-level API to use for file system access
int sion_generic_paropen(int aid, const char *fname, sion_open_mode mode, void *gcommgroup, int grank, int gsize, int filenumber, int numfiles, int lrank, int lsize, const sion_generic_options *options_)
Open a SIONlib file through a generic interface.
void sion_generic_options_set_collective_size(sion_generic_options *options, int32_t size)
Enable collective I/O.
void sion_ompi_options_set_endianness(sion_ompi_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
struct sion_ompi_options sion_ompi_options
Holds non-essential arguments for sion_paropen_ompi().
void sion_ompi_options_set_collective(sion_ompi_options *options)
Enable collective I/O.
void sion_ompi_options_set_keyval_mode(sion_ompi_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback functions for locking and unlocking internal access to shared data structures...
void sion_ompi_options_set_fsblksize(sion_ompi_options *options, int32_t fsblksize)
Set the file system block size to assume.
void sion_generic_options_set_collective_merge(sion_generic_options *options)
Use collective merging.
void sion_ompi_options_set_chunksize(sion_ompi_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
void sion_generic_options_set_endianness(sion_generic_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
void sion_ompi_options_set_lowlevel_api(sion_ompi_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
void sion_generic_options_set_keyval_mode(sion_generic_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
sion_ompi_options * sion_ompi_options_new()
Allocates and initializes an instance of sion_ompi_options
void sion_generic_options_set_fsblksize(sion_generic_options *options, int32_t fsblksize)
Set the file system block size to assume.
void sion_generic_options_set_chunksize(sion_generic_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
void sion_ompi_options_set_multifile_number(sion_ompi_options *options, int multifile_number)
Set the number of physical files to use.
void sion_ompi_options_set_multifile_communicator(sion_ompi_options *options, MPI_Comm multifile_communicator)
Create multiple physical files based on disjoint communicators.
void sion_ompi_options_set_collective_merge(sion_ompi_options *options)
Use collective merging.
void sion_ompi_options_set_buddy(sion_ompi_options *options)
Enable buddy checkpointing mechanism.
int sion_parclose_ompi(int sid)
closes a SIONlib file previously opened in OpenMP/MPI mode
open the file for reading only
sion_keyval_mode
specifies whether to use SIONlib's key-value mechanism for accessing file content and, if so, in what mode
int sion_generic_parclose(int sid)
Close a SIONlib file.
void sion_ompi_options_delete(sion_ompi_options *options)
Delete an instance of sion_ompi_options
sion_endianness
declares the endianness of user data written to a file
sion_open_mode
specifies for what type of access to open a file
int sion_paropen_ompi(const char *fname, sion_open_mode mode, MPI_Comm gcomm, const sion_ompi_options *options_)
Open a SIONlib file from multiple OpenMP threads on multiple MPI processes in parallel.
void sion_generic_options_set_buddylevel(sion_generic_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.
void sion_generic_options_set_lowlevel_api(sion_generic_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
void sion_ompi_options_set_collective_size(sion_ompi_options *options, int32_t size)
Enable collective I/O.
open the file for writing only
void sion_ompi_options_set_buddylevel(sion_ompi_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.