SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_ompi_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
17 #define _XOPEN_SOURCE 700
18 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <stdarg.h>
22 #include <string.h>
23 #include <time.h>
24 
25 #include "mpi.h"
26 
27 #include <sys/time.h>
28 
29 #include <sys/types.h>
30 #include <fcntl.h>
31 
32 #include <unistd.h>
33 
34 #include "sion.h"
35 #include "sion_debug.h"
36 #include "sion_error_handler.h"
37 #include "sion_internal.h"
38 #include "sion_fd.h"
39 #include "sion_filedesc.h"
40 #include "sion_printts.h"
41 #include "sion_flags.h"
42 
43 #ifdef SION_OMPI
44 
45 #include "sion_generic.h"
46 
47 #include "sion_ompi.h"
48 #include "sion_ompi_internal_gen.h"
49 #include "sion_ompi_cb_gen.h"
50 #include "sion_lock.h"
51 
52 #include "omp.h"
53 
/* Id of the generic API registered for the OMPI layer. Starts at -1; the
 * master thread in sion_paropen_ompi() assigns the result of
 * _sion_register_callbacks_ompi() while the value is still negative, and the
 * id is then handed to sion_generic_paropen(). */
54 int _sion_ompi_api_aid = -1;
/* OpenMP lock initialised by the master thread in sion_paropen_ompi() and
 * set/unset by _sion_ompi_user_lock() / _sion_ompi_user_unlock(). */
55 static omp_lock_t _sion_ompi_lock_data;
56 
/* Hand-over slot for the __ompi_thread_sync structure: written by the master
 * thread in sion_paropen_ompi() and read by every thread after the following
 * barrier. Kept as void* and cast back at the point of use. */
57 static void * __ompi_thread_sync_struct;
58 
59 int _sion_ompi_user_lock(void * data) {
60  int rc=SION_SUCCESS;
61  omp_set_lock(&_sion_ompi_lock_data);
62  return(rc);
63 }
64 int _sion_ompi_user_unlock(void * data) {
65  int rc=SION_SUCCESS;
66  omp_unset_lock(&_sion_ompi_lock_data);
67  return(rc);
68 }
69 
90 #define DFUNCTION "sion_paropen_ompi"
91 int sion_paropen_ompi(const char* fname,
92  const char* file_mode,
93  int* numFiles,
94  MPI_Comm gComm,
95  const MPI_Comm* lComm,
96  sion_int64* chunksize,
97  sion_int32* fsblksize,
98  int* globalrank,
99  FILE** fileptr,
100  char** newfname)
101 {
102 
103  /* gRank and lRank refer to the MPI process rank in the global and local communicator respectively */
104  int rc, sid = -1;
105  int filenumber, gRank, lRank, lSize, gSize;
106 
107  _sion_flags_store* flags_store = NULL;
108 
109  _ompi_api_commdata *gen_gcomm;
110  _ompi_api_commdata *gen_lcomm=NULL;
111 
112  int num_threads, thread_num;
113  __ompi_thread_sync *thread_sync;
114 
115 
116 
117  thread_num = omp_get_thread_num();
118 
119  #pragma omp master
120  {
121 
122  _sion_debug_set_query_thread_num_function(omp_get_thread_num);
123  _sion_error_set_query_thread_num_function(omp_get_thread_num);
124  omp_init_lock(&_sion_ompi_lock_data);
125  sion_lock_register_lock_callbacks(_sion_ompi_user_lock,_sion_ompi_user_unlock,&_sion_ompi_lock_data);
126 
127  MPI_Comm_size(gComm, &gSize);
128  MPI_Comm_rank(gComm, &gRank);
129  num_threads = omp_get_num_threads();
130 
131  thread_sync = malloc(sizeof(__ompi_thread_sync));
132  if(thread_sync==NULL) (_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...",
133  sizeof(__ompi_thread_sync)));
134 
135  thread_sync->grank_master_mpi = gRank;
136  thread_sync->gsize_mpi = gSize;
137  thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(gRank,num_threads,thread_num);
138  thread_sync->gsize_ompi = _sion_get_size_ompi(gSize,num_threads);
139  thread_sync->num_threads = num_threads;
140  thread_sync->numFiles = *numFiles;
141  __ompi_thread_sync_struct = thread_sync;
142  }
143  /* sync to ensure that info in thread_sync is accessible */
144  {
145 #pragma omp barrier
146  }
147 
148 
149  /* this is actually not necessary, but it makes for cleaner code by preventing us from doing lots of typecasts */
150  thread_sync = (__ompi_thread_sync *) __ompi_thread_sync_struct;
151 
152  DPRINTFP((1, "sion_paropen_ompi", thread_sync->grank_master_ompi+thread_num, "thread %d enters parallel open of file %s\n", thread_num, fname));
153 
154 
155  /* check parameters */
156  if (lComm == NULL) {
157  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"sion_paropen_ompi: No lComm variable given"));
158  }
159  if (numFiles == NULL) {
160  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_ABORT,"sion_paropen_ompi: No numFiles variable given"));
161  }
162  flags_store = _sion_parse_flags(file_mode);
163  if ( ! flags_store ) {
164  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
165  }
166  if (_sion_flags_get(flags_store, "collmsa")) {
167  _sion_flags_destroy_store(&flags_store);
168  return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_ABORT, "sion_paropen_omp: MSA aware collective operations not supported with OpenMP API, aborting ...\n");
169  }
170 
171  /* create generic API */
172  #pragma omp master
173  {
174  /* register callbacks for generic interface */
175  if(_sion_ompi_api_aid<0) _sion_ompi_api_aid=_sion_register_callbacks_ompi();
176  }
177 
178  /* create global generic communicator container on all threads */
179  gen_gcomm = (_ompi_api_commdata *) malloc(sizeof(_ompi_api_commdata));
180  if (gen_gcomm != NULL) {
181  gen_gcomm->commset=0;
182  gen_gcomm->local=0;
183  gen_gcomm->rank=thread_sync->grank_master_ompi+thread_num;
184  gen_gcomm->size=thread_sync->gsize_ompi;
185  gen_gcomm->num_threads=thread_sync->num_threads;
186  gen_gcomm->thread_num=thread_num;
187  gen_gcomm->lcommgroup=NULL;
188  } else {
189  _sion_flags_destroy_store(&flags_store);
190  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,
191  "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
192  (unsigned long) sizeof(_ompi_api_commdata)));
193  }
194 
195  /* store MPI communicator in global generic communicator container on master thread */
196 #pragma omp master
197  {
198  gen_gcomm->comm=gComm;
199  }
200 
201 
202 
203  /* sync to ensure that aid is accessible */
204  {
205 #pragma omp barrier
206  }
207 
208  if (flags_store->mask&_SION_FMODE_WRITE) {
209  /* file mode WRITE */
210 
211  /* create generic local communicator container on each thread */
212  if (*numFiles <= 0) {
213  gen_lcomm = (_ompi_api_commdata *) malloc(sizeof(_ompi_api_commdata));
214  if (gen_lcomm != NULL) {
215  gen_lcomm->commset=1;
216  gen_lcomm->commcreated=0;
217  gen_lcomm->local=1;
218  gen_lcomm->num_threads=gen_gcomm->num_threads;
219  gen_lcomm->thread_num=thread_num;
220  gen_gcomm->lcommgroup=gen_lcomm; /* store pointer in global comm group */
221  } else {
222  _sion_flags_destroy_store(&flags_store);
223  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
224  (unsigned long) sizeof(_ompi_api_commdata)));
225  }
226  }
227 
228 #pragma omp master
229  {
230 
231  if (*numFiles <= 0) {
232  /* lComm contains local communicator */
233 
234  rc = _sion_get_info_from_splitted_comm_ompi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
235  if(rc != SION_SUCCESS) _sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
236  DPRINTFP((1, DFUNCTION, gRank, "%d local communicators found\n", *numFiles));
237 
238  gen_lcomm->comm=*lComm;
239 
240  } else {
241  /* number of files is given */
242  rc = _sion_gen_info_from_gcomm_ompi(*numFiles, gComm, &filenumber, &lRank, &lSize);
243  if(rc != SION_SUCCESS) _sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
244  DPRINTFP((1, "sion_paropen_ompi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
245  }
246 
247  thread_sync->filenumber = filenumber;
248  thread_sync->numFiles = *numFiles;
249  thread_sync->lrank_master_mpi = lRank;
250  thread_sync->lsize_mpi = lSize;
251  thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lRank,num_threads,thread_num);
252  thread_sync->lsize_ompi = _sion_get_size_ompi(lSize,num_threads);
253 
254  } /* OMP MASTER END */
255 
256  {
257 #pragma omp barrier
258  }
259 
260 
261  /* set up parameters of call to generic open (OMPI values) */
262  gRank = thread_sync->grank_master_ompi+thread_num;
263  gSize = thread_sync->gsize_ompi;
264  lRank = thread_sync->lrank_master_ompi+thread_num;
265  lSize = thread_sync->lsize_ompi;
266  filenumber = thread_sync->filenumber;
267  *numFiles = thread_sync->numFiles;
268 
269  if (gen_lcomm != NULL) {
270  gen_lcomm->rank=thread_sync->lrank_master_ompi+thread_num;
271  gen_lcomm->size=thread_sync->lsize_ompi;
272  }
273 
274  } else if (flags_store->mask&_SION_FMODE_READ) {
275 
276  /* file mode READ */
277  /* set up parameters of call to generic open (OMPI values) */
278  gRank = thread_sync->grank_master_ompi+thread_num;
279  gSize = thread_sync->gsize_ompi;
280  lRank = -1; /* which determined after opening file by sion_generic_paropen */
281  lSize = -1; /* " */
282  filenumber = -1; /* " */
283  *numFiles = -1; /* " */
284 
285  } else {
286  _sion_flags_destroy_store(&flags_store);
287  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_ompi: unknown file mode"));
288  }
289  _sion_flags_destroy_store(&flags_store);
290 
291 
292  DPRINTFP((1, DFUNCTION, gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
293  DPRINTFP((2, DFUNCTION, gRank, "enter parallel parameters: grank=%d gsize=%d fnum=%d numfiles=%d lrank=%d lsize=%d chunksize=%d\n",
294  gRank, gSize,filenumber, *numFiles, lRank, lSize, (int) *chunksize));
295  sid = sion_generic_paropen(_sion_ompi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm,
296  gRank, gSize, &filenumber, numFiles, &lRank, &lSize,
297  fileptr, newfname);
298  DPRINTFP((1, DFUNCTION, gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
299 
300  /* test return code from internal open */
301  if ( sid == SION_ID_NOT_VALID ) {
302  return(_sion_errorprint_ompi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: invalid return code from internal open %d", rc));
303  }
304 
305  /* return parameter */
306  *globalrank=gRank;
307 
308  DPRINTFP((1, "sion_paropen_ompi", gRank, "leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid,*globalrank));
309 
310  return (sid);
311 }
312 #undef DFUNCTION
313 
314 
321 int sion_parclose_ompi(int sid)
322 {
323  int rc=0;
324 
325  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
326 
327  rc = sion_generic_parclose(sid);
328 
329  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
330 
331  return (rc);
332 }
333 
334 /* end of ifdef OMPI */
335 #endif
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback functions for lock and unlock internal access to shared data structu...
Definition: sion_common.c:1220
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
Definition: sion_flags.c:326
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.
Definition: sion_generic.c:351
Sion Time Stamp Header.