10 #define _XOPEN_SOURCE 700 17 #include "sion_common.h" 18 #include "sion_const.h" 19 #include "sion_debug.h" 20 #include "sion_error_handler.h" 21 #include "sion_flags.h" 22 #include "sion_internal.h" 28 #include "sion_generic.h" 29 #include "sion_ompi_cb_gen.h" 30 #include "sion_ompi_internal_gen.h" 32 int _sion_ompi_api_aid = -1;
33 static omp_lock_t _sion_ompi_lock_data;
35 static void *__ompi_thread_sync_struct;
37 int _sion_ompi_user_lock(
void *data)
39 int rc = SION_SUCCESS;
40 omp_set_lock(&_sion_ompi_lock_data);
43 int _sion_ompi_user_unlock(
void *data)
45 int rc = SION_SUCCESS;
46 omp_unset_lock(&_sion_ompi_lock_data);
50 #define DFUNCTION "sion_paropen_ompi" 51 int sion_paropen_ompi(
const char *fname,
const char *file_mode,
int *numFiles, MPI_Comm gComm,
const MPI_Comm *lComm,
52 int64_t *chunksize, int32_t *fsblksize,
int *globalrank, FILE **fileptr,
char **newfname)
56 int filenumber, gRank, lRank, lSize, gSize;
58 _sion_flags_store *flags_store = NULL;
60 _ompi_api_commdata *gen_gcomm;
61 _ompi_api_commdata *gen_lcomm = NULL;
63 int num_threads, thread_num;
64 __ompi_thread_sync *thread_sync;
66 thread_num = omp_get_thread_num();
70 _sion_debug_set_query_thread_num_function(omp_get_thread_num);
71 _sion_error_set_query_thread_num_function(omp_get_thread_num);
72 omp_init_lock(&_sion_ompi_lock_data);
75 MPI_Comm_size(gComm, &gSize);
76 MPI_Comm_rank(gComm, &gRank);
77 num_threads = omp_get_num_threads();
79 thread_sync = malloc(
sizeof(__ompi_thread_sync));
80 if (thread_sync == NULL)
81 (_sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
82 "sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...",
sizeof(__ompi_thread_sync)));
84 thread_sync->grank_master_mpi = gRank;
85 thread_sync->gsize_mpi = gSize;
86 thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(gRank, num_threads, thread_num);
87 thread_sync->gsize_ompi = _sion_get_size_ompi(gSize, num_threads);
88 thread_sync->num_threads = num_threads;
89 thread_sync->numFiles = *numFiles;
90 __ompi_thread_sync_struct = thread_sync;
98 thread_sync = (__ompi_thread_sync *)__ompi_thread_sync_struct;
100 DPRINTFP((1,
"sion_paropen_ompi", thread_sync->grank_master_ompi + thread_num,
"thread %d enters parallel open of file %s\n",
105 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
"sion_paropen_ompi: No lComm variable given");
107 if (numFiles == NULL) {
108 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
"sion_paropen_ompi: No numFiles variable given");
110 flags_store = _sion_parse_flags(file_mode);
112 return _sion_errorprint_ompi(
113 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
120 if (_sion_ompi_api_aid < 0) {
121 _sion_ompi_api_aid = _sion_register_callbacks_ompi();
126 gen_gcomm = malloc(
sizeof(_ompi_api_commdata));
127 if (gen_gcomm != NULL) {
128 gen_gcomm->commset = 0;
129 gen_gcomm->local = 0;
130 gen_gcomm->rank = thread_sync->grank_master_ompi + thread_num;
131 gen_gcomm->size = thread_sync->gsize_ompi;
132 gen_gcomm->num_threads = thread_sync->num_threads;
133 gen_gcomm->thread_num = thread_num;
134 gen_gcomm->lcommgroup = NULL;
136 _sion_flags_destroy_store(&flags_store);
137 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
138 "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
139 (
unsigned long)
sizeof(_ompi_api_commdata));
145 gen_gcomm->comm = gComm;
153 if (flags_store->mask & _SION_FMODE_WRITE) {
157 if (*numFiles <= 0) {
158 gen_lcomm = malloc(
sizeof(_ompi_api_commdata));
159 if (gen_lcomm != NULL) {
160 gen_lcomm->commset = 1;
161 gen_lcomm->commcreated = 0;
162 gen_lcomm->local = 1;
163 gen_lcomm->num_threads = gen_gcomm->num_threads;
164 gen_lcomm->thread_num = thread_num;
165 gen_gcomm->lcommgroup = gen_lcomm;
167 _sion_flags_destroy_store(&flags_store);
168 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
169 "cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
170 (
unsigned long)
sizeof(_ompi_api_commdata));
176 if (*numFiles <= 0) {
179 rc = _sion_get_info_from_splitted_comm_ompi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
180 if (rc != SION_SUCCESS) {
181 _sion_errorprint_ompi(
182 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
184 DPRINTFP((1, DFUNCTION, gRank,
"%d local communicators found\n", *numFiles));
186 gen_lcomm->comm = *lComm;
190 rc = _sion_gen_info_from_gcomm_ompi(*numFiles, gComm, &filenumber, &lRank, &lSize);
191 if (rc != SION_SUCCESS) {
192 _sion_errorprint_ompi(
193 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
195 DPRINTFP((1,
"sion_paropen_ompi", gRank,
"Global communicator divided in %d local communicators\n", *numFiles));
198 thread_sync->filenumber = filenumber;
199 thread_sync->numFiles = *numFiles;
200 thread_sync->lrank_master_mpi = lRank;
201 thread_sync->lsize_mpi = lSize;
202 thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lRank, num_threads, thread_num);
203 thread_sync->lsize_ompi = _sion_get_size_ompi(lSize, num_threads);
212 gRank = thread_sync->grank_master_ompi + thread_num;
213 gSize = thread_sync->gsize_ompi;
214 lRank = thread_sync->lrank_master_ompi + thread_num;
215 lSize = thread_sync->lsize_ompi;
216 filenumber = thread_sync->filenumber;
217 *numFiles = thread_sync->numFiles;
219 if (gen_lcomm != NULL) {
220 gen_lcomm->rank = thread_sync->lrank_master_ompi + thread_num;
221 gen_lcomm->size = thread_sync->lsize_ompi;
224 }
else if (flags_store->mask & _SION_FMODE_READ) {
227 gRank = thread_sync->grank_master_ompi + thread_num;
228 gSize = thread_sync->gsize_ompi;
235 _sion_flags_destroy_store(&flags_store);
236 return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_ompi: unknown file mode");
238 _sion_flags_destroy_store(&flags_store);
240 DPRINTFP((1, DFUNCTION, gRank,
"enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
242 (2, DFUNCTION, gRank,
"enter parallel parameters: grank=%d gsize=%d fnum=%d numfiles=%d lrank=%d lsize=%d chunksize=%d\n",
243 gRank, gSize, filenumber, *numFiles, lRank, lSize, (
int)*chunksize));
244 sid =
sion_generic_paropen(_sion_ompi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gSize, &filenumber,
245 numFiles, &lRank, &lSize, fileptr, newfname);
247 (1, DFUNCTION, gRank,
"leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
250 if (sid == SION_ID_NOT_VALID) {
251 return _sion_errorprint_ompi(
252 SION_ID_NOT_VALID, _SION_ERROR_RETURN,
"sion_paropen_mpi: invalid return code from internal open %d", rc);
258 DPRINTFP((1,
"sion_paropen_ompi", gRank,
"leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid, *globalrank));
268 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"enter parallel close of sid %d\n", sid));
270 rc = sion_generic_parclose(sid);
272 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"leave parallel close of sid %d rc=%d\n", sid, rc));
int sion_paropen_ompi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using OpenMP/MPI.
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, int64_t *chunksize, int32_t *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file a generic interface.
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback funtions for lock and unlock internal access to shared data structu...
int sion_parclose_ompi(int sid)
closes a SION file previously opened in OpenMP/MPI mode