SIONlib  2.0.0-rc.2
Scalable I/O library for parallel access to task-local files
sion_generic_collective.c
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
10 #define _XOPEN_SOURCE 700
11 
12 #include <assert.h>
13 #include <inttypes.h>
14 #include <stdint.h>
15 #include <stdio.h>
16 #include <stdlib.h>
17 
18 #include "sion_common.h"
19 #include "sion_const.h"
20 #include "sion_debug.h"
21 #include "sion_error_handler.h"
22 #include "sion_fd.h"
23 #include "sion_file.h"
24 #include "sion_filedesc.h"
25 #include "sion_generic_apidesc.h"
26 #include "sion_generic_buddy.h"
27 #include "sion_generic_collective.h"
28 #include "sion_internal.h"
29 #include "sion_internal_positioning.h"
30 
31 /* collective I/O */
32 #define DFUNCTION "sion_coll_write"
33 size_t sion_coll_write(const void *data, size_t size, size_t nitems, int sid)
34 {
35  _sion_filedesc *sion_filedesc;
36  _sion_generic_gendata *sion_gendata;
37  _sion_generic_apidesc *sion_apidesc;
38 
39  if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
40  _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, "sion_coll_write: invalid sion_filedesc %d", sid);
41  assert(0);
42  }
43  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
44  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
45 
46  sion_gendata = sion_filedesc->dataptr;
47  sion_apidesc = sion_gendata->apidesc;
48 
49  /* no collective mode */
50  if (!sion_filedesc->usecoll) {
51  return sion_write(data, size, nitems, sid);
52  }
53 
54  /* branch to merge mode if enabled */
55  if (sion_filedesc->collmergemode) {
56  return _sion_coll_fwrite_merge(data, size, nitems, sid);
57  }
58 
59  /* needed for avoiding subsequent non-collective calls */
60  sion_filedesc->collcmdused = 1;
61 
62  /* check collsize */
63  if (sion_filedesc->collsize <= 0) {
64  _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, "sion_coll_write: collsize=%d <= 0, returning ...\n",
65  (int)sion_filedesc->collsize);
66  assert(0);
67  }
68 
69  /* check availability of gather_execute callback */
70  if (!sion_apidesc->gather_execute_cb) {
71  _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
72  "sion_coll_write: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
73  assert(0);
74  }
75 
76  /* parameter of callback function */
77  int collector = (int)sion_filedesc->collector;
78  int firstsender = collector + 1;
79  int lastsender = collector + sion_filedesc->collsize - 1;
80  if (lastsender > sion_filedesc->ntasks) {
81  lastsender = sion_filedesc->ntasks - 1;
82  }
83  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
84  sion_filedesc->collsize, firstsender, lastsender));
85 
86  int64_t bytes_written;
87  if (sion_filedesc->rank == sion_filedesc->collector) {
88  /* write own part */
89  bytes_written = _sion_write_multi_chunk(sion_filedesc, data, nitems * size);
90  int64_t ownnewposition = sion_filedesc->currentpos;
91 
92  // TODO: handle return code
93  sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
94  lastsender, sid, _sion_generic_collective_process_write, NULL, _sion_generic_collective_next_write_spec);
95 
96  /* set own position to end of own block written in this call */
97  _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
98  sion_filedesc->currentpos = ownnewposition;
99  } else {
100  _sion_partitioned_range_iterator ranges =
101  _sion_partitioned_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
102 
103  bytes_written = sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
104  firstsender, lastsender, sid, _sion_generic_collective_process_write, &ranges, _sion_generic_collective_next_write_spec);
105 
106  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
107  _sion_write_dry_run(sion_filedesc, nitems * size);
108  }
109  int64_t items_written = size ? bytes_written / size : 0;
110 
111  /* switch to buddy checkpointing, if enabled */
112  if (sion_filedesc->usebuddy) {
113  // TODO: how to include buddy result in return value
114  _sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
115  }
116 
117  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
118  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
119  items_written));
120 
121  return items_written;
122 }
123 #undef DFUNCTION
124 
/**
 * fwrite-style alias of sion_coll_write(); every argument is forwarded
 * unchanged and its result is returned as-is.
 */
size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid)
{
  const size_t nwritten = sion_coll_write(data, size, nitems, sid);
  return nwritten;
}
129 
130 #define DFUNCTION "sion_coll_read"
131 size_t sion_coll_read(void *data, size_t size, size_t nitems, int sid)
132 {
133  _sion_filedesc *sion_filedesc;
134  _sion_generic_gendata *sion_gendata;
135  _sion_generic_apidesc *sion_apidesc;
136 
137  if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
138  _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, "sion_coll_read: invalid sion_filedesc %d", sid);
139  assert(0);
140  }
141  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
142  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
143 
144  sion_gendata = sion_filedesc->dataptr;
145  sion_apidesc = sion_gendata->apidesc;
146 
147  /* no collective mode */
148  if (!sion_filedesc->usecoll) {
149  return sion_read(data, size, nitems, sid);
150  }
151 
152  /* switch to buddy checkpointing, if enabled */
153  if (sion_filedesc->usebuddy) {
154  return _sion_coll_fread_buddy(data, size, nitems, sid);
155  }
156 
157  /* needed for avoiding subsequent non-collective calls */
158  sion_filedesc->collcmdused = 1;
159 
160  /* check collsize */
161  if (sion_filedesc->collsize <= 0) {
162  _sion_errorprint(
163  SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, "sion_coll_read: collsize=%d <= 0, returning ...\n", (int)sion_filedesc->collsize);
164  assert(0);
165  }
166 
167  // check availability of execute_scatter callback
168  if (!sion_apidesc->execute_scatter_cb) {
169  _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
170  "sion_coll_read: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
171  assert(0);
172  }
173 
174  /* parameter of callback function */
175  int collector = (int)sion_filedesc->collector;
176  int firstsender = collector + 1;
177  int lastsender = sion_filedesc->rank + sion_filedesc->collsize - 1;
178  if (lastsender > sion_filedesc->ntasks) {
179  lastsender = sion_filedesc->ntasks - 1;
180  }
181  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
182  sion_filedesc->collsize, firstsender, lastsender));
183 
184  int64_t bytes_read;
185  if (sion_filedesc->rank == sion_filedesc->collector) {
186  /* read own part */
187  bytes_read = _sion_read_multi_chunk(sion_filedesc, data, nitems * size);
188  int64_t ownnewposition = sion_filedesc->currentpos;
189 
190  /* read parts and scatter these to sender tasks via callback function */
191  // TODO: handle return code
192  sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
193  lastsender, sid, _sion_generic_collective_process_read, NULL, _sion_generic_collective_next_read_spec);
194 
195  /* set own position to end of own block read in this call */
196  _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
197  sion_filedesc->currentpos = ownnewposition;
198  } else {
199  _sion_partitioned_limited_range_iterator ranges =
200  _sion_partitioned_limited_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
201 
202  /* read parts and scatter these to sender tasks via callback function */
203  bytes_read = sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
204  firstsender, lastsender, sid, _sion_generic_collective_process_read, &ranges, _sion_generic_collective_next_read_spec);
205  bytes_read = nitems * size;
206 
207  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
208  _sion_read_dry_run(sion_filedesc, nitems * size);
209  }
210  int64_t items_read = size ? bytes_read / size : 0;
211 
212  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
213  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
214  items_read));
215 
216  return items_read;
217 }
218 #undef DFUNCTION
219 
/**
 * fread-style alias of sion_coll_read(); every argument is forwarded
 * unchanged and its result is returned as-is.
 */
size_t sion_coll_fread(void *data, size_t size, size_t nitems, int sid)
{
  const size_t nread = sion_coll_read(data, size, nitems, sid);
  return nread;
}
224 
225 #define DFUNCTION "_sion_generic_collective_process_write"
226 int64_t _sion_generic_collective_process_write(const void *data, int64_t *spec, int sid)
227 {
228  _sion_filedesc *sion_filedesc;
229  if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
230  return _sion_errorprint(
231  SION_SIZE_NOT_VALID, _SION_ERROR_RETURN, "_sion_generic_collective_process_write: invalid sion_filedesc %d", sid);
232  }
233  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int)spec[0], (int)spec[1], sid));
234 
235  /* move file pointer */
236  int64_t destpos = spec[0], bytestowrite = spec[1];
237  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "currentpos=%d destpos=%d\n", (int)sion_filedesc->currentpos, (int)destpos));
238  if (sion_filedesc->currentpos != destpos) {
239  _sion_file_set_position(sion_filedesc->fileptr, destpos);
240  sion_filedesc->currentpos = destpos;
241  }
242 
243  /* write data */
244  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
245  (long long)bytestowrite, (long long)destpos));
246  int64_t bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
247  if (bwrote != bytestowrite) {
248  return _sion_errorprint(bwrote, _SION_ERROR_RETURN, "_sion_generic_collective_process_write: problems writing data ...\n");
249  }
250 
251  /* update internal data */
252  sion_filedesc->currentpos += bytestowrite;
253  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n", bwrote,
254  (long)bytestowrite, (long long)sion_filedesc->currentpos, (long long)_sion_file_get_position(sion_filedesc->fileptr)));
255 
256  return bwrote;
257 }
258 #undef DFUNCTION
259 
260 bool _sion_generic_collective_next_write_spec(void *iterator, int64_t *spec)
261 {
262  _sion_range range;
263  if (_sion_partitioned_range_iterator_next((_sion_partitioned_range_iterator *)iterator, &range)) {
264  _sion_range_into_spec(range, spec);
265  return true;
266  } else {
267  return false;
268  }
269 }
270 
271 #define DFUNCTION "_sion_generic_collective_process_read"
272 int64_t _sion_generic_collective_process_read(void *data, int64_t *spec, int sid)
273 {
274  _sion_filedesc *sion_filedesc;
275 
276  if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
277  return _sion_errorprint(
278  SION_SIZE_NOT_VALID, _SION_ERROR_RETURN, "_sion_generic_collective_process_read: invalid sion_filedesc %d", sid);
279  }
280  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%" PRId64 " spec[1]=%" PRId64 " sid=%d\n", spec[0], spec[1], sid));
281 
282  /* move file pointer */
283  int64_t destpos = spec[0], bytestoread = spec[1];
284  if (sion_filedesc->currentpos != destpos) {
285  if (sion_filedesc->fileptr != NULL) {
286  _sion_file_set_position(sion_filedesc->fileptr, destpos);
287  }
288  sion_filedesc->currentpos = destpos;
289  }
290 
291  /* get and read data */
292  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to read data of size %lld at position %lld\n",
293  (long long)bytestoread, (long long)destpos));
294  int64_t bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
295  if (bread != bytestoread) {
296  return _sion_errorprint(bread, _SION_ERROR_RETURN, "_sion_generic_collective_process_read: problems reading data ...\n");
297  }
298 
299  {
300  ONLY_DEBUG(char *p = data;)
301  DPRINTFP(
302  (128, DFUNCTION, _SION_DEFAULT_RANK, "data[0]=%c data[%d]=%c\n", (char)p[0], (int)bytestoread, (char)p[bytestoread - 1]));
303  }
304 
305  /* update internal data */
306  sion_filedesc->currentpos += bytestoread;
307  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector read data block (bread=%d) of size %ld new pos %lld, %lld\n", bread,
308  (long)bytestoread, (long long)sion_filedesc->currentpos, (long long)_sion_file_get_position(sion_filedesc->fileptr)));
309 
310  return bread;
311 }
312 #undef DFUNCTION
313 
314 bool _sion_generic_collective_next_read_spec(void *iterator, int64_t *spec)
315 {
316  _sion_range range;
317  if (_sion_partitioned_limited_range_iterator_next((_sion_partitioned_limited_range_iterator *)iterator, &range)) {
318  _sion_range_into_spec(range, spec);
319  return true;
320  } else {
321  return false;
322  }
323 }
324 
325 #define DFUNCTION "_sion_coll_check_env"
326 void _sion_coll_check_env(_sion_filedesc *sion_filedesc)
327 {
328  const char *cd = _sion_getenv("SION_COLLDEBUG");
329  if (cd) {
330  sion_filedesc->colldebug = atoi(cd);
331  }
332 
333  const char *cs = _sion_getenv("SION_COLLSIZE");
334  const char *cn = _sion_getenv("SION_COLLNUM");
335  if (cs) {
336  sion_filedesc->collsize = atoi(cs);
337  if (sion_filedesc->collsize > sion_filedesc->ntasks) {
338  sion_filedesc->collsize = sion_filedesc->ntasks;
339  }
340  if (sion_filedesc->colldebug >= 1) {
341  fprintf(stderr, "collective statistics: SION_COLLSIZE=%11d\n", sion_filedesc->collsize);
342  }
343  } else if (cn) {
344  int numcoll = atoi(cn);
345  if (numcoll > 0) {
346  if (numcoll > sion_filedesc->ntasks) {
347  numcoll = sion_filedesc->ntasks;
348  }
349  sion_filedesc->collsize = sion_filedesc->ntasks / numcoll;
350  if (sion_filedesc->ntasks % numcoll > 0) {
351  sion_filedesc->collsize++;
352  }
353 
354  if (sion_filedesc->colldebug >= 1) {
355  fprintf(stderr, "collective statistics: SION_COLLNUM=%11d\n", numcoll);
356  fprintf(stderr, "collective statistics: collsize=%11d\n", sion_filedesc->collsize);
357  }
358  }
359  }
360 
361  /* enable collective operation? */
362  if ((cs) || (cn)) {
363  if (sion_filedesc->collsize > 0) {
364  sion_filedesc->usecoll = 1;
365  }
366  if (sion_filedesc->collsize < 0) {
367  sion_filedesc->usecoll = 1;
368  }
369  if (sion_filedesc->collsize == 0) {
370  sion_filedesc->usecoll = 0;
371  }
372  }
373 
374  DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, "usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n",
375  sion_filedesc->usecoll, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles, sion_filedesc->colldebug));
376 }
377 #undef DFUNCTION
size_t sion_coll_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.
size_t sion_coll_fread(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file using collective I/O.
size_t sion_read(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file.
Definition: sion_common.c:353
size_t sion_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file.
Definition: sion_common.c:292
size_t sion_coll_read(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file using collective I/O.
size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.