SIONlib  2.0.0-rc.2
Scalable I/O library for parallel access to task-local files
sion_ompi_gen.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <mpi.h>
13 #include <stdint.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 
17 #include "sion_common.h"
18 #include "sion_const.h"
19 #include "sion_debug.h"
20 #include "sion_enums.h"
21 #include "sion_error_handler.h"
22 
23 #ifdef SION_OMPI
24 
25 #include <omp.h>
26 
27 #include "sion_generic.h"
28 #include "sion_ompi.h"
29 #include "sion_ompi_cb_gen.h"
30 #include "sion_ompi_internal_gen.h"
31 
/* Id of the callback set registered for the OMPI API; stays -1 until
 * sion_paropen_ompi() stores the result of _sion_register_callbacks_ompi(). */
32 int _sion_ompi_api_aid = -1;
/* OpenMP lock used by the lock/unlock callbacks below; initialised by the
 * master thread in sion_paropen_ompi(). */
33 static omp_lock_t _sion_ompi_lock_data;
34 
/* Untyped pointer through which the master thread hands its freshly allocated
 * __ompi_thread_sync to the other threads in sion_paropen_ompi(). */
35 static void *__ompi_thread_sync_struct;
36 
37 int _sion_ompi_user_lock(void *data)
38 {
39  int rc = SION_SUCCESS;
40  omp_set_lock(&_sion_ompi_lock_data);
41  return rc;
42 }
43 int _sion_ompi_user_unlock(void *data)
44 {
45  int rc = SION_SUCCESS;
46  omp_unset_lock(&_sion_ompi_lock_data);
47  return rc;
48 }
49 
50 int sion_parclose_ompi(int sid)
51 {
52  int rc = 0;
53 
54  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "enter parallel close of sid %d\n", sid));
55 
56  rc = sion_generic_parclose(sid);
57 
58  DPRINTFP((1, "sion_parclose_ompi", _SION_DEFAULT_RANK, "leave parallel close of sid %d rc=%d\n", sid, rc));
59 
60  return rc;
61 }
62 
64 {
65  sion_ompi_options *options = malloc(sizeof(sion_ompi_options));
66  if (options) {
67  *options = SION_OMPI_OPTIONS_INIT;
68  }
69  return options;
70 }
71 
73 {
74  free(options);
75 }
76 
77 void sion_ompi_options_set_chunksize(sion_ompi_options *options, int64_t chunksize)
78 {
79  sion_generic_options_set_chunksize(&options->generic_options, chunksize);
80 }
81 
82 void sion_ompi_options_set_fsblksize(sion_ompi_options *options, int32_t fsblksize)
83 {
84  sion_generic_options_set_fsblksize(&options->generic_options, fsblksize);
85 }
86 
87 void sion_ompi_options_set_multifile_number(sion_ompi_options *options, int multifile_number)
88 {
89  options->multifile_mode = _SION_OMPI_MULTIFILE_NUMBER;
90  options->multifile.number = multifile_number;
91 }
92 
93 void sion_ompi_options_set_multifile_communicator(sion_ompi_options *options, MPI_Comm multifile_communicator)
94 {
95  options->multifile_mode = _SION_OMPI_MULTIFILE_COMMUNICATOR;
96  options->multifile.communicator = multifile_communicator;
97 }
98 
100 {
101  sion_generic_options_set_keyval_mode(&options->generic_options, keyval_mode);
102 }
103 
105 {
107 }
108 
109 void sion_ompi_options_set_buddylevel(sion_ompi_options *options, int32_t buddylevel)
110 {
111  sion_generic_options_set_buddylevel(&options->generic_options, buddylevel);
112 }
113 
115 {
117 }
118 
120 {
121  sion_generic_options_set_collective_size(&options->generic_options, size);
122 }
123 
125 {
126  sion_generic_options_set_collective_merge(&options->generic_options);
127 }
128 
130 {
131  sion_generic_options_set_lowlevel_api(&options->generic_options, lowlevel_api);
132 }
133 
135 {
136  sion_generic_options_set_endianness(&options->generic_options, endianness);
137 }
138 
139 int sion_paropen_ompi(const char *fname, sion_open_mode mode, MPI_Comm gcomm, const sion_ompi_options *options_)
140 {
141  sion_ompi_options options = (options_) ? *options_ : SION_OMPI_OPTIONS_INIT;
142 
143  int thread_num = omp_get_thread_num();
144  int num_threads = omp_get_num_threads();
145 
146  int grank, gsize;
147  __ompi_thread_sync *thread_sync;
148 #pragma omp master
149  {
150  _sion_debug_set_query_thread_num_function(omp_get_thread_num);
151  _sion_error_set_query_thread_num_function(omp_get_thread_num);
152  omp_init_lock(&_sion_ompi_lock_data);
153  sion_lock_register_lock_callbacks(_sion_ompi_user_lock, _sion_ompi_user_unlock, &_sion_ompi_lock_data);
154 
155  MPI_Comm_size(gcomm, &gsize);
156  MPI_Comm_rank(gcomm, &grank);
157 
158  thread_sync = malloc(sizeof(__ompi_thread_sync));
159  if (!thread_sync) {
160  _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_ABORT,
161  "sion_paropen_ompi: cannot allocate struct of size %lu (__ompi_thread_sync), aborting...", sizeof(__ompi_thread_sync));
162  }
163 
164  thread_sync->grank_master_mpi = grank;
165  thread_sync->gsize_mpi = gsize;
166  thread_sync->grank_master_ompi = _sion_map_rank_mpi_to_ompi(grank, num_threads, thread_num);
167  thread_sync->gsize_ompi = _sion_get_size_ompi(gsize, num_threads);
168  thread_sync->num_threads = num_threads;
169  __ompi_thread_sync_struct = thread_sync;
170  }
171  /* sync to ensure that info in thread_sync is accessible */
172 #pragma omp barrier
173 
174  /* this is actually not necessary, but it makes for cleaner code by preventing us from doing lots of typecasts */
175  thread_sync = __ompi_thread_sync_struct;
176  DPRINTFP(
177  (1, __func__, thread_sync->grank_master_ompi + thread_num, "thread %d enters parallel open of file %s\n", thread_num, fname));
178 
179 /* create generic API */
180 #pragma omp master
181  {
182  /* register callbacks for generic interface */
183  if (_sion_ompi_api_aid < 0) {
184  _sion_ompi_api_aid = _sion_register_callbacks_ompi();
185  }
186  }
187 
188  /* create global generic communicator container on all threads */
189  _ompi_api_commdata *gen_gcomm = malloc(sizeof(_ompi_api_commdata));
190  if (gen_gcomm != NULL) {
191  gen_gcomm->commset = 0;
192  gen_gcomm->local = 0;
193  gen_gcomm->rank = thread_sync->grank_master_ompi + thread_num;
194  gen_gcomm->size = thread_sync->gsize_ompi;
195  gen_gcomm->num_threads = thread_sync->num_threads;
196  gen_gcomm->thread_num = thread_num;
197  gen_gcomm->lcommgroup = NULL;
198  } else {
199  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
200  "cannot allocate ompi internal data structure of size %lu (_omp_api_commdata), aborting ...\n",
201  (unsigned long)sizeof(_ompi_api_commdata));
202  }
203 
204 /* store MPI communicator in global generic communicator container on master thread */
205 #pragma omp master
206  {
207  gen_gcomm->comm = gcomm;
208  }
209 
210  /* sync to ensure that aid is accessible */
211 #pragma omp barrier
212 
213  _ompi_api_commdata *gen_lcomm = NULL;
214  int filenumber, nfiles, lrank, lsize;
215  if (mode == SION_OPEN_WRITE) {
216  /* create generic local communicator container on each thread */
217  if (options.multifile_mode == _SION_OMPI_MULTIFILE_COMMUNICATOR) {
218  gen_lcomm = malloc(sizeof(_ompi_api_commdata));
219  if (!gen_lcomm) {
220  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN,
221  "cannot allocate ompi internal data structure of size %lu (_ompi_api_commdata), aborting ...\n",
222  (unsigned long)sizeof(_ompi_api_commdata));
223  }
224  gen_lcomm->commset = 1;
225  gen_lcomm->commcreated = 0;
226  gen_lcomm->local = 1;
227  gen_lcomm->num_threads = gen_gcomm->num_threads;
228  gen_lcomm->thread_num = thread_num;
229  gen_gcomm->lcommgroup = gen_lcomm; /* store pointer in global comm group */
230  }
231 
232 #pragma omp master
233  {
234  if (options.multifile_mode == _SION_OMPI_MULTIFILE_COMMUNICATOR) {
235  int rc =
236  _sion_get_info_from_splitted_comm_ompi(gcomm, options.multifile.communicator, &nfiles, &filenumber, &lrank, &lsize);
237  if (rc != SION_SUCCESS) {
238  _sion_errorprint_ompi(
239  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: error in _sion_get_info_from_splitted_comm_ompi");
240  }
241  gen_lcomm->comm = options.multifile.communicator;
242  DPRINTFP((1, __func__, grank, "%d local communicators found\n", nfiles));
243  } else {
244  /* number of files is given */
245  int rc = _sion_gen_info_from_gcomm_ompi(options.multifile.number, gcomm, &filenumber, &lrank, &lsize);
246  if (rc != SION_SUCCESS) {
247  _sion_errorprint_ompi(
248  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: error in _sion_gen_info_from_gcomm_ompi");
249  }
250  nfiles = options.multifile.number;
251  DPRINTFP((1, __func__, grank, "Global communicator divided in %d local communicators\n", nfiles));
252  }
253  thread_sync->filenumber = filenumber;
254  thread_sync->numFiles = nfiles;
255  thread_sync->lrank_master_mpi = lrank;
256  thread_sync->lsize_mpi = lsize;
257  thread_sync->lrank_master_ompi = _sion_map_rank_mpi_to_ompi(lrank, num_threads, thread_num);
258  thread_sync->lsize_ompi = _sion_get_size_ompi(lsize, num_threads);
259  }
260 #pragma omp barrier
261 
262  /* set up parameters of call to generic open (OMPI values) */
263  grank = thread_sync->grank_master_ompi + thread_num;
264  gsize = thread_sync->gsize_ompi;
265  lrank = thread_sync->lrank_master_ompi + thread_num;
266  lsize = thread_sync->lsize_ompi;
267  filenumber = thread_sync->filenumber;
268  nfiles = thread_sync->numFiles;
269 
270  if (gen_lcomm) {
271  gen_lcomm->rank = lrank;
272  gen_lcomm->size = lsize;
273  }
274 
275  } else if (mode == SION_OPEN_READ) {
276  /* file mode READ */
277  /* set up parameters of call to generic open (OMPI values) */
278  grank = thread_sync->grank_master_ompi + thread_num;
279  gsize = thread_sync->gsize_ompi;
280  lrank = -1; /* which determined after opening file by sion_generic_paropen */
281  lsize = -1; /* " */
282  filenumber = -1; /* " */
283  nfiles = -1; /* " */
284 
285  } else {
286  return _sion_errorprint_ompi(SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_ompi: unknown file mode");
287  }
288  // FIXME
289  // DPRINTFP((1, DFUNCTION, grank, "enter parallel open of %d files (current name %s) in %s mode\n", nfiles, fname, file_mode));
290  DPRINTFP((1, __func__, grank, "enter parallel open of %d files (current name %s)\n", nfiles, fname));
291  DPRINTFP((2, __func__, grank, "enter parallel parameters: grank=%d gsize=%d fnum=%d nfiles=%d lrank=%d lsize=%d\n", grank,
292  gsize, filenumber, nfiles, lrank, lsize));
293  int sid = sion_generic_paropen(
294  _sion_ompi_api_aid, fname, mode, gen_gcomm, grank, gsize, filenumber, nfiles, lrank, lsize, &options.generic_options);
295  // DPRINTFP(
296  // (1, __func__, grank, "leave parallel open of %d files in %s mode #tasks=%d sid=%d\n", nfiles, file_mode, lsize, sid));
297  DPRINTFP((1, __func__, grank, "leave parallel open of %d files #tasks=%d sid=%d\n", nfiles, lsize, sid));
298 
299  /* test return code from internal open */
300  if (sid == SION_ID_NOT_VALID) {
301  return _sion_errorprint_ompi(
302  SION_ID_NOT_VALID, _SION_ERROR_RETURN, "sion_paropen_mpi: invalid return code from internal open %d", sid);
303  }
304  DPRINTFP((1, __func__, grank, "leave parallel open of file %s sid=%d globalrank=%d\n", fname, sid, grank));
305  return sid;
306 }
307 
308 /* end of ifdef OMPI */
309 #endif
sion_lowlevel_api
specifies a low-level API to use for file system access
Definition: sion_enums.h:19
int sion_generic_paropen(int aid, const char *fname, sion_open_mode mode, void *gcommgroup, int grank, int gsize, int filenumber, int numfiles, int lrank, int lsize, const sion_generic_options *options_)
Open a SIONlib file through a generic interface.
Definition: sion_generic.c:727
void sion_generic_options_set_collective_size(sion_generic_options *options, int32_t size)
Enable collective I/O.
Definition: sion_generic.c:704
void sion_ompi_options_set_endianness(sion_ompi_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
struct sion_ompi_options sion_ompi_options
Holds non-essential arguments for sion_paropen_ompi().
Definition: sion_ompi.h:43
void sion_ompi_options_set_collective(sion_ompi_options *options)
Enable collective I/O.
void sion_ompi_options_set_keyval_mode(sion_ompi_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
Definition: sion_ompi_gen.c:99
int sion_lock_register_lock_callbacks(int lock(void *), int unlock(void *), void *lock_data)
Function which registers callback functions for lock and unlock internal access to shared data structu...
Definition: sion_common.c:743
void sion_ompi_options_set_fsblksize(sion_ompi_options *options, int32_t fsblksize)
Set the file system block size to assume.
Definition: sion_ompi_gen.c:82
void sion_generic_options_set_collective_merge(sion_generic_options *options)
Use collective merging.
Definition: sion_generic.c:709
void sion_ompi_options_set_chunksize(sion_ompi_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
Definition: sion_ompi_gen.c:77
void sion_generic_options_set_endianness(sion_generic_options *options, sion_endianness endianness)
Set the endianness for the contents of a container.
Definition: sion_generic.c:722
void sion_ompi_options_set_lowlevel_api(sion_ompi_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
void sion_generic_options_set_keyval_mode(sion_generic_options *options, sion_keyval_mode keyval_mode)
Set the key-value mode to use for a container.
Definition: sion_generic.c:684
sion_ompi_options * sion_ompi_options_new()
Allocates and initializes an instance of sion_ompi_options
Definition: sion_ompi_gen.c:63
void sion_generic_options_set_fsblksize(sion_generic_options *options, int32_t fsblksize)
Set the file system block size to assume.
Definition: sion_generic.c:679
void sion_generic_options_set_chunksize(sion_generic_options *options, int64_t chunksize)
Set the chunk size of a logical file in the container.
Definition: sion_generic.c:674
void sion_ompi_options_set_multifile_number(sion_ompi_options *options, int multifile_number)
Set the number of physical files to use.
Definition: sion_ompi_gen.c:87
void sion_ompi_options_set_multifile_communicator(sion_ompi_options *options, MPI_Comm multifile_communicator)
Create multiple physical files based on disjunct communicators.
Definition: sion_ompi_gen.c:93
void sion_ompi_options_set_collective_merge(sion_ompi_options *options)
Use collective merging.
void sion_ompi_options_set_buddy(sion_ompi_options *options)
Enable buddy checkpointing mechanism.
int sion_parclose_ompi(int sid)
closes a SIONlib file previously opened in OpenMP/MPI mode
Definition: sion_ompi_gen.c:50
open the file for reading only
Definition: sion_enums.h:29
sion_keyval_mode
specifies whether to use SIONlib's key-value mechanism for accessing file content and if so in what m...
Definition: sion_enums.h:35
int sion_generic_parclose(int sid)
Close a SIONlib file.
Definition: sion_generic.c:362
void sion_ompi_options_delete(sion_ompi_options *options)
Delete an instance of sion_ompi_options
Definition: sion_ompi_gen.c:72
sion_endianness
declares the endianness of user data written to a file
Definition: sion_enums.h:63
sion_open_mode
specifies for what type of access to open a file
Definition: sion_enums.h:27
int sion_paropen_ompi(const char *fname, sion_open_mode mode, MPI_Comm gcomm, const sion_ompi_options *options_)
Open a SIONlib file from multiple OpenMP threads on multiple MPI processes in parallel.
void sion_generic_options_set_buddylevel(sion_generic_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.
Definition: sion_generic.c:694
void sion_generic_options_set_lowlevel_api(sion_generic_options *options, sion_lowlevel_api lowlevel_api)
Set the low-level API to use for opening a container.
Definition: sion_generic.c:717
void sion_ompi_options_set_collective_size(sion_ompi_options *options, int32_t size)
Enable collective I/O.
open the file for writing only
Definition: sion_enums.h:31
void sion_ompi_options_set_buddylevel(sion_ompi_options *options, int32_t buddylevel)
Enable buddy checkpointing mechanism.