SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <stdarg.h>
18 #include <string.h>
19 #include <time.h>
20 
21 #include <sys/time.h>
22 
23 #include <sys/types.h>
24 #include <fcntl.h>
25 
26 #include <unistd.h>
27 
28 #include "mpi.h"
29 
30 #include "sion.h"
31 #include "sion_debug.h"
32 #include "sion_error_handler.h"
33 #include "sion_internal.h"
34 #include "sion_fd.h"
35 #include "sion_filedesc.h"
36 #include "sion_printts.h"
37 #include "sion_flags.h"
38 
39 #ifdef SION_MPI
40 
41 #include "sion_generic.h"
42 
43 #include "sion_mpi.h"
44 #include "sion_mpi_internal_gen.h"
45 
46 #include "sion_mpi_cb_gen.h"
47 
48 int _sion_mpi_api_aid = -1;
49 
83 int sion_paropen_mpi(const char* fname,
84  const char* file_mode,
85  int* numFiles,
86  MPI_Comm gComm,
87  const MPI_Comm* lComm,
88  sion_int64* chunksize,
89  sion_int32* fsblksize,
90  int* globalrank,
91  FILE** fileptr,
92  char** newfname
93  )
94 {
95  int rc = SION_NOT_SUCCESS, sid = SION_ID_UNDEF;
96  int filenumber, gtasks, gRank, lRank, lSize;
97 
98  _mpi_api_commdata *gen_gcomm;
99  _mpi_api_commdata *gen_lcomm;
100 
101  _sion_flags_store* flags_store = NULL;
102 
103  MPI_Comm_size(gComm, &gtasks);
104  MPI_Comm_rank(gComm, &gRank);
105 
106  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of file %s\n", fname));
107 
108  /* check parameters */
109  if (lComm == NULL) {
110  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No lComm variable given"));
111  }
112  if (numFiles == NULL) {
113  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No numFiles variable given"));
114  }
115 
116  flags_store = _sion_parse_flags(file_mode);
117  /* parse file mode */
118  if ( ! flags_store ) {
119  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
120  }
121 
122  /* register callbacks for generic interface */
123  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
124 
125 
126  if (flags_store->mask&_SION_FMODE_WRITE) {
127  /* file mode WRITE */
128 
129  if (*numFiles <= 0) {
130  /* lComm contains local communicator */
131 
132  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
133  if(rc != SION_SUCCESS) {
134  _sion_flags_destroy_store(&flags_store);
135  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
136  }
137  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
138 
139  } else {
140  /* number of files is given */
141 
142  rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
143  if(rc != SION_SUCCESS) {
144  _sion_flags_destroy_store(&flags_store);
145  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi"));
146  }
147  DPRINTFP((1, "sion_paropen_mpi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
148  }
149 
150  /* overwrite globalrank set by user, necessary for multi-file support */
151  *globalrank = gRank;
152 
153  } else if (flags_store->mask&_SION_FMODE_READ) {
154  /* file mode READ */
155  /* nothing to do info will be returned by generic paropen */
156 
157  /* set to gRank, current rank in global communicator, this is
158  different to older versions of SIONlib, where globalrank comes
159  from file in read case */
160  *globalrank = gRank;
161 
162 
163  if(! (flags_store->mask&_SION_FMODE_BUDDY) ) {
164  lRank=lSize=-1; /* will be set by sion_generic_paropen */
165  } else {
166  /* lvomm must be given for buddy checkpointing */
167  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
168  if(rc != SION_SUCCESS) return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
169  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
170  }
171 
172  } else {
173 
174  _sion_flags_destroy_store(&flags_store);
175  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: unknown file mode"));
176  }
177 
178  /* create generic communicator container */
179  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
180  if (gen_gcomm == NULL) {
181  _sion_flags_destroy_store(&flags_store);
182  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
183  (unsigned long) sizeof(_mpi_api_commdata)));
184  }
185  gen_gcomm->comm=gComm;
186  gen_gcomm->commset=1;
187  gen_gcomm->local=0;
188  gen_gcomm->rank=gRank;
189  gen_gcomm->size=gtasks;
190  gen_gcomm->lcommgroup=NULL;
191 
192  if ((flags_store->mask&_SION_FMODE_WRITE) && (*numFiles <= 0)) {
193 
194  /* create generic local communicator container */
195  gen_lcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
196  if (gen_lcomm == NULL) {
197  _sion_flags_destroy_store(&flags_store);
198  free(gen_gcomm);
199  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
200  (unsigned long) sizeof(_mpi_api_commdata)));
201  }
202  gen_lcomm->comm=*lComm;
203  gen_lcomm->commset=1; /* =0 WF? */
204  gen_lcomm->commcreated=0;
205  gen_lcomm->local=1;
206  gen_lcomm->rank=lRank;
207  gen_lcomm->size=lSize;
208  /* store pointer in global comm group */
209  gen_gcomm->lcommgroup=gen_lcomm;
210  }
211  _sion_flags_destroy_store(&flags_store);
212 
213  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
214  sid = sion_generic_paropen(_sion_mpi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber, numFiles, &lRank, &lSize,
215  fileptr, newfname);
216  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
217 
218  /* test return code from internal open */
219  if ( sid == SION_ID_NOT_VALID ) {
220  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: invalid return code from internal open %d", rc));
221  }
222 
223  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
224 
225  return (sid);
226 }
227 
228 
239 int sion_parclose_mpi(int sid)
240 {
241  int rc = 0;
242 
243  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
244 
245  rc = sion_generic_parclose(sid);
246 
247  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
248 
249  return (rc);
250 }
251 
252 int sion_parreinit_mpi( int sid,
253  sion_int64 chunksize )
254 {
255  int rc = 0;
256 
257  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
258 
259  rc = sion_generic_parreinit(sid, chunksize);
260 
261  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
262 
263  return (rc);
264 }
265 
266 int sion_paropen_mapped_mpi( char *fname,
267  const char *file_mode,
268  int *numFiles,
269  MPI_Comm gComm,
270  int *nlocaltasks,
271  int **globalranks,
272  sion_int64 **chunksizes,
273  int **mapping_filenrs,
274  int **mapping_lranks,
275  sion_int32 *fsblksize,
276  FILE **fileptr) {
277 
278  int sid=SION_ID_UNDEF;
279  int gtasks, gRank;
280  char *lprefix;
281  _mpi_api_commdata *gen_gcomm;
282  _sion_flags_store* flags_store = NULL;
283 
284  MPI_Comm_size(gComm, &gtasks);
285  MPI_Comm_rank(gComm, &gRank);
286 
287  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
288 
289  /* check parameters */
290  if (numFiles == NULL) {
291  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: No numFiles variable given"));
292  }
293 
294  lprefix = calloc(SION_FILENAME_LENGTH,1);
295  if (lprefix == NULL) {
296  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: cannot allocate temporary memory of size %lu (lprefix), aborting ...\n", (unsigned long) SION_FILENAME_LENGTH));
297  }
298 
299  flags_store = _sion_parse_flags(file_mode);
300  if ( ! flags_store ) {
301  free(lprefix);
302  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
303  }
304 
305  /* register callbacks for generic interface */
306  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
307 
308  if (flags_store->mask&_SION_FMODE_WRITE) {
309  /* file mode WRITE */
310 
311  if (*numFiles <= 0) {
312  _sion_flags_destroy_store(&flags_store);
313  free(lprefix);
314  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode"));
315  }
316 
317  /* prefix must be used in generic open function */
318  strcpy(lprefix, fname);
319 
320  }
321  else if (flags_store->mask&_SION_FMODE_READ) {
322  /* file mode READ */
323  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
324 
325  } else {
326  _sion_flags_destroy_store(&flags_store);
327  free(lprefix);
328  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: unknown file mode"));
329  }
330  _sion_flags_destroy_store(&flags_store);
331 
332  /* create generic communicator container */
333  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
334  if (gen_gcomm == NULL) {
335  free(lprefix);
336  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
337  (unsigned long) sizeof(_mpi_api_commdata)));
338  }
339  gen_gcomm->comm=gComm;
340  gen_gcomm->commset=1;
341  gen_gcomm->local=0;
342  gen_gcomm->rank=gRank;
343  gen_gcomm->size=gtasks;
344  gen_gcomm->lcommgroup=NULL;
345 
346 
347  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
348  sid=sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, file_mode, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks, globalranks, chunksizes,
349  mapping_filenrs, mapping_lranks, fsblksize, fileptr);
350  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
351 
352  /* test return code from internal open */
353  if ( sid == SION_ID_NOT_VALID ) {
354  free(lprefix);
355  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: invalid return code from internal open %d", sid));
356  }
357 
358  if(lprefix) free(lprefix);
359  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
360 
361 
362  return(sid);
363 }
364 
365 int sion_parclose_mapped_mpi( int sid ) {
366  int rc = 0;
367 
368  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
369 
370  rc = sion_generic_parclose_mapped(sid);
371 
372  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
373 
374  return(rc);
375 }
376 
377 /* end of ifdef MPI */
378 #endif
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.
Definition: sion_generic.c:349
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:239
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:83
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
Definition: sion_flags.c:288
int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
Splits a communicator into numFiles different communicators.
Sion Time Stamp Header.