SIONlib  1.6.1
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2014 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <stdarg.h>
18 #include <string.h>
19 #include <time.h>
20 
21 #include <sys/time.h>
22 
23 #include <sys/types.h>
24 #include <fcntl.h>
25 
26 #include <unistd.h>
27 
28 #include "mpi.h"
29 
30 #include "sion.h"
31 #include "sion_debug.h"
32 #include "sion_internal.h"
33 #include "sion_fd.h"
34 #include "sion_filedesc.h"
35 #include "sion_printts.h"
36 #include "sion_flags.h"
37 
38 #ifdef SION_MPI
39 
40 #include "sion_generic.h"
41 
42 #include "sion_mpi.h"
43 #include "sion_mpi_internal_gen.h"
44 
45 #include "sion_mpi_cb_gen.h"
46 
47 int _sion_mpi_api_aid = -1;
48 
81 int sion_paropen_mpi(const char* fname,
82  const char* file_mode,
83  int* numFiles,
84  MPI_Comm gComm,
85  const MPI_Comm* lComm,
86  sion_int64* chunksize,
87  sion_int32* fsblksize,
88  int* globalrank,
89  FILE** fileptr,
90  char** newfname
91  )
92 {
93  int rc, sid = SION_ID_UNDEF;
94  int filenumber, gtasks, gRank, lRank, lSize;
95 
96  _mpi_api_commdata *gen_gcomm;
97  _mpi_api_commdata *gen_lcomm;
98 
99  _sion_flags_store* flags_store = NULL;
100 
101  MPI_Comm_size(gComm, &gtasks);
102  MPI_Comm_rank(gComm, &gRank);
103 
104  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of file %s\n", fname));
105 
106  /* check parameters */
107  if (lComm == NULL) {
108  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No lComm variable given"));
109  }
110  if (numFiles == NULL) {
111  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No numFiles variable given"));
112  }
113 
114  flags_store = _sion_parse_flags(file_mode);
115  /* parse file mode */
116  if ( ! flags_store ) {
117  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
118  }
119 
120  /* register callbacks for generic interface */
121  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
122 
123 
124  if (flags_store->mask&_SION_FMODE_WRITE) {
125  /* file mode WRITE */
126 
127  if (*numFiles <= 0) {
128  /* lComm contains local communicator */
129 
130  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
131  if(rc != SION_SUCCESS) {
132  _sion_flags_destroy_store(flags_store);
133  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
134  }
135  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
136 
137  } else {
138  /* number of files is given */
139 
140  rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
141  if(rc != SION_SUCCESS) {
142  _sion_flags_destroy_store(flags_store);
143  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi"));
144  }
145  DPRINTFP((1, "sion_paropen_mpi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
146  }
147 
148  /* overwrite globalrank set by user, necessary for multi-file support */
149  *globalrank = gRank;
150 
151  } else if (flags_store->mask&_SION_FMODE_READ) {
152  /* file mode READ */
153  /* nothing to do info will be returned by generic paropen */
154 
155  /* set to gRank, current rank in global communicator, this is
156  different to older versions of SIONlib, where globalrank comes
157  from file in read case */
158  *globalrank = gRank;
159 
160  lRank=lSize=-1; /* will be set by sion_generic_paropen */
161 
162  } else {
163 
164  _sion_flags_destroy_store(flags_store);
165  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: unknown file mode"));
166  }
167 
168  /* create generic communicator container */
169  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
170  if (gen_gcomm == NULL) {
171  _sion_flags_destroy_store(flags_store);
172  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
173  (unsigned long) sizeof(_mpi_api_commdata)));
174  }
175  gen_gcomm->comm=gComm;
176  gen_gcomm->commset=1;
177  gen_gcomm->local=0;
178  gen_gcomm->rank=gRank;
179  gen_gcomm->size=gtasks;
180  gen_gcomm->lcommgroup=NULL;
181 
182  if ((flags_store->mask&_SION_FMODE_WRITE) && (*numFiles <= 0)) {
183 
184  /* create generic local communicator container */
185  gen_lcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
186  if (gen_lcomm == NULL) {
187  _sion_flags_destroy_store(flags_store);
188  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
189  (unsigned long) sizeof(_mpi_api_commdata)));
190  }
191  gen_lcomm->comm=*lComm;
192  gen_lcomm->commcreated=0;
193  gen_lcomm->commset=1;
194  gen_lcomm->local=1;
195  gen_lcomm->rank=lRank;
196  gen_lcomm->size=lSize;
197  /* store pointer in global comm group */
198  gen_gcomm->lcommgroup=gen_lcomm;
199  }
200  _sion_flags_destroy_store(flags_store);
201 
202  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
203  sid = sion_generic_paropen(_sion_mpi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber, numFiles, &lRank, &lSize,
204  fileptr, newfname);
205  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
206 
207  /* test return code from internal open */
208  if ( sid == SION_ID_NOT_VALID ) {
209  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: invalid return code from internal open %d", rc));
210  }
211 
212  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
213 
214  return (sid);
215 }
216 
217 
228 int sion_parclose_mpi(int sid)
229 {
230  int rc = SION_SUCCESS;
231 
232  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
233 
234  rc = sion_generic_parclose(sid);
235 
236  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
237 
238  return (rc);
239 }
240 
241 int sion_parreinit_mpi( int sid,
242  sion_int64 chunksize )
243 {
244  int rc = SION_SUCCESS;
245 
246  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
247 
248  rc = sion_generic_parreinit(sid, chunksize);
249 
250  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
251 
252  return (rc);
253 }
254 
255 int sion_paropen_mapped_mpi( char *fname,
256  const char *file_mode,
257  int *numFiles,
258  MPI_Comm gComm,
259  int *nlocaltasks,
260  int **globalranks,
261  sion_int64 **chunksizes,
262  int **mapping_filenrs,
263  int **mapping_lranks,
264  sion_int32 *fsblksize,
265  FILE **fileptr) {
266 
267  int sid=SION_ID_UNDEF;
268  int gtasks, gRank;
269  char *lprefix;
270  _mpi_api_commdata *gen_gcomm;
271  _sion_flags_store* flags_store = NULL;
272 
273  MPI_Comm_size(gComm, &gtasks);
274  MPI_Comm_rank(gComm, &gRank);
275 
276  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
277 
278  /* check parameters */
279  if (numFiles == NULL) {
280  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: No numFiles variable given"));
281  }
282 
283  lprefix = calloc(SION_FILENAME_LENGTH,1);
284  if (lprefix == NULL) {
285  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: cannot allocate temporary memory of size %lu (lprefix), aborting ...\n", (unsigned long) SION_FILENAME_LENGTH));
286  }
287 
288  flags_store = _sion_parse_flags(file_mode);
289  if ( ! flags_store ) {
290  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
291  }
292 
293  /* register callbacks for generic interface */
294  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
295 
296  if (flags_store->mask&_SION_FMODE_WRITE) {
297  /* file mode WRITE */
298 
299  if (*numFiles <= 0) {
300  _sion_flags_destroy_store(flags_store);
301  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode"));
302  }
303 
304  /* prefix must be used in generic open function */
305  strcpy(lprefix, fname);
306 
307  }
308  else if (flags_store->mask&_SION_FMODE_READ) {
309  /* file mode READ */
310  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
311 
312  } else {
313  _sion_flags_destroy_store(flags_store);
314  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: unknown file mode"));
315  }
316  _sion_flags_destroy_store(flags_store);
317 
318  /* create generic communicator container */
319  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
320  if (gen_gcomm == NULL) {
321  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
322  (unsigned long) sizeof(_mpi_api_commdata)));
323  }
324  gen_gcomm->comm=gComm;
325  gen_gcomm->commset=1;
326  gen_gcomm->local=0;
327  gen_gcomm->rank=gRank;
328  gen_gcomm->size=gtasks;
329  gen_gcomm->lcommgroup=NULL;
330 
331 
332  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
333  sid=sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, file_mode, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks, globalranks, chunksizes,
334  mapping_filenrs, mapping_lranks, fsblksize, fileptr);
335  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
336 
337  /* test return code from internal open */
338  if ( sid == SION_ID_NOT_VALID ) {
339  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: invalid return code from internal open %d", sid));
340  }
341 
342  if(lprefix) free(lprefix);
343  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
344 
345 
346  return(sid);
347 }
348 
349 int sion_parclose_mapped_mpi( int sid ) {
350  int rc = SION_SUCCESS;
351 
352  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
353 
354  rc = sion_generic_parclose_mapped(sid);
355 
356  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
357 
358  return(rc);
359 }
360 
361 /* end of ifdef MPI */
362 #endif
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file a generic interface.
Definition: sion_generic.c:347
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:228
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:81
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
Definition: sion_flags.c:285
Sion Time Stamp Header.