SIONlib  2.0.0-rc.1
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2018 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #ifdef SION_MPI
13 
14 #include <inttypes.h>
15 #include <mpi.h>
16 #include <stdbool.h>
17 #include <stdint.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 
21 #include "sion_const.h"
22 #include "sion_debug.h"
23 #include "sion_enums.h"
24 #include "sion_error_handler.h"
25 #include "sion_flags.h"
26 #include "sion_generic.h"
27 #include "sion_generic_internal.h"
28 #include "sion_internal.h"
29 #include "sion_mpi.h"
30 #include "sion_mpi_cb_gen.h"
31 #include "sion_mpi_internal_gen.h"
32 
/* API id obtained from _sion_register_callbacks_mpi(); stays negative until the
   callbacks have been registered by the first parallel open in this file. */
33 int _sion_mpi_api_aid = -1;
34 
35 int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm,
36  int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
37 {
38  /* parse file mode */
39  int sid;
40  _sion_flags_store *flags_store = _sion_parse_flags(file_mode);
41  if (!flags_store) {
42  return _sion_errorprint_mpi(
43  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
44  }
45  sid = _sion_paropen_mpi(fname, flags_store, numFiles, gComm, lComm, chunksize, fsblksize, globalrank, fileptr, newfname);
46  _sion_flags_destroy_store(&flags_store);
47  return sid;
48 }
49 
50 int _sion_paropen_mpi(const char *fname, const _sion_flags_store *flags_store, int *numFiles, MPI_Comm gComm,
51  const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
52 {
53  int rc = SION_NOT_SUCCESS, sid = SION_ID_UNDEF;
54  int filenumber, gtasks, gRank, lRank, lSize;
55 
56  _mpi_api_commdata *gen_gcomm;
57 
58  MPI_Comm_size(gComm, &gtasks);
59  MPI_Comm_rank(gComm, &gRank);
60 
61  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of file %s\n", fname));
62 
63  /* check parameters */
64  if (lComm == NULL) {
65  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: No lComm variable given");
66  }
67  if (numFiles == NULL) {
68  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: No numFiles variable given");
69  }
70 
71  /* register callbacks for generic interface */
72  if (_sion_mpi_api_aid < 0) {
73  _sion_mpi_api_aid = _sion_register_callbacks_mpi();
74  }
75 
76  if (flags_store->mask & _SION_FMODE_WRITE) {
77  /* file mode WRITE */
78 
79  if (*numFiles <= 0) {
80  /* lComm contains local communicator */
81 
82  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
83  if (rc != SION_SUCCESS) {
84  return _sion_errorprint_mpi(
85  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
86  }
87  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
88 
89  } else {
90  /* number of files is given */
91 
92  rc = _sion_gen_info_from_gcomm_mpi(*numFiles, gComm, &filenumber, &lRank, &lSize);
93  if (rc != SION_SUCCESS) {
94  return _sion_errorprint_mpi(
95  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi");
96  }
97  DPRINTFP((1, "sion_paropen_mpi", gRank, "Global communicator divided in %d local communicators\n", *numFiles));
98  }
99 
100  /* overwrite globalrank set by user, necessary for multi-file support */
101  *globalrank = gRank;
102 
103  } else if (flags_store->mask & _SION_FMODE_READ) {
104  /* file mode READ */
105  /* nothing to do info will be returned by generic paropen */
106 
107  /* set to gRank, current rank in global communicator, this is
108  different to older versions of SIONlib, where globalrank comes
109  from file in read case */
110  *globalrank = gRank;
111 
112  if (!(flags_store->mask & _SION_FMODE_BUDDY)) {
113  lRank = lSize = -1; /* will be set by sion_generic_paropen */
114  } else {
115  /* lvomm must be given for buddy checkpointing */
116  rc = _sion_get_info_from_splitted_comm_mpi(gComm, *lComm, numFiles, &filenumber, &lRank, &lSize);
117  if (rc != SION_SUCCESS) {
118  return _sion_errorprint_mpi(
119  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
120  }
121  DPRINTFP((1, "sion_paropen_mpi", gRank, "%d local communicators found\n", *numFiles));
122  }
123 
124  } else {
125  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: unknown file mode");
126  }
127 
128  /* create generic communicator container */
129  gen_gcomm = malloc(sizeof(_mpi_api_commdata));
130  if (gen_gcomm == NULL) {
131  return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
132  "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
133  (unsigned long)sizeof(_mpi_api_commdata));
134  }
135  gen_gcomm->comm = gComm;
136  gen_gcomm->commset = 1;
137  gen_gcomm->local = 0;
138  gen_gcomm->rank = gRank;
139  gen_gcomm->size = gtasks;
140  gen_gcomm->lcommgroup = NULL;
141 
142  /* FIXME: add debug output for flags_store
143  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode\n", *numFiles, fname, file_mode)); */
144  DPRINTFP((1, "sion_paropen_mpi", gRank, "enter parallel open of %d files (current name %s)\n", *numFiles, fname));
145  sid = _sion_generic_paropen(_sion_mpi_api_aid, fname, flags_store, chunksize, fsblksize, gen_gcomm, gRank, gtasks, &filenumber,
146  numFiles, &lRank, &lSize, fileptr, newfname);
147  /* FIXME: add debug output for flags_store
148  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, lSize, sid)); */
149  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of %d files #tasks=%d sid=%d\n", *numFiles, lSize, sid));
150 
151  /* test return code from internal open */
152  if (sid == SION_ID_NOT_VALID) {
153  return _sion_errorprint_mpi(
154  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: invalid return code from internal open %d", rc);
155  }
156 
157  DPRINTFP((1, "sion_paropen_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
158 
159  return sid;
160 }
161 
162 int sion_parclose_mpi(int sid)
163 {
164  int rc = 0;
165 
166  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
167 
168  rc = sion_generic_parclose(sid);
169 
170  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
171 
172  return rc;
173 }
174 
175 int sion_parreinit_mpi(int sid, int64_t chunksize)
176 {
177  int rc = 0;
178 
179  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
180 
181  rc = sion_generic_parreinit(sid, chunksize);
182 
183  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
184 
185  return rc;
186 }
187 
188 int sion_paropen_mapped_mpi(char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, int *nlocaltasks,
189  int **globalranks, int64_t **chunksizes, int **mapping_filenrs, int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
190 {
191  int sid;
192  _sion_flags_store *flags_store = _sion_parse_flags(file_mode);
193  if (!flags_store) {
194  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
195  "sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
196  }
197  sid = _sion_paropen_mapped_mpi(fname, flags_store, numFiles, gComm, nlocaltasks, globalranks, chunksizes, mapping_filenrs,
198  mapping_lranks, fsblksize, fileptr);
199  _sion_flags_destroy_store(&flags_store);
200  return sid;
201 }
202 
203 int _sion_paropen_mapped_mpi(char *fname, const _sion_flags_store *flags_store, int *numFiles, MPI_Comm gComm, int *nlocaltasks,
204  int **globalranks, int64_t **chunksizes, int **mapping_filenrs, int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
205 {
206  int sid = SION_ID_UNDEF;
207  int gtasks, gRank;
208  _mpi_api_commdata *gen_gcomm;
209 
210  MPI_Comm_size(gComm, &gtasks);
211  MPI_Comm_rank(gComm, &gRank);
212 
213  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
214 
215  /* check parameters */
216  if (numFiles == NULL) {
217  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: No numFiles variable given");
218  }
219 
220  /* register callbacks for generic interface */
221  if (_sion_mpi_api_aid < 0) {
222  _sion_mpi_api_aid = _sion_register_callbacks_mpi();
223  }
224 
225  if (flags_store->mask & _SION_FMODE_WRITE) {
226  /* file mode WRITE */
227 
228  if (*numFiles <= 0) {
229  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
230  "sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode");
231  }
232  } else if (flags_store->mask & _SION_FMODE_READ) {
233  /* file mode READ */
234  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
235 
236  } else {
237  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: unknown file mode");
238  }
239 
240  /* create generic communicator container */
241  gen_gcomm = malloc(sizeof(_mpi_api_commdata));
242  if (gen_gcomm == NULL) {
243  return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
244  "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
245  (unsigned long)sizeof(_mpi_api_commdata));
246  }
247  gen_gcomm->comm = gComm;
248  gen_gcomm->commset = 1;
249  gen_gcomm->local = 0;
250  gen_gcomm->rank = gRank;
251  gen_gcomm->size = gtasks;
252  gen_gcomm->lcommgroup = NULL;
253 
254  // FIXME: re-enable output of file mode
255  // DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
256  DPRINTFP(
257  (1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) (sid=%d)\n", *numFiles, fname, sid));
258  sid = _sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, flags_store, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks,
259  globalranks, chunksizes, mapping_filenrs, mapping_lranks, fsblksize, fileptr);
260  // FIXME: re-enable output of file mode
261  // DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
262  DPRINTFP(
263  (1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files #tasks=%d sid=%d\n", *numFiles, *nlocaltasks, sid));
264 
265  /* test return code from internal open */
266  if (sid == SION_ID_NOT_VALID) {
267  return _sion_errorprint_mpi(
268  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: invalid return code from internal open %d", sid);
269  }
270 
271  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
272 
273  return sid;
274 }
275 
276 int sion_parclose_mapped_mpi(int sid)
277 {
278  int rc = 0;
279 
280  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
281 
282  rc = sion_generic_parclose_mapped(sid);
283 
284  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
285 
286  return rc;
287 }
288 
289 sion_mpi_options sion_mpi_options_new()
290 {
291  sion_mpi_options options;
292 
293  options.chunksize = 1024 * 1024; // FIXME: default to fsblksize (which is only known on open)
294  options.fsblksize = -1;
295 
296  options.multifile_mode = _SION_MULTIFILE_SINGLE;
297 
298  options.keyval_mode = SION_KEYVAL_MODE_NONE;
299 
300  options.buddylevel = 0;
301 
302  options.collsize = 0;
303  options.collective_merge = false;
304 
305  return options;
306 }
307 
308 void sion_mpi_options_set_chunksize(sion_mpi_options *options, int64_t chunksize)
309 {
310  // TODO: assert write mode
311  options->chunksize = chunksize;
312 }
313 
314 void sion_mpi_options_set_fsblksize(sion_mpi_options *options, int32_t fsblksize)
315 {
316  // TODO: assert write mode
317  // TODO: adapt chunksize?
318  options->fsblksize = fsblksize;
319 }
320 
321 void sion_mpi_options_set_multifile_number(sion_mpi_options *options, int multifile_number)
322 {
323  // TODO: assert write mode
324  options->multifile_mode = _SION_MULTIFILE_NUMBER;
325  options->multifile.number = multifile_number;
326 }
327 
328 void sion_mpi_options_set_multifile_communicator(sion_mpi_options *options, MPI_Comm multifile_communicator)
329 {
330  options->multifile_mode = _SION_MULTIFILE_COMMUNICATOR;
331  options->multifile.communicator = multifile_communicator;
332 }
333 
334 void sion_mpi_options_set_keyval_mode(sion_mpi_options *options, sion_keyval_mode keyval_mode)
335 {
336  options->keyval_mode = keyval_mode;
337 }
338 
339 void sion_mpi_options_set_buddy(sion_mpi_options *options)
340 {
341  sion_mpi_options_set_buddylevel(options, 1);
342 }
343 
344 void sion_mpi_options_set_buddylevel(sion_mpi_options *options, int32_t buddylevel)
345 {
346  options->buddylevel = buddylevel;
347 }
348 
349 void sion_mpi_options_set_collective(sion_mpi_options *options)
350 {
351  options->collsize = -1;
352 }
353 
354 void sion_mpi_options_set_collective_size(sion_mpi_options *options, int32_t size)
355 {
356  options->collsize = size;
357 }
358 
359 void sion_mpi_options_set_collective_merge(sion_mpi_options *options)
360 {
361  if (options->collsize == 0) {
362  options->collsize = -1;
363  }
364  options->collective_merge = true;
365 }
366 
367 _sion_flags_store *_sion_mpi_options_into_flags_store(sion_open_mode mode, const sion_mpi_options *options)
368 {
369  _sion_flags_store *flags_store = _sion_flags_create_store();
370  if (!flags_store) {
371  /* FIXME: add error message */
372  fprintf(stderr, "could not create flags store.\n");
373  return NULL;
374  }
375 
376  switch (mode) {
377  case SION_OPEN_READ:
378  _sion_flags_add(flags_store, "br", "");
379 
380  switch (options->keyval_mode) {
381  case SION_KEYVAL_MODE_NONE:
382  /* do nothing */
383  break;
384  case SION_KEYVAL_MODE_DEFAULT:
385  /* FALLTHROUGH */
386  case SION_KEYVAL_MODE_INLINE:
387  _sion_flags_add(flags_store, "keyval", "inline");
388  break;
389  case SION_KEYVAL_MODE_META:
390  _sion_flags_add(flags_store, "keyval", "meta");
391  break;
392  case SION_KEYVAL_MODE_HASH:
393  _sion_flags_add(flags_store, "keyval", "hash");
394  break;
395  case SION_KEYVAL_MODE_UNKNOWN:
396  _sion_flags_add(flags_store, "keyval", "unknown");
397  break;
398  default:
399  /* FIXME: add error message */
400  fprintf(stderr, "wrong keyval mode in read mode.\n");
401  _sion_flags_destroy_store(&flags_store);
402  return NULL;
403  }
404 
405  break;
406  case SION_OPEN_WRITE:
407  _sion_flags_add(flags_store, "bw", "");
408 
409  switch (options->keyval_mode) {
410  case SION_KEYVAL_MODE_NONE:
411  /* do nothing */
412  break;
413  case SION_KEYVAL_MODE_DEFAULT:
414  /* FALLTHROUGH */
415  case SION_KEYVAL_MODE_INLINE:
416  _sion_flags_add(flags_store, "keyval", "inline");
417  break;
418  case SION_KEYVAL_MODE_META:
419  _sion_flags_add(flags_store, "keyval", "meta");
420  break;
421  case SION_KEYVAL_MODE_HASH:
422  _sion_flags_add(flags_store, "keyval", "hash");
423  break;
424  default: /* 'unknown' keyval mode is not allowed in write mode */
425  /* FIXME: add error message */
426  fprintf(stderr, "wrong keyval mode in read mode.\n");
427  _sion_flags_destroy_store(&flags_store);
428  return NULL;
429  }
430 
431  break;
432  }
433 
434  if (options->buddylevel > 0) {
435  char buddylevel_str[12];
436  int written = snprintf(buddylevel_str, 12, "%" PRId32, (int32_t)options->buddylevel);
437  // FIXME: assert written < 12
438  (void)written;
439  _sion_flags_add(flags_store, "buddy", buddylevel_str);
440  fprintf(stderr, "added buddy level %s.\n", buddylevel_str);
441  }
442 
443  if (options->collsize) {
444  _sion_flags_add(flags_store, "collective", "");
445  char collsize_str[12];
446  int written = snprintf(collsize_str, 12, "%" PRId32, (int32_t)options->collsize);
447  // FIXME: assert written < 12
448  (void)written;
449  _sion_flags_add(flags_store, "collsize", collsize_str);
450  if (options->collective_merge) {
451  _sion_flags_add(flags_store, "cmerge", "");
452  }
453  }
454 
455  _sion_flags_update_mask(flags_store);
456 
457  return flags_store;
458 }
459 
460 int sion_paropen_mpi_with_options(
461  const char *filename, sion_open_mode mode, MPI_Comm communicator, const sion_mpi_options *options_)
462 {
463  sion_mpi_options options; // FIXME: need local copy to satisfy signature of sion_paropen_mpi()
464  if (options_ == NULL) {
465  options = sion_mpi_options_new();
466  } else {
467  options = *options_;
468  }
469 
470  int num_files;
471  MPI_Comm lcomm;
472  int rank_;
473 
474  switch (options.multifile_mode) {
475  case _SION_MULTIFILE_SINGLE:
476  num_files = 1;
477  lcomm = communicator;
478  break;
479  case _SION_MULTIFILE_NUMBER:
480  num_files = options.multifile.number;
481  lcomm = communicator;
482  break;
483  case _SION_MULTIFILE_COMMUNICATOR:
484  num_files = -1;
485  lcomm = options.multifile.communicator;
486  break;
487  }
488 
489  _sion_flags_store *flags_store = _sion_mpi_options_into_flags_store(mode, &options);
490  if (!flags_store) {
491  return -1;
492  }
493 
494  int sid = _sion_paropen_mpi(
495  filename, flags_store, &num_files, communicator, &lcomm, &options.chunksize, &options.fsblksize, &rank_, NULL, NULL);
496 
497  _sion_flags_destroy_store(&flags_store);
498  return sid;
499 }
500 
501 /* end of ifdef MPI */
502 #endif
int sion_parclose_mpi(int sid)
Close a sion file using MPI.
Definition: sion_mpi_gen.c:162
int sion_paropen_mpi(const char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, const MPI_Comm *lComm, int64_t *chunksize, int32_t *fsblksize, int *globalrank, FILE **fileptr, char **newfname)
Open a sion file using MPI.
Definition: sion_mpi_gen.c:35