SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <stdarg.h>
18 #include <string.h>
19 #include <time.h>
20 
21 #include <sys/time.h>
22 
23 #include <sys/types.h>
24 #include <fcntl.h>
25 
26 #include <unistd.h>
27 
28 #include "mpi.h"
29 
30 #include "sion.h"
31 #include "sion_debug.h"
32 #include "sion_internal.h"
33 #include "sion_fd.h"
34 #include "sion_filedesc.h"
35 #include "sion_printts.h"
36 #include "sion_flags.h"
37 
38 #ifdef SION_MPI
39 
40 #include "sion_generic.h"
41 
42 #include "sion_mpi.h"
43 #include "sion_mpi_internal_gen.h"
44 
45 #include "sion_mpi_cb_gen.h"
46 
/* Id of the MPI API as registered with the generic layer. It stays negative
   until one of the parallel open functions below stores the result of
   _sion_register_callbacks_mpi() in it; afterwards it is passed as the
   first argument to the sion_generic_paropen*() calls. */
47 int _sion_mpi_api_aid = -1;
48 
82 int sion_paropen_mpi(const char* fname,
83  const char* file_mode,
84  int* numFiles,
85  MPI_Comm gComm,
86  const MPI_Comm* lComm,
87  sion_int64* chunksize,
88  sion_int32* fsblksize,
89  int* globalrank,
90  FILE** fileptr,
91  char** newfname
92  )
93 {
94  int rc, sid = SION_ID_UNDEF;
95  int filenumber, gtasks, gRank, lRank, lSize;
96 
97  _mpi_api_commdata *gen_gcomm;
98  _mpi_api_commdata *gen_lcomm;
99 
100  _sion_flags_store* flags_store = NULL;
101 
102  MPI_Comm_size(gComm, &gtasks);
103  MPI_Comm_rank(gComm, &gRank);
104 
105  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of file %s\n", fname));
106 
107  /* check parameters */
108  if (lComm == NULL) {
109  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No lComm variable given"));
110  }
111  if (numFiles == NULL) {
112  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No numFiles variable given"));
113  }
114 
115  flags_store = _sion_parse_flags(file_mode);
116  /* parse file mode */
117  if ( ! flags_store ) {
118  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
119  }
120 
121  /* register callbacks for generic interface */
122  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
123 
124 
125  if (flags_store->mask&_SION_FMODE_WRITE) {
126  /* file mode WRITE */
127 
128  if (*numFiles <= 0) {
129  /* lComm contains local communicator */
130 
131  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
132  if(rc != SION_SUCCESS) {
133  _sion_flags_destroy_store(flags_store);
134  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
135  }
136  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
137 
138  } else {
139  /* number of files is given */
140 
141  rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
142  if(rc != SION_SUCCESS) {
143  _sion_flags_destroy_store(flags_store);
144  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi"));
145  }
146  DPRINTFP((1, "sion_paropen_mpi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
147  }
148 
149  /* overwrite globalrank set by user, necessary for multi-file support */
150  *globalrank = gRank;
151 
152  } else if (flags_store->mask&_SION_FMODE_READ) {
153  /* file mode READ */
154  /* nothing to do info will be returned by generic paropen */
155 
156  /* set to gRank, current rank in global communicator, this is
157  different to older versions of SIONlib, where globalrank comes
158  from file in read case */
159  *globalrank = gRank;
160 
161  lRank=lSize=-1; /* will be set by sion_generic_paropen */
162 
163  } else {
164 
165  _sion_flags_destroy_store(flags_store);
166  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: unknown file mode"));
167  }
168 
169  /* create generic communicator container */
170  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
171  if (gen_gcomm == NULL) {
172  _sion_flags_destroy_store(flags_store);
173  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
174  (unsigned long) sizeof(_mpi_api_commdata)));
175  }
176  gen_gcomm->comm=gComm;
177  gen_gcomm->commset=1;
178  gen_gcomm->local=0;
179  gen_gcomm->rank=gRank;
180  gen_gcomm->size=gtasks;
181  gen_gcomm->lcommgroup=NULL;
182 
183  if ((flags_store->mask&_SION_FMODE_WRITE) && (*numFiles <= 0)) {
184 
185  /* create generic local communicator container */
186  gen_lcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
187  if (gen_lcomm == NULL) {
188  _sion_flags_destroy_store(flags_store);
189  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
190  (unsigned long) sizeof(_mpi_api_commdata)));
191  }
192  gen_lcomm->comm=*lComm;
193  gen_lcomm->commcreated=0;
194  gen_lcomm->commset=1;
195  gen_lcomm->local=1;
196  gen_lcomm->rank=lRank;
197  gen_lcomm->size=lSize;
198  /* store pointer in global comm group */
199  gen_gcomm->lcommgroup=gen_lcomm;
200  }
201  _sion_flags_destroy_store(flags_store);
202 
203  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
204  sid = sion_generic_paropen(_sion_mpi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber, numFiles, &lRank, &lSize,
205  fileptr, newfname);
206  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
207 
208  /* test return code from internal open */
209  if ( sid == SION_ID_NOT_VALID ) {
210  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: invalid return code from internal open %d", rc));
211  }
212 
213  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
214 
215  return (sid);
216 }
217 
218 
229 int sion_parclose_mpi(int sid)
230 {
231  int rc = SION_SUCCESS;
232 
233  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
234 
235  rc = sion_generic_parclose(sid);
236 
237  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
238 
239  return (rc);
240 }
241 
242 int sion_parreinit_mpi( int sid,
243  sion_int64 chunksize )
244 {
245  int rc = SION_SUCCESS;
246 
247  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
248 
249  rc = sion_generic_parreinit(sid, chunksize);
250 
251  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
252 
253  return (rc);
254 }
255 
256 int sion_paropen_mapped_mpi( char *fname,
257  const char *file_mode,
258  int *numFiles,
259  MPI_Comm gComm,
260  int *nlocaltasks,
261  int **globalranks,
262  sion_int64 **chunksizes,
263  int **mapping_filenrs,
264  int **mapping_lranks,
265  sion_int32 *fsblksize,
266  FILE **fileptr) {
267 
268  int sid=SION_ID_UNDEF;
269  int gtasks, gRank;
270  char *lprefix;
271  _mpi_api_commdata *gen_gcomm;
272  _sion_flags_store* flags_store = NULL;
273 
274  MPI_Comm_size(gComm, &gtasks);
275  MPI_Comm_rank(gComm, &gRank);
276 
277  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
278 
279  /* check parameters */
280  if (numFiles == NULL) {
281  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: No numFiles variable given"));
282  }
283 
284  lprefix = calloc(SION_FILENAME_LENGTH,1);
285  if (lprefix == NULL) {
286  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: cannot allocate temporary memory of size %lu (lprefix), aborting ...\n", (unsigned long) SION_FILENAME_LENGTH));
287  }
288 
289  flags_store = _sion_parse_flags(file_mode);
290  if ( ! flags_store ) {
291  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
292  }
293 
294  /* register callbacks for generic interface */
295  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
296 
297  if (flags_store->mask&_SION_FMODE_WRITE) {
298  /* file mode WRITE */
299 
300  if (*numFiles <= 0) {
301  _sion_flags_destroy_store(flags_store);
302  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode"));
303  }
304 
305  /* prefix must be used in generic open function */
306  strcpy(lprefix, fname);
307 
308  }
309  else if (flags_store->mask&_SION_FMODE_READ) {
310  /* file mode READ */
311  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
312 
313  } else {
314  _sion_flags_destroy_store(flags_store);
315  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: unknown file mode"));
316  }
317  _sion_flags_destroy_store(flags_store);
318 
319  /* create generic communicator container */
320  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
321  if (gen_gcomm == NULL) {
322  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
323  (unsigned long) sizeof(_mpi_api_commdata)));
324  }
325  gen_gcomm->comm=gComm;
326  gen_gcomm->commset=1;
327  gen_gcomm->local=0;
328  gen_gcomm->rank=gRank;
329  gen_gcomm->size=gtasks;
330  gen_gcomm->lcommgroup=NULL;
331 
332 
333  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
334  sid=sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, file_mode, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks, globalranks, chunksizes,
335  mapping_filenrs, mapping_lranks, fsblksize, fileptr);
336  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
337 
338  /* test return code from internal open */
339  if ( sid == SION_ID_NOT_VALID ) {
340  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: invalid return code from internal open %d", sid));
341  }
342 
343  if(lprefix) free(lprefix);
344  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
345 
346 
347  return(sid);
348 }
349 
350 int sion_parclose_mapped_mpi( int sid ) {
351  int rc = SION_SUCCESS;
352 
353  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
354 
355  rc = sion_generic_parclose_mapped(sid);
356 
357  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
358 
359  return(rc);
360 }
361 
362 /* end of ifdef MPI */
363 #endif
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.
Definition: sion_generic.c:347
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:229
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:82
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
Definition: sion_flags.c:285
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
Splits a communicator into numfiles different communicators.
Sion Time Stamp Header.