SIONlib  2.0.0-rc.1
Scalable I/O library for parallel access to task-local files
sion_ompi_gen.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2018 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <mpi.h>
13 #include <stdint.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 
17 #include "sion_common.h"
18 #include "sion_const.h"
19 #include "sion_debug.h"
20 #include "sion_error_handler.h"
21 #include "sion_flags.h"
22 #include "sion_internal.h"
23 
24 #ifdef SION_OMPI
25 
26 #include <omp.h>
27 
28 #include "sion_generic.h"
29 #include "sion_ompi_cb_gen.h"
30 #include "sion_ompi_internal_gen.h"
31 
/* Id of the generic API used for OMPI; stays -1 until sion_paropen_ompi stores the result of
 * _sion_register_callbacks_ompi() here. */
32 int _sion_ompi_api_aid = -1;
/* OpenMP lock: initialised by the master thread in sion_paropen_ompi and set/unset by the
 * _sion_ompi_user_lock/_sion_ompi_user_unlock callbacks. */
33 static omp_lock_t _sion_ompi_lock_data;
34 
/* Hand-over slot: the master thread stores the __ompi_thread_sync it allocates in sion_paropen_ompi
 * here, and every thread reads it back after a barrier. */
35 static void *__ompi_thread_sync_struct;
36 
37 int _sion_ompi_user_lock(void *data)
38 {
39  int rc = SION_SUCCESS;
40  omp_set_lock(&_sion_ompi_lock_data);
41  return rc;
42 }
43 int _sion_ompi_user_unlock(void *data)
44 {
45  int rc = SION_SUCCESS;
46  omp_unset_lock(&_sion_ompi_lock_data);
47  return rc;
48 }
49 
50 #define DFUNCTION "sion_paropen_ompi"
51 int sion_paropen_ompi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm,
52  int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
53 {
54  /* gRank and lRank refer to the MPI process rank in the global and local communicator respectively */
55  int rc, sid = -1;
56  int filenumber, gRank, lRank, lSize, gSize;
57 
58  _sion_flags_store *flags_store = NULL;
59 
60  _ompi_api_commdata *gen_gcomm;
61  _ompi_api_commdata *gen_lcomm = NULL;
62 
63  int num_threads, thread_num;
64  __ompi_thread_sync *thread_sync;
65 
66  thread_num = omp_get_thread_num();
67 
68 #pragma omp master
69  {
70  _sion_debug_set_query_thread_num_function(omp_get_thread_num);
71  _sion_error_set_query_thread_num_function(omp_get_thread_num);
72  omp_init_lock(&_sion_ompi_lock_data);
73  sion_lock_register_lock_callbacks(_sion_ompi_user_lock, _sion_ompi_user_unlock, &_sion_ompi_lock_data);
74 
75  MPI_Comm_size(gComm, &gSize);
76  MPI_Comm_rank(gComm, &gRank);
77  num_threads = omp_get_num_threads();
78 
79  thread_sync = malloc(sizeof(__ompi_thread_sync));
80  if (thread_sync == NULL)
81  (_sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
82  "sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...", sizeof(__ompi_thread_sync)));
83 
84  thread_sync->grank_master_mpi = gRank;
85  thread_sync->gsize_mpi = gSize;
86  thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(gRank, num_threads, thread_num);
87  thread_sync->gsize_ompi = _sion_get_size_ompi(gSize, num_threads);
88  thread_sync->num_threads = num_threads;
89  thread_sync->numFiles = *numFiles;
90  __ompi_thread_sync_struct = thread_sync;
91  }
92  /* sync to ensure that info in thread_sync is accessible */
93  {
94 #pragma omp barrier
95  }
96 
97  /* this is actually not necessary, but it makes for cleaner code by preventing us from doing lots of typecasts */
98  thread_sync = (__ompi_thread_sync *)__ompi_thread_sync_struct;
99 
100  DPRINTFP((1, "sion_paropen_ompi", thread_sync->grank_master_ompi + thread_num, "thread %d enters parallel open of file %s\n",
101  thread_num, fname));
102 
103  /* check parameters */
104  if (lComm == NULL) {
105  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT, "sion_paropen_ompi: No lComm variable given");
106  }
107  if (numFiles == NULL) {
108  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT, "sion_paropen_ompi: No numFiles variable given");
109  }
110  flags_store = _sion_parse_flags(file_mode);
111  if (!flags_store) {
112  return _sion_errorprint_ompi(
113  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
114  }
115 
116 /* create generic API */
117 #pragma omp master
118  {
119  /* register callbacks for generic interface */
120  if (_sion_ompi_api_aid < 0) {
121  _sion_ompi_api_aid = _sion_register_callbacks_ompi();
122  }
123  }
124 
125  /* create global generic communicator container on all threads */
126  gen_gcomm = malloc(sizeof(_ompi_api_commdata));
127  if (gen_gcomm != NULL) {
128  gen_gcomm->commset = 0;
129  gen_gcomm->local = 0;
130  gen_gcomm->rank = thread_sync->grank_master_ompi + thread_num;
131  gen_gcomm->size = thread_sync->gsize_ompi;
132  gen_gcomm->num_threads = thread_sync->num_threads;
133  gen_gcomm->thread_num = thread_num;
134  gen_gcomm->lcommgroup = NULL;
135  } else {
136  _sion_flags_destroy_store(&flags_store);
137  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
138  "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
139  (unsigned long)sizeof(_ompi_api_commdata));
140  }
141 
142 /* store MPI communicator in global generic communicator container on master thread */
143 #pragma omp master
144  {
145  gen_gcomm->comm = gComm;
146  }
147 
148  /* sync to ensure that aid is accessible */
149  {
150 #pragma omp barrier
151  }
152 
153  if (flags_store->mask & _SION_FMODE_WRITE) {
154  /* file mode WRITE */
155 
156  /* create generic local communicator container on each thread */
157  if (*numFiles <= 0) {
158  gen_lcomm = malloc(sizeof(_ompi_api_commdata));
159  if (gen_lcomm != NULL) {
160  gen_lcomm->commset = 1;
161  gen_lcomm->commcreated = 0;
162  gen_lcomm->local = 1;
163  gen_lcomm->num_threads = gen_gcomm->num_threads;
164  gen_lcomm->thread_num = thread_num;
165  gen_gcomm->lcommgroup = gen_lcomm; /* store pointer in global comm group */
166  } else {
167  _sion_flags_destroy_store(&flags_store);
168  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
169  "cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
170  (unsigned long)sizeof(_ompi_api_commdata));
171  }
172  }
173 
174 #pragma omp master
175  {
176  if (*numFiles <= 0) {
177  /* lComm contains local communicator */
178 
179  rc = _sion_get_info_from_splitted_comm_ompi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
180  if (rc != SION_SUCCESS) {
181  _sion_errorprint_ompi(
182  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
183  }
184  DPRINTFP((1, DFUNCTION, gRank, "%d local communicators found\n", *numFiles));
185 
186  gen_lcomm->comm = *lComm;
187 
188  } else {
189  /* number of files is given */
190  rc = _sion_gen_info_from_gcomm_ompi(*numFiles, gComm, &filenumber, &lRank, &lSize);
191  if (rc != SION_SUCCESS) {
192  _sion_errorprint_ompi(
193  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
194  }
195  DPRINTFP((1, "sion_paropen_ompi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
196  }
197 
198  thread_sync->filenumber = filenumber;
199  thread_sync->numFiles = *numFiles;
200  thread_sync->lrank_master_mpi = lRank;
201  thread_sync->lsize_mpi = lSize;
202  thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lRank, num_threads, thread_num);
203  thread_sync->lsize_ompi = _sion_get_size_ompi(lSize, num_threads);
204 
205  } /* OMP MASTER END */
206 
207  {
208 #pragma omp barrier
209  }
210 
211  /* set up parameters of call to generic open (OMPI values) */
212  gRank = thread_sync->grank_master_ompi + thread_num;
213  gSize = thread_sync->gsize_ompi;
214  lRank = thread_sync->lrank_master_ompi + thread_num;
215  lSize = thread_sync->lsize_ompi;
216  filenumber = thread_sync->filenumber;
217  *numFiles = thread_sync->numFiles;
218 
219  if (gen_lcomm != NULL) {
220  gen_lcomm->rank = thread_sync->lrank_master_ompi + thread_num;
221  gen_lcomm->size = thread_sync->lsize_ompi;
222  }
223 
224  } else if (flags_store->mask & _SION_FMODE_READ) {
225  /* file mode READ */
226  /* set up parameters of call to generic open (OMPI values) */
227  gRank = thread_sync->grank_master_ompi + thread_num;
228  gSize = thread_sync->gsize_ompi;
229  lRank = -1; /* which determined after opening file by sion_generic_paropen */
230  lSize = -1; /* " */
231  filenumber = -1; /* " */
232  *numFiles = -1; /* " */
233 
234  } else {
235  _sion_flags_destroy_store(&flags_store);
236  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: unknown file mode");
237  }
238  _sion_flags_destroy_store(&flags_store);
239 
240  DPRINTFP((1, DFUNCTION, gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
241  DPRINTFP(
242  (2, DFUNCTION, gRank, "enter parallel parameters: grank=%d gsize=%d fnum=%d numfiles=%d lrank=%d lsize=%d chunksize=%d\n",
243  gRank, gSize, filenumber, *numFiles, lRank, lSize, (int)*chunksize));
244  sid = sion_generic_paropen(_sion_ompi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gSize, &filenumber,
245  numFiles, &lRank, &lSize, fileptr, newfname);
246  DPRINTFP(
247  (1, DFUNCTION, gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
248 
249  /* test return code from internal open */
250  if (sid == SION_ID_NOT_VALID) {
251  return _sion_errorprint_ompi(
252  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: invalid return code from internal open %d", rc);
253  }
254 
255  /* return parameter */
256  *globalrank = gRank;
257 
258  DPRINTFP((1, "sion_paropen_ompi", gRank, "leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid, *globalrank));
259 
260  return sid;
261 }
262 #undef DFUNCTION
263 
264 int sion_parclose_ompi(int sid)
265 {
266  int rc = 0;
267 
268  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
269 
270  rc = sion_generic_parclose(sid);
271 
272  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
273 
274  return rc;
275 }
276 
277 /* end of ifdef OMPI */
278 #endif
int sion_paropen_ompi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using OpenMP/MPI.
Definition: sion_ompi_gen.c:51
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, int64_t *chunksize, int32_t *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.
Definition: sion_generic.c:362
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback functions for lock and unlock internal access to shared data structu...
Definition: sion_common.c:774
int sion_parclose_ompi(int sid)
Closes a SION file previously opened in OpenMP/MPI mode.