17 #define _XOPEN_SOURCE 700
29 #include <sys/types.h>
36 #include "sion_error_handler.h"
50 #include "sion_lock.h"
/* API id for the ompi callbacks; starts at -1 and is set from
 * _sion_register_callbacks_ompi() in sion_paropen_ompi while it is still < 0. */
54 int _sion_ompi_api_aid = -1;
/* OpenMP lock initialised in sion_paropen_ompi and taken/released by
 * _sion_ompi_user_lock() / _sion_ompi_user_unlock(). */
55 static omp_lock_t _sion_ompi_lock_data;
/* File-static pointer through which sion_paropen_ompi publishes its
 * __ompi_thread_sync record and reads it back (stored as void *). */
57 static void * __ompi_thread_sync_struct;
/*
 * Acquire the file-static OpenMP lock _sion_ompi_lock_data.
 *
 * data is not referenced in the visible lines. The signature matches the
 * int lock(void *) callback shape accepted by
 * sion_lock_register_lock_callbacks(); the registration call itself is not
 * visible in this listing.
 * NOTE(review): the return statement and closing brace are missing from this
 * listing, so the returned value cannot be documented here.
 */
59 int _sion_ompi_user_lock(
void * data) {
61 omp_set_lock(&_sion_ompi_lock_data);
/*
 * Release the file-static OpenMP lock _sion_ompi_lock_data.
 *
 * data is not referenced in the visible lines. The signature matches the
 * int unlock(void *) callback shape accepted by
 * sion_lock_register_lock_callbacks(); the registration call itself is not
 * visible in this listing.
 * NOTE(review): the return statement and closing brace are missing from this
 * listing, so the returned value cannot be documented here.
 */
64 int _sion_ompi_user_unlock(
void * data) {
66 omp_unset_lock(&_sion_ompi_lock_data);
90 #define DFUNCTION "sion_paropen_ompi"
/*
 * sion_paropen_ompi - parallel open of a sion file from a program that uses
 * both OpenMP threads and MPI ranks.
 *
 * This listing is incomplete: the numeric prefixes are the line numbers of the
 * original file and several lines (some parameters, declarations, else
 * branches, closing braces) are missing. The comments below describe only what
 * the visible lines show.
 *
 * Visible flow:
 *  - query the OpenMP thread number / thread count and the rank / size of
 *    gComm;
 *  - allocate a __ompi_thread_sync record, fill it and publish it through the
 *    file-static pointer __ompi_thread_sync_struct;
 *  - validate numFiles and the parsed file mode flags ("collmsa" is rejected);
 *  - register the ompi callbacks once and build the _ompi_api_commdata
 *    descriptor for the global communicator and, in write mode with
 *    *numFiles <= 0, for the local communicator;
 *  - derive per-thread global/local rank and size values and call
 *    sion_generic_paropen().
 *
 * Visible parameters: fname (file name), file_mode (mode string that is parsed
 * into flags), lComm (local communicator, dereferenced when *numFiles <= 0),
 * chunksize and fsblksize (forwarded to sion_generic_paropen). gComm, numFiles
 * and globalrank are used in the body but their declarations are not visible.
 *
 * Error paths return the result of _sion_errorprint_ompi(SION_ID_NOT_VALID, ...)
 * (or _sion_errorprint for the "collmsa" case). The success return is not
 * visible in this listing.
 */
91 int sion_paropen_ompi(
const char* fname,
92 const char* file_mode,
95 const MPI_Comm* lComm,
96 sion_int64* chunksize,
97 sion_int32* fsblksize,
105 int filenumber, gRank, lRank, lSize, gSize;
109 _ompi_api_commdata *gen_gcomm;
110 _ompi_api_commdata *gen_lcomm=NULL;
112 int num_threads, thread_num;
113 __ompi_thread_sync *thread_sync;
/* OpenMP thread number of the calling thread. */
117 thread_num = omp_get_thread_num();
/* Use omp_get_thread_num as the thread-number query for debug and error
 * output, and initialise the lock used by the user lock/unlock functions.
 * NOTE(review): the lines enclosing this section are not visible; presumably
 * it runs on a single thread only - confirm. */
122 _sion_debug_set_query_thread_num_function(omp_get_thread_num);
123 _sion_error_set_query_thread_num_function(omp_get_thread_num);
124 omp_init_lock(&_sion_ompi_lock_data);
/* Size and rank within the global MPI communicator, and the OpenMP thread
 * count. */
127 MPI_Comm_size(gComm, &gSize);
128 MPI_Comm_rank(gComm, &gRank);
129 num_threads = omp_get_num_threads();
/* Allocate the record that is published through __ompi_thread_sync_struct.
 * NOTE(review): the message below passes sizeof(...) (a size_t) to a %lu
 * conversion without the (unsigned long) cast used by the later
 * allocation-failure messages. No return follows the error print in the
 * visible lines; it is issued with _SION_ERROR_ABORT - presumably that
 * terminates, otherwise thread_sync is dereferenced while NULL; confirm. */
131 thread_sync = malloc(
sizeof(__ompi_thread_sync));
132 if(thread_sync==NULL) (_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,
"sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...",
133 sizeof(__ompi_thread_sync)));
/* Record the MPI rank/size and the values mapped to the combined
 * thread+rank numbering by the _sion_map_rank_mpi_to_ompi /
 * _sion_get_size_ompi helpers. */
135 thread_sync->grank_master_mpi = gRank;
136 thread_sync->gsize_mpi = gSize;
137 thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(gRank,num_threads,thread_num);
138 thread_sync->gsize_ompi = _sion_get_size_ompi(gSize,num_threads);
139 thread_sync->num_threads = num_threads;
/* NOTE(review): numFiles is dereferenced here, but the NULL check for
 * numFiles only appears further down (original line 159). */
140 thread_sync->numFiles = *numFiles;
/* Publish the record through the file-static pointer. */
141 __ompi_thread_sync_struct = thread_sync;
/* Read the published record back from the file-static pointer.
 * NOTE(review): any synchronisation between the store above and this load is
 * not visible in this listing. */
150 thread_sync = (__ompi_thread_sync *) __ompi_thread_sync_struct;
152 DPRINTFP((1,
"sion_paropen_ompi", thread_sync->grank_master_ompi+thread_num,
"thread %d enters parallel open of file %s\n", thread_num, fname));
/* Error return for a missing lComm; the condition line itself is not visible
 * in this listing. */
157 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,
"sion_paropen_ompi: No lComm variable given"));
/* numFiles must be supplied by the caller. */
159 if (numFiles == NULL) {
160 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,
"sion_paropen_ompi: No numFiles variable given"));
/* flags_store is assigned on a line that is not visible; presumably it is the
 * result of parsing file_mode - confirm.
 * NOTE(review): the message names "sion_paropen_mpi", not this function. */
163 if ( ! flags_store ) {
164 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
/* The "collmsa" flag is rejected: the flags store is destroyed and an error is
 * returned.
 * NOTE(review): this path calls _sion_errorprint rather than
 * _sion_errorprint_ompi, and the message names "sion_paropen_omp". */
166 if (_sion_flags_get(flags_store,
"collmsa")) {
167 _sion_flags_destroy_store(&flags_store);
168 return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
"sion_paropen_omp: MSA aware collective operations not supported with OpenMP API, aborting ...\n");
/* Register the ompi callbacks only while no API id has been stored yet. */
175 if(_sion_ompi_api_aid<0) _sion_ompi_api_aid=_sion_register_callbacks_ompi();
/* Descriptor for the global communicator, filled with this thread's global
 * rank (master rank + thread_num), the global size and the thread layout. */
179 gen_gcomm = (_ompi_api_commdata *) malloc(
sizeof(_ompi_api_commdata));
180 if (gen_gcomm != NULL) {
181 gen_gcomm->commset=0;
183 gen_gcomm->rank=thread_sync->grank_master_ompi+thread_num;
184 gen_gcomm->size=thread_sync->gsize_ompi;
185 gen_gcomm->num_threads=thread_sync->num_threads;
186 gen_gcomm->thread_num=thread_num;
187 gen_gcomm->lcommgroup=NULL;
/* Allocation-failure path (the else line is not visible): release the flags
 * store and return an error. */
189 _sion_flags_destroy_store(&flags_store);
190 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
191 "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
192 (
unsigned long)
sizeof(_ompi_api_commdata)));
/* Store the global MPI communicator in the descriptor. */
198 gen_gcomm->comm=gComm;
/* Write mode. */
208 if (flags_store->mask&_SION_FMODE_WRITE) {
/* With *numFiles <= 0 a descriptor for the local communicator is allocated
 * and attached to the global descriptor. */
212 if (*numFiles <= 0) {
213 gen_lcomm = (_ompi_api_commdata *) malloc(
sizeof(_ompi_api_commdata));
214 if (gen_lcomm != NULL) {
215 gen_lcomm->commset=1;
216 gen_lcomm->commcreated=0;
218 gen_lcomm->num_threads=gen_gcomm->num_threads;
219 gen_lcomm->thread_num=thread_num;
220 gen_gcomm->lcommgroup=gen_lcomm;
/* Allocation-failure path (the else line is not visible): release the flags
 * store and return an error. */
222 _sion_flags_destroy_store(&flags_store);
223 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
224 (
unsigned long)
sizeof(_ompi_api_commdata)));
/* *numFiles <= 0: obtain the file count, file number and local rank/size from
 * the caller-supplied local communicator.
 * NOTE(review): on failure only an error is printed; no return is visible. */
231 if (*numFiles <= 0) {
234 rc = _sion_get_info_from_splitted_comm_ompi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
235 if(rc != SION_SUCCESS) _sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
236 DPRINTFP((1, DFUNCTION, gRank,
"%d local communicators found\n", *numFiles));
238 gen_lcomm->comm=*lComm;
/* Otherwise the file number and local rank/size are generated from gComm for
 * the given *numFiles.
 * NOTE(review): on failure only an error is printed; no return is visible. */
242 rc = _sion_gen_info_from_gcomm_ompi(*numFiles, gComm, &filenumber, &lRank, &lSize);
243 if(rc != SION_SUCCESS) _sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
244 DPRINTFP((1,
"sion_paropen_ompi", gRank,
"Global communicator divided in %d local communicators\n", *numFiles));
/* Store the file assignment and local rank/size in the shared record. */
247 thread_sync->filenumber = filenumber;
248 thread_sync->numFiles = *numFiles;
249 thread_sync->lrank_master_mpi = lRank;
250 thread_sync->lsize_mpi = lSize;
251 thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lRank,num_threads,thread_num);
252 thread_sync->lsize_ompi = _sion_get_size_ompi(lSize,num_threads);
/* Per-thread values: ranks are the master's mapped rank plus thread_num;
 * sizes, file number and file count are copied from the shared record. */
262 gRank = thread_sync->grank_master_ompi+thread_num;
263 gSize = thread_sync->gsize_ompi;
264 lRank = thread_sync->lrank_master_ompi+thread_num;
265 lSize = thread_sync->lsize_ompi;
266 filenumber = thread_sync->filenumber;
267 *numFiles = thread_sync->numFiles;
/* Only when a local communicator descriptor was allocated above. */
269 if (gen_lcomm != NULL) {
270 gen_lcomm->rank=thread_sync->lrank_master_ompi+thread_num;
271 gen_lcomm->size=thread_sync->lsize_ompi;
274 }
else if (flags_store->mask&_SION_FMODE_READ) {
/* Read mode: only the global rank and size are set in the visible lines. */
278 gRank = thread_sync->grank_master_ompi+thread_num;
279 gSize = thread_sync->gsize_ompi;
/* Neither write nor read flag set (the else line is not visible): release the
 * flags store and return an error. */
286 _sion_flags_destroy_store(&flags_store);
287 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_ompi: unknown file mode"));
/* The parsed flags are released before the generic open. */
289 _sion_flags_destroy_store(&flags_store);
292 DPRINTFP((1, DFUNCTION, gRank,
"enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
293 DPRINTFP((2, DFUNCTION, gRank,
"enter parallel parameters: grank=%d gsize=%d fnum=%d numfiles=%d lrank=%d lsize=%d chunksize=%d\n",
294 gRank, gSize,filenumber, *numFiles, lRank, lSize, (
int) *chunksize));
/* Delegate to the generic open using the registered API id and the global
 * descriptor; the trailing arguments of this call are not visible. */
295 sid =
sion_generic_paropen(_sion_ompi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm,
296 gRank, gSize, &filenumber, numFiles, &lRank, &lSize,
298 DPRINTFP((1, DFUNCTION, gRank,
"leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
/* NOTE(review): the message prints rc, not the sid that was just checked, and
 * names "sion_paropen_mpi" rather than this function. */
301 if ( sid == SION_ID_NOT_VALID ) {
302 return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
"sion_paropen_mpi: invalid return code from internal open %d", rc));
308 DPRINTFP((1,
"sion_paropen_ompi", gRank,
"leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid,*globalrank));
/*
 * sion_parclose_ompi - parallel close of the sion file identified by sid.
 *
 * Logs entry, forwards sid to sion_generic_parclose() and logs the resulting
 * rc on exit.
 * NOTE(review): the declaration of rc, the return statement and the braces are
 * not visible in this listing; presumably rc is what is returned - confirm.
 */
321 int sion_parclose_ompi(
int sid)
325 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"enter parallel close of sid %d\n", sid));
/* All closing work is done by the generic layer. */
327 rc = sion_generic_parclose(sid);
329 DPRINTFP((1,
"sion_parclose_ompi", _SION_DEFAULT_RANK,
"leave parallel close of sid %d rc=%d\n", sid, rc));
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback functions for locking and unlocking internal access to shared data structures.
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.