SIONlib  2.0.0-rc.2
Scalable I/O library for parallel access to task-local files
sion_mpi_gen.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #ifdef SION_MPI
13 
14 #include <assert.h>
15 #include <mpi.h>
16 #include <stdint.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 
20 #include "sion_const.h"
21 #include "sion_debug.h"
22 #include "sion_enums.h"
23 #include "sion_error_handler.h"
24 #include "sion_flags.h"
25 #include "sion_generic.h"
26 #include "sion_generic_internal.h"
27 #include "sion_internal.h"
28 #include "sion_mpi.h"
29 #include "sion_mpi_cb_gen.h"
30 #include "sion_mpi_internal_gen.h"
31 
/* API id of the MPI callback set registered with the generic SIONlib layer.
   It starts out as -1 (nothing registered yet). The paropen functions in this
   file fill it in lazily via _sion_register_callbacks_mpi() whenever they
   find it negative. */
32 int _sion_mpi_api_aid = -1;
33 
34 int sion_parclose_mpi(int sid)
35 {
36  int rc = 0;
37 
38  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
39 
40  rc = sion_generic_parclose(sid);
41 
42  DPRINTFP((1, "sion_parclose_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
43 
44  return rc;
45 }
46 
47 int sion_parreinit_mpi(int sid, int64_t chunksize)
48 {
49  int rc = 0;
50 
51  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "enter parallel reinit of sid %d\n", sid));
52 
53  rc = sion_generic_parreinit(sid, chunksize);
54 
55  DPRINTFP((1, "sion_parreinit_mpi", _SION_DEFAULT_RANK, "leave parallel reinit of sid %d rc=%d\n", sid, rc));
56 
57  return rc;
58 }
59 
60 int sion_paropen_mapped_mpi(char *fname, const char *file_mode, int *numFiles, MPI_Comm gComm, int *nlocaltasks,
61  int **globalranks, int64_t **chunksizes, int **mapping_filenrs, int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
62 {
63  int sid;
64  _sion_flags_store *flags_store = _sion_parse_flags(file_mode);
65  if (!flags_store) {
66  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
67  "sion_paropen_mapped_mpi: could not parse file mode in %s, aborting ...\n", file_mode);
68  }
69  sid = _sion_paropen_mapped_mpi(fname, flags_store, numFiles, gComm, nlocaltasks, globalranks, chunksizes, mapping_filenrs,
70  mapping_lranks, fsblksize, fileptr);
71  _sion_flags_destroy_store(&flags_store);
72  return sid;
73 }
74 
75 int _sion_paropen_mapped_mpi(char *fname, const _sion_flags_store *flags_store, int *numFiles, MPI_Comm gComm, int *nlocaltasks,
76  int **globalranks, int64_t **chunksizes, int **mapping_filenrs, int **mapping_lranks, int32_t *fsblksize, FILE **fileptr)
77 {
78  int sid = SION_ID_UNDEF;
79  int gtasks, gRank;
80  _mpi_api_commdata *gen_gcomm;
81 
82  MPI_Comm_size(gComm, &gtasks);
83  MPI_Comm_rank(gComm, &gRank);
84 
85  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of file %s\n", fname));
86 
87  /* check parameters */
88  if (numFiles == NULL) {
89  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: No numFiles variable given");
90  }
91 
92  /* register callbacks for generic interface */
93  if (_sion_mpi_api_aid < 0) {
94  _sion_mpi_api_aid = _sion_register_callbacks_mpi();
95  }
96 
97  if (flags_store->mask & _SION_FMODE_WRITE) {
98  /* file mode WRITE */
99 
100  if (*numFiles <= 0) {
101  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
102  "sion_paropen_mapped_mpi: numFiles variable <= 0 not allowed for mapped files in write mode");
103  }
104  } else if (flags_store->mask & _SION_FMODE_READ) {
105  /* file mode READ */
106  /* nothing to do here so far, filenumbers and mapping will be determined by in generic routine */
107 
108  } else {
109  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: unknown file mode");
110  }
111 
112  /* create generic communicator container */
113  gen_gcomm = malloc(sizeof(_mpi_api_commdata));
114  if (gen_gcomm == NULL) {
115  return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
116  "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
117  (unsigned long)sizeof(_mpi_api_commdata));
118  }
119  gen_gcomm->comm = gComm;
120  gen_gcomm->commset = 1;
121  gen_gcomm->local = 0;
122  gen_gcomm->rank = gRank;
123  gen_gcomm->size = gtasks;
124  gen_gcomm->lcommgroup = NULL;
125 
126  // FIXME: re-enable output of file mode
127  // DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) in %s mode (sid=%d)\n", *numFiles, fname, file_mode, sid));
128  DPRINTFP(
129  (1, "sion_paropen_mapped_mpi", gRank, "enter parallel open of %d files (current name %s) (sid=%d)\n", *numFiles, fname, sid));
130  sid = _sion_generic_paropen_mapped(_sion_mpi_api_aid, fname, flags_store, numFiles, gen_gcomm, gRank, gtasks, nlocaltasks,
131  globalranks, chunksizes, mapping_filenrs, mapping_lranks, fsblksize, fileptr);
132  // FIXME: re-enable output of file mode
133  // DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", *numFiles, file_mode, *nlocaltasks, sid));
134  DPRINTFP(
135  (1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of %d files #tasks=%d sid=%d\n", *numFiles, *nlocaltasks, sid));
136 
137  /* test return code from internal open */
138  if (sid == SION_ID_NOT_VALID) {
139  return _sion_errorprint_mpi(
140  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mapped_mpi: invalid return code from internal open %d", sid);
141  }
142 
143  DPRINTFP((1, "sion_paropen_mapped_mpi", gRank, "leave parallel open of file %s sid=%d\n", fname, sid));
144 
145  return sid;
146 }
147 
148 int sion_parclose_mapped_mpi(int sid)
149 {
150  int rc = 0;
151 
152  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
153 
154  rc = sion_generic_parclose_mapped(sid);
155 
156  DPRINTFP((1, "sion_parclose_mapped_mpi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
157 
158  return rc;
159 }
160 
162 {
163  sion_mpi_options *options = malloc(sizeof(sion_mpi_options));
164  if (options) {
165  *options = SION_MPI_OPTIONS_INIT;
166  }
167  return options;
168 }
169 
171 {
172  free(options);
173 }
174 
175 void sion_mpi_options_set_chunksize(sion_mpi_options *options, int64_t chunksize)
176 {
177  sion_generic_options_set_chunksize(&options->generic_options, chunksize);
178 }
179 
180 void sion_mpi_options_set_fsblksize(sion_mpi_options *options, int32_t fsblksize)
181 {
182  sion_generic_options_set_fsblksize(&options->generic_options, fsblksize);
183 }
184 
185 void sion_mpi_options_set_multifile_number(sion_mpi_options *options, int multifile_number)
186 {
187  options->multifile_mode = _SION_MPI_MULTIFILE_NUMBER;
188  options->multifile.number = multifile_number;
189 }
190 
191 void sion_mpi_options_set_multifile_communicator(sion_mpi_options *options, MPI_Comm multifile_communicator)
192 {
193  options->multifile_mode = _SION_MPI_MULTIFILE_COMMUNICATOR;
194  options->multifile.communicator = multifile_communicator;
195 }
196 
198 {
199  sion_generic_options_set_keyval_mode(&options->generic_options, keyval_mode);
200 }
201 
/*
 * NOTE(review): the signature line (original line 202) and the body line
 * (original line 204) of this function were lost when this listing was
 * extracted. According to the Doxygen summary of this file the definition is
 *     void sion_mpi_options_set_buddy(sion_mpi_options *options)
 *     "Enable buddy checkpointing mechanism."
 * Like the neighbouring setters it presumably forwards to the embedded
 * generic options. Restore both lines from the original sion_mpi_gen.c.
 */
203 {
205 }
206 
207 void sion_mpi_options_set_buddylevel(sion_mpi_options *options, int32_t buddylevel)
208 {
209  sion_generic_options_set_buddylevel(&options->generic_options, buddylevel);
210 }
211 
/*
 * NOTE(review): the signature line (original line 212) and the body line
 * (original line 214) of this function were lost when this listing was
 * extracted. According to the Doxygen summary of this file the definition is
 *     void sion_mpi_options_set_collective(sion_mpi_options *options)
 *     "Enable collective I/O."
 * Like the neighbouring setters it presumably forwards to the embedded
 * generic options. Restore both lines from the original sion_mpi_gen.c.
 */
213 {
215 }
216 
218 {
219  sion_generic_options_set_collective_size(&options->generic_options, size);
220 }
221 
223 {
224  sion_generic_options_set_collective_merge(&options->generic_options);
225 }
226 
228 {
229  sion_generic_options_set_lowlevel_api(&options->generic_options, lowlevel_api);
230 }
231 
233 {
234  sion_generic_options_set_endianness(&options->generic_options, endianness);
235 }
236 
237 int sion_paropen_mpi(const char *filename, sion_open_mode mode, MPI_Comm communicator, const sion_mpi_options *options_)
238 {
239  sion_mpi_options options = (options_) ? *options_ : SION_MPI_OPTIONS_INIT;
240 
241  int gsize, grank;
242  MPI_Comm_size(communicator, &gsize);
243  MPI_Comm_rank(communicator, &grank);
244 
245  DPRINTFP((1, "sion_paropen_mpi", grank, "enter parallel open of file %s\n", filename));
246 
247  /* register callbacks for generic interface */
248  if (_sion_mpi_api_aid < 0) {
249  _sion_mpi_api_aid = _sion_register_callbacks_mpi();
250  }
251 
252  MPI_Comm lcomm;
253  int nfiles, filenumber, lrank, lsize;
254  if (mode == SION_OPEN_WRITE) {
255  switch (options.multifile_mode) {
256  int rc;
257  case _SION_MPI_MULTIFILE_COMMUNICATOR:
258  lcomm = options.multifile.communicator;
259  rc = _sion_get_info_from_splitted_comm_mpi(communicator, lcomm, &nfiles, &filenumber, &lrank, &lsize);
260  if (rc != SION_SUCCESS) {
261  return _sion_errorprint_mpi(
262  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
263  }
264  DPRINTFP((1, "sion_paropen_mpi", grank, "%d local communicators found\n", nfiles));
265  break;
266  case _SION_MPI_MULTIFILE_NUMBER: // FALLTHROUGH
267  default:
268  nfiles = options.multifile.number;
269  rc = _sion_gen_info_from_gcomm_mpi(nfiles, communicator, &filenumber, &lrank, &lsize);
270  if (rc != SION_SUCCESS) {
271  return _sion_errorprint_mpi(
272  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_gen_info_from_gcomm_mpi");
273  }
274  DPRINTFP((1, "sion_paropen_mpi", grank, "Global communicator divided in %d local communicators\n", nfiles));
275  }
276  } else if (mode == SION_OPEN_READ) {
277  if (options.generic_options.buddylevel > 0) {
278  // lcomm must be given for buddy checkpointing
279  // FIXME
280  assert(options.multifile_mode == _SION_MPI_MULTIFILE_COMMUNICATOR);
281  lcomm = options.multifile.communicator;
282  int rc = _sion_get_info_from_splitted_comm_mpi(communicator, lcomm, &nfiles, &filenumber, &lrank, &lsize);
283  if (rc != SION_SUCCESS) {
284  return _sion_errorprint_mpi(
285  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: error in _sion_get_info_from_splitted_comm_mpi");
286  }
287  DPRINTFP((1, "sion_paropen_mpi", grank, "%d local communicators found\n", nfiles));
288  } else {
289  lrank = lsize = -1; // will be set by sion_generic_paropen
290  }
291  } else {
292  return _sion_errorprint_mpi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: unknown file mode");
293  }
294 
295  /* create generic communicator container */
296  _mpi_api_commdata *gen_gcomm = malloc(sizeof(_mpi_api_commdata));
297  if (!gen_gcomm) {
298  return _sion_errorprint(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
299  "cannot allocate mpi internal data structure of size %lu (_mpi_api_commdata), aborting ...\n",
300  (unsigned long)sizeof(_mpi_api_commdata));
301  }
302  gen_gcomm->comm = communicator;
303  gen_gcomm->commset = 1;
304  gen_gcomm->local = 0;
305  gen_gcomm->rank = grank;
306  gen_gcomm->size = gsize;
307  gen_gcomm->lcommgroup = NULL;
308 
309  /* FIXME: add debug output for flags_store
310  DPRINTFP((1, "sion_paropen_mpi", grank, "enter parallel open of %d files (current name %s) in %s mode\n", nfiles, filename, file_mode)); */
311  DPRINTFP((1, "sion_paropen_mpi", grank, "enter parallel open of %d files (current name %s)\n", nfiles, filename));
312  int sid = sion_generic_paropen(
313  _sion_mpi_api_aid, filename, mode, gen_gcomm, grank, gsize, filenumber, nfiles, lrank, lsize, &options.generic_options);
314  /* FIXME: add debug output for flags_store
315  DPRINTFP((1, "sion_paropen_mpi", grank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", nfiles, file_mode, lSize, sid)); */
316  DPRINTFP((1, "sion_paropen_mpi", grank, "leave parallel open of %d files #tasks=%d sid=%d\n", nfiles, lsize, sid));
317 
318  /* test return code from internal open */
319  if (sid == SION_ID_NOT_VALID) {
320  return _sion_errorprint_mpi(
321  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: invalid return code from internal open %d", sid);
322  }
323 
324  DPRINTFP((1, "sion_paropen_mpi", grank, "leave parallel open of file %s sid=%d\n", filename, sid));
325 
326  return sid;
327 }
328 
329 /* end of ifdef MPI */
330 #endif
sion_lowlevel_api
specifies a low-level API to use for file system access
Definition: sion_enums.h:19
sion_mpi_options * sion_mpi_options_new()
Allocates and initializes an instance of sion_mpi_options
Definition: sion_mpi_gen.c:161
void sion_mpi_options_set_collective(sion_mpi_options *options)
Enable collective I/O.
Definition: sion_mpi_gen.c:212
void sion_mpi_options_set_buddy(sion_mpi_options *options)
Enable buddy checkpointing mechanism.
Definition: sion_mpi_gen.c:202
int sion_generic_paropen(int aid, const char *fname, sion_open_mode mode, void *gcommgroup, int grank, int gsize, int filenumber, int numfiles, int lrank, int lsize, const sion_generic_options *options_)
Open a SIONlib file through a generic interface.
Definition: sion_generic.c:727
void sion_generic_options_set_collective_size(sion_generic_options *options, int32_t size)
Enable collective I/O.
Definition: sion_generic.c:704
void sion_mpi_options_set_collective_size(sion_mpi_options *options, int32_t size)
Enable collective I/O.
Definition: sion_mpi_gen.c:217
void sion_mpi_options_delete(sion_mpi_options *options)
Delete an instance of sion_mpi_options
Definition: sion_mpi_gen.c:170
void sion_mpi_options_set_chunksize(sion_mpi_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
Definition: sion_mpi_gen.c:175
int sion_paropen_mpi(const char *filename, sion_open_mode mode, MPI_Comm communicator, const sion_mpi_options *options_)
Open a SIONlib file from multiple MPI processes.
Definition: sion_mpi_gen.c:237
void sion_mpi_options_set_fsblksize(sion_mpi_options *options, int32_t fsblksize)
Set the file system block size to assume.
Definition: sion_mpi_gen.c:180
void sion_generic_options_set_collective_merge(sion_generic_options *options)
Use collective merging.
Definition: sion_generic.c:709
void sion_generic_options_set_endianness(sion_generic_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
Definition: sion_generic.c:722
int sion_parclose_mpi(int sid)
Close a SIONlib file using MPI.
Definition: sion_mpi_gen.c:34
void sion_generic_options_set_keyval_mode(sion_generic_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
Definition: sion_generic.c:684
void sion_mpi_options_set_multifile_communicator(sion_mpi_options *options, MPI_Comm multifile_communicator)
Create multiple physical files based on disjoint communicators.
Definition: sion_mpi_gen.c:191
void sion_mpi_options_set_keyval_mode(sion_mpi_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
Definition: sion_mpi_gen.c:197
void sion_generic_options_set_fsblksize(sion_generic_options *options, int32_t fsblksize)
Set the file system block size to assume.
Definition: sion_generic.c:679
void sion_generic_options_set_chunksize(sion_generic_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
Definition: sion_generic.c:674
open the file for reading only
Definition: sion_enums.h:29
void sion_mpi_options_set_endianness(sion_mpi_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
Definition: sion_mpi_gen.c:232
sion_keyval_mode
specifies whether to use SIONlib's key-value mechanism for accessing file content and if so in what m...
Definition: sion_enums.h:35
void sion_mpi_options_set_multifile_number(sion_mpi_options *options, int multifile_number)
Set the number of physical files to use.
Definition: sion_mpi_gen.c:185
int sion_generic_parclose(int sid)
Close a SIONlib file.
Definition: sion_generic.c:362
sion_endianness
declares the endianness of user data written to a file
Definition: sion_enums.h:63
sion_open_mode
specifies for what type of access to open a file
Definition: sion_enums.h:27
void sion_generic_options_set_buddylevel(sion_generic_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.
Definition: sion_generic.c:694
void sion_generic_options_set_lowlevel_api(sion_generic_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
Definition: sion_generic.c:717
open the file for writing only
Definition: sion_enums.h:31
void sion_mpi_options_set_collective_merge(sion_mpi_options *options)
Use collective merging.
Definition: sion_mpi_gen.c:222
void sion_mpi_options_set_buddylevel(sion_mpi_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.
Definition: sion_mpi_gen.c:207
struct sion_mpi_options sion_mpi_options
Holds non-essential arguments for sion_paropen_mpi().
Definition: sion_mpi.h:44
void sion_mpi_options_set_lowlevel_api(sion_mpi_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
Definition: sion_mpi_gen.c:227