SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
15 #define _XOPEN_SOURCE 700
16 
17 #include <stdlib.h>
18 #include <stdio.h>
19 #include <stdarg.h>
20 #include <string.h>
21 #include <time.h>
22 
23 #include <sys/time.h>
24 
25 #include <sys/types.h>
26 #include <fcntl.h>
27 
28 #include <unistd.h>
29 
30 #include "mpi.h"
31 
32 #include "sion.h"
33 #include "sion_debug.h"
34 #include "sion_error_handler.h"
35 #include "sion_internal.h"
36 #include "sion_fd.h"
37 #include "sion_filedesc.h"
38 #include "sion_printts.h"
39 #include "sion_flags.h"
40 
41 #ifdef SION_MPI
42 
43 #include "sion_generic.h"
44 
45 #include "sion_mpi.h"
46 #include "sion_mpi_internal_gen.h"
47 
48 #include "sion_mpi_cb_gen.h"
49 
50 int _sion_mpi_api_aid = -1; /* API id obtained from _sion_register_callbacks_mpi(); stays negative until one of the open functions below registers the MPI callbacks */
51 
85 int sion_paropen_mpi(const char* fname,
86  const char* file_mode,
87  int* numFiles,
88  MPI_Comm gComm,
89  const MPI_Comm* lComm,
90  sion_int64* chunksize,
91  sion_int32* fsblksize,
92  int* globalrank,
93  FILE** fileptr,
94  char** newfname
95  )
96 {
97  int rc = SION_NOT_SUCCESS, sid = SION_ID_UNDEF;
98  int filenumber, gtasks, gRank, lRank, lSize;
99 
100  _mpi_api_commdata *gen_gcomm;
101 
102  _sion_flags_store* flags_store = NULL;
103 
104  MPI_Comm_size(gComm, &gtasks);
105  MPI_Comm_rank(gComm, &gRank);
106 
107  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of file %s\n", fname));
108 
109  /* check parameters */
110  if (lComm == NULL) {
111  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No lComm variable given"));
112  }
113  if (numFiles == NULL) {
114  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: No numFiles variable given"));
115  }
116 
117  flags_store = _sion_parse_flags(file_mode);
118  /* parse file mode */
119  if ( ! flags_store ) {
120  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
121  }
122 
123  /* register callbacks for generic interface */
124  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
125 
126 
127  if (flags_store->mask&_SION_FMODE_WRITE) {
128  /* file mode WRITE */
129 
130  if (*numFiles <= 0) {
131  /* lComm contains local communicator */
132  if (_sion_flags_get(flags_store, "collmsa")) {
133  _sion_flags_destroy_store(&flags_store);
134  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: numFiles <= 0 not supported with MSA aware collectives enabled");
135  }
136 
137  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
138  if(rc != SION_SUCCESS) {
139  _sion_flags_destroy_store(&flags_store);
140  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
141  }
142  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
143 
144  } else {
145  /* number of files is given */
146  if (_sion_flags_get(flags_store, "collmsa")) {
147  lRank = lSize = -1; /* will be set by sion_generic_paropen */
148  rc = SION_SUCCESS;
149  } else {
150  rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
151  }
152 
153  if(rc != SION_SUCCESS) {
154  _sion_flags_destroy_store(&flags_store);
155  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi"));
156  }
157  DPRINTFP((1, "sion_paropen_mpi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
158  }
159 
160  /* overwrite globalrank set by user, necessary for multi-file support */
161  *globalrank = gRank;
162 
163  } else if (flags_store->mask&_SION_FMODE_READ) {
164  /* file mode READ */
165  /* nothing to do info will be returned by generic paropen */
166 
167  /* set to gRank, current rank in global communicator, this is
168  different to older versions of SIONlib, where globalrank comes
169  from file in read case */
170  *globalrank = gRank;
171 
172 
173  if(! (flags_store->mask&_SION_FMODE_BUDDY) ) {
174  lRank=lSize=-1; /* will be set by sion_generic_paropen */
175  } else {
176  /* lvomm must be given for buddy checkpointing */
177  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
178  if(rc != SION_SUCCESS) return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi"));
179  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
180  }
181 
182  } else {
183 
184  _sion_flags_destroy_store(&flags_store);
185  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: unknown file mode"));
186  }
187 
188  /* create generic communicator container */
189  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
190  if (gen_gcomm == NULL) {
191  _sion_flags_destroy_store(&flags_store);
192  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
193  (unsigned long) sizeof(_mpi_api_commdata)));
194  }
195  gen_gcomm->comm=gComm;
196  gen_gcomm->commset=1;
197  gen_gcomm->local=0;
198  gen_gcomm->rank=gRank;
199  gen_gcomm->size=gtasks;
200  gen_gcomm->lcommgroup=NULL;
201 
202  _sion_flags_destroy_store(&flags_store);
203 
204  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode));
205  sid = sion_generic_paropen(_sion_mpi_api_aid, fname, file_mode, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber, numFiles, &lRank, &lSize,
206  fileptr, newfname);
207  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid));
208 
209  /* test return code from internal open */
210  if ( sid == SION_ID_NOT_VALID ) {
211  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mpi: invalid return code from internal open %d", rc));
212  }
213 
214  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
215 
216  return (sid);
217 }
218 
219 
230 int sion_parclose_mpi(int sid)
231 {
232  int rc = 0;
233 
234  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
235 
236  rc = sion_generic_parclose(sid);
237 
238  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
239 
240  return (rc);
241 }
242 
243 int sion_parreinit_mpi( int sid,
244  sion_int64 chunksize )
245 {
246  int rc = 0;
247 
248  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
249 
250  rc = sion_generic_parreinit(sid, chunksize);
251 
252  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
253 
254  return (rc);
255 }
256 
257 int sion_paropen_mapped_mpi( char *fname,
258  const char *file_mode,
259  int *numFiles,
260  MPI_Comm gComm,
261  int *nlocaltasks,
262  int **globalranks,
263  sion_int64 **chunksizes,
264  int **mapping_filenrs,
265  int **mapping_lranks,
266  sion_int32 *fsblksize,
267  FILE **fileptr) {
268 
269  int sid=SION_ID_UNDEF;
270  int gtasks, gRank;
271  char *lprefix;
272  _mpi_api_commdata *gen_gcomm;
273  _sion_flags_store* flags_store = NULL;
274 
275  MPI_Comm_size(gComm, &gtasks);
276  MPI_Comm_rank(gComm, &gRank);
277 
278  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
279 
280  /* check parameters */
281  if (numFiles == NULL) {
282  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: No numFiles variable given"));
283  }
284 
285  lprefix = calloc(SION_FILENAME_LENGTH,1);
286  if (lprefix == NULL) {
287  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: cannot allocate temporary memory of size %lu (lprefix), aborting ...\n", (unsigned long) SION_FILENAME_LENGTH));
288  }
289 
290  flags_store = _sion_parse_flags(file_mode);
291  if ( ! flags_store ) {
292  free(lprefix);
293  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode));
294  }
295 
296  /* register callbacks for generic interface */
297  if(_sion_mpi_api_aid<0) _sion_mpi_api_aid=_sion_register_callbacks_mpi();
298 
299  if (flags_store->mask&_SION_FMODE_WRITE) {
300  /* file mode WRITE */
301 
302  if (*numFiles <= 0) {
303  _sion_flags_destroy_store(&flags_store);
304  free(lprefix);
305  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode"));
306  }
307 
308  /* prefix must be used in generic open function */
309  strcpy(lprefix, fname);
310 
311  }
312  else if (flags_store->mask&_SION_FMODE_READ) {
313  /* file mode READ */
314  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
315 
316  } else {
317  _sion_flags_destroy_store(&flags_store);
318  free(lprefix);
319  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: unknown file mode"));
320  }
321  _sion_flags_destroy_store(&flags_store);
322 
323  /* create generic communicator container */
324  gen_gcomm = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
325  if (gen_gcomm == NULL) {
326  free(lprefix);
327  return(_sion_errorprint(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
328  (unsigned long) sizeof(_mpi_api_commdata)));
329  }
330  gen_gcomm->comm=gComm;
331  gen_gcomm->commset=1;
332  gen_gcomm->local=0;
333  gen_gcomm->rank=gRank;
334  gen_gcomm->size=gtasks;
335  gen_gcomm->lcommgroup=NULL;
336 
337 
338  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
339  sid=sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, file_mode, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks, globalranks, chunksizes,
340  mapping_filenrs, mapping_lranks, fsblksize, fileptr);
341  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
342 
343  /* test return code from internal open */
344  if ( sid == SION_ID_NOT_VALID ) {
345  free(lprefix);
346  return(_sion_errorprint_mpi(SION_ID_NOT_VALID,_SION_ERROR_RETURN,"sion_paropen_mapped_mpi: invalid return code from internal open %d", sid));
347  }
348 
349  if(lprefix) free(lprefix);
350  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
351 
352 
353  return(sid);
354 }
355 
356 int sion_parclose_mapped_mpi( int sid ) {
357  int rc = 0;
358 
359  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
360 
361  rc = sion_generic_parclose_mapped(sid);
362 
363  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
364 
365  return(rc);
366 }
367 
368 /* end of ifdef MPI */
369 #endif
_sion_flags_store * _sion_parse_flags(const char *flags)
Parse flags and return a flags store with key value pairs.
Definition: sion_flags.c:326
int sion_generic_paropen(int aid, const char *fname, const char *file_mode, sion_int64 *chunksize, sion_int32 *fsblksize, void *gcommgroup, int grank, int gsize, int *filenumber, int *numfiles, const int *lrank, const int *lsize, FILE **fileptr, char **newfname)
Open a sion file using a generic interface.
Definition: sion_generic.c:351
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, sion_int64 *chunksize, sion_int32 *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:85
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:230
int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
Splits a communicator into numfiles different communicators.
Sion Time Stamp Header.