SIONlib  1.7.2
Scalable I/O library for parallel access to task-local files
sion_generic_collective.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2018 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include <sys/time.h>
23 
24 #include <sys/types.h>
25 #include <fcntl.h>
26 
27 #include <unistd.h>
28 
29 #include "sion.h"
30 #include "sion_debug.h"
31 #include "sion_error_handler.h"
32 #include "sion_internal.h"
33 #include "sion_fd.h"
34 #include "sion_filedesc.h"
35 #include "sion_printts.h"
36 #include "sion_generic_apidesc.h"
37 #include "sion_generic_buddy.h"
39 
40 /* collective I/O */
41 #define DFUNCTION "sion_coll_fwrite"
42 size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid) {
43  _sion_filedesc *sion_filedesc;
44  _sion_generic_gendata *sion_gendata;
45  _sion_generic_apidesc *sion_apidesc;
46  sion_int64 bwrote=0, spec[2], ownnewposition, items_wrote;
47  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS,rc_buddy=SION_STD_SUCCESS;
48  int collector, firstsender, lastsender;
49 
50  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
51  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
52  }
53  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
54  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
55 
56  sion_gendata=sion_filedesc->dataptr;
57  sion_apidesc=sion_gendata->apidesc;
58 
59  /* no collective mode */
60  if(!sion_filedesc->usecoll) {
61  return(sion_fwrite(data,size,nitems,sid));
62  }
63 
64  /* branch to merge mode if enabled */
65  if(sion_filedesc->collmergemode) {
66  return(_sion_coll_fwrite_merge(data,size,nitems,sid));
67  }
68 
69 
70  /* needed for avoiding subsequent non-collective calls */
71  sion_filedesc->collcmdused=1;
72 
73  /* check collsize */
74  if (sion_filedesc->collsize<=0) {
75  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
76  }
77 
78  /* parameter of callback function */
79  collector=(int) sion_filedesc->collector;
80  firstsender=collector+1;
81  lastsender=collector+sion_filedesc->collsize-1;
82  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
83  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n",
84  collector, sion_filedesc->collsize, firstsender, lastsender));
85 
86  /* ensure free space for this block */
87  if(sion_ensure_free_space(sid,size*nitems) != SION_SUCCESS) {
88  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
89  spec[0]=spec[1]=-1; /* signaling error */
90  } else {
91  spec[0]=sion_filedesc->currentpos;
92  spec[1]=size*nitems;
93  }
94 
95  /* write own part */
96  if(sion_filedesc->rank == sion_filedesc->collector) {
97  rc_own=_sion_generic_collective_process_write(data,spec,sid);
98  }
99  ownnewposition=sion_filedesc->currentpos;
100 
101  /* collect and write parts of sender tasks via callback function */
102  if(!sion_apidesc->gather_execute_cb ) {
103  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
104  }
105  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
106  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
107  _sion_generic_collective_process_write);
108 
109  /* set own position to end of own block written in this call */
110  if(sion_filedesc->rank == sion_filedesc->collector) {
111  _sion_file_flush(sion_filedesc->fileptr);
112  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
113  }
114 
115  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
116  if(sion_filedesc->rank != sion_filedesc->collector) {
117  sion_filedesc->currentpos+=size*nitems;
118  if(sion_filedesc->fileptr_exported) {
119  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
120  }
121  }
122 
123  /* switch to buddy checkpointing, if enabled */
124  if(sion_filedesc->usebuddy ) {
125  rc_buddy=_sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
126  }
127 
128  /* return code */
129  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS)&& (rc_buddy == SION_STD_SUCCESS) ) {
130  bwrote=size*nitems;
131  } else {
132  bwrote=0;
133  }
134 
135  items_wrote = size ? bwrote / size : 0;
136 
137  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
138  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_wrote));
139 
140  return items_wrote;
141 }
142 #undef DFUNCTION
143 
144 #define DFUNCTION "sion_coll_fread"
145 size_t sion_coll_fread( void *data, size_t size, size_t nitems, int sid) {
146  _sion_filedesc *sion_filedesc;
147  _sion_generic_gendata *sion_gendata;
148  _sion_generic_apidesc *sion_apidesc;
149  sion_int64 bread=-1, spec[2], ownnewposition, items_read;
150  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
151  int collector, firstsender, lastsender;
152 
153  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
154  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: invalid sion_filedesc %d", sid));
155  }
156  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
157  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
158 
159  sion_gendata=sion_filedesc->dataptr;
160  sion_apidesc=sion_gendata->apidesc;
161 
162  /* no collective mode */
163  if(!sion_filedesc->usecoll) {
164  return(sion_fread(data,size,nitems,sid));
165  }
166 
167  /* switch to buddy checkpointing, if enabled */
168  if(sion_filedesc->usebuddy ) {
169  return( _sion_coll_fread_buddy(data, size, nitems, sid));
170  }
171 
172  /* needed for avoiding subsequent non-collective calls */
173  sion_filedesc->collcmdused=1;
174 
175  /* check collsize */
176  if (sion_filedesc->collsize<=0) {
177  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: collsize=%d <= 0, returning ...\n",
178  (int) sion_filedesc->collsize));
179  }
180 
181  /* parameter of callback function */
182  collector=(int) sion_filedesc->collector;
183  firstsender=collector+1;
184  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
185  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
186 
187  /* ensure to be at the beginning of the right block */
188  if(size*nitems>0) {
189  if(sion_feof(sid)) {
190  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"early eof found for this block, returning %d ...\n", sid);
191  spec[0]=spec[1]=-1; /* signaling that no data is requested */
192  } else {
193  /* specification of location in file */
194  spec[0]=sion_filedesc->currentpos;
195  spec[1]=size*nitems;
196  }
197  } else {
198  /* signaling that no data is requested */
199  spec[0]=spec[1]=-1;
200  }
201 
202  /* read own part */
203  if(sion_filedesc->rank == sion_filedesc->collector) {
204  rc_own=_sion_generic_collective_process_read(data,spec,sid);
205  }
206  ownnewposition=sion_filedesc->currentpos;
207 
208 
209  /* read parts and scatter these to sender tasks via callback function */
210  if(!sion_apidesc->execute_scatter_cb ) {
211  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
212  "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
213  }
214  rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->fsblksize,
215  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
216  _sion_generic_collective_process_read);
217 
218  /* set own position to end of own block read in this call */
219  if(sion_filedesc->rank == sion_filedesc->collector) {
220  _sion_file_flush(sion_filedesc->fileptr);
221  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
222  }
223 
224  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
225  if(sion_filedesc->rank != sion_filedesc->collector) {
226  sion_filedesc->currentpos+=size*nitems;
227  if(sion_filedesc->fileptr_exported) {
228  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
229  }
230  }
231 
232  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
233  bread=size*nitems;
234  } else {
235  bread=0;
236  }
237 
238  items_read = size ? bread / size : 0;
239 
240  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
241  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_read));
242 
243  return items_read;
244 
245 }
246 #undef DFUNCTION
247 
248 
249 #define DFUNCTION "_sion_generic_collective_process_write"
250 int _sion_generic_collective_process_write( const void *data, sion_int64 *spec, int sid ) {
251  _sion_filedesc *sion_filedesc;
252  int rc=SION_STD_SUCCESS;
253  sion_int64 bwrote=0, destpos, bytestowrite;
254  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
255  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
256  }
257  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
258 
259  /* move file pointer */
260  destpos=spec[0];
261  bytestowrite=spec[1];
262  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "currentpos=%d destpos=%d\n", (int) sion_filedesc->currentpos,(int) destpos));
263  if(sion_filedesc->currentpos!=destpos) {
264  _sion_file_flush(sion_filedesc->fileptr);
265  _sion_file_set_position(sion_filedesc->fileptr,destpos);sion_filedesc->currentpos=destpos;
266  }
267 
268  /* get and write data */
269  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
270  (long long) bytestowrite, (long long) destpos));
271  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
272  if(bwrote != bytestowrite) {
273  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
274  }
275 
276  /* update internal data */
277  sion_filedesc->currentpos+=bytestowrite;
278  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
279  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
280 
281 
282  return(rc);
283 
284 }
285 #undef DFUNCTION
286 
287 #define DFUNCTION "_sion_generic_collective_process_read"
288 int _sion_generic_collective_process_read( void *data, sion_int64 *spec, int sid ) {
289  _sion_filedesc *sion_filedesc;
290  int rc=SION_STD_SUCCESS;
291  sion_int64 bread=0, destpos, bytestoread;
292 
293  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
294  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
295  }
296  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
297 
298  /* move file pointer */
299  destpos=spec[0];
300  bytestoread=spec[1];
301  if(sion_filedesc->currentpos!=destpos) {
302  if(sion_filedesc->fileptr!=NULL) {
303  _sion_file_flush(sion_filedesc->fileptr);
304  _sion_file_set_position(sion_filedesc->fileptr,destpos);
305  }
306  sion_filedesc->currentpos=destpos;
307  }
308 
309  /* get and read data */
310  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to read data of size %lld at position %lld\n",
311  (long long) bytestoread, (long long) destpos));
312  bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
313  if(bread != bytestoread) {
314  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: problems reading data ...\n"));
315  }
316 
317  {
318  ONLY_DEBUG(char *p=data;)
319  DPRINTFP((128, DFUNCTION, _SION_DEFAULT_RANK, "data[0]=%c data[%d]=%c\n",(char) p[0], (int) bytestoread, (char) p[bytestoread-1]));
320  }
321 
322  /* update internal data */
323  sion_filedesc->currentpos+=bytestoread;
324  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector read data block (bread=%d) of size %ld new pos %lld, %lld\n",
325  bread, (long) bytestoread, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
326 
327 
328  return(rc);
329 
330 }
331 #undef DFUNCTION
332 
333 
343 #define DFUNCTION "_sion_coll_check_env"
344 int _sion_coll_check_env(_sion_filedesc *sion_filedesc) {
345  const char *cs;
346  const char *cn;
347  const char *cd;
348  int rc = SION_SUCCESS, numcoll;
349 
350 
351  cd = _sion_getenv("SION_COLLDEBUG");
352  if(cd) {
353  sion_filedesc->colldebug=atoi(cd);
354  }
355 
356  cs = _sion_getenv("SION_COLLSIZE");
357  cn = _sion_getenv("SION_COLLNUM");
358  if(cs) {
359  sion_filedesc->collsize=atoi(cs);
360  if(sion_filedesc->collsize>sion_filedesc->ntasks) sion_filedesc->collsize=sion_filedesc->ntasks;
361  if(sion_filedesc->colldebug>=1) {
362  fprintf(stderr, "collective statistics: SION_COLLSIZE=%11d\n",sion_filedesc->collsize);
363  }
364  } else if(cn) {
365  numcoll=atoi(cn);
366  if(numcoll>0) {
367  if(numcoll>sion_filedesc->ntasks) numcoll=sion_filedesc->ntasks;
368  sion_filedesc->collsize=sion_filedesc->ntasks/numcoll;
369  if(sion_filedesc->ntasks%numcoll>0) sion_filedesc->collsize++;
370 
371  if(sion_filedesc->colldebug>=1) {
372  fprintf(stderr, "collective statistics: SION_COLLNUM=%11d\n",numcoll);
373  fprintf(stderr, "collective statistics: collsize=%11d\n",sion_filedesc->collsize);
374  }
375  }
376  }
377 
378  /* enable collective operation? */
379  if((cs) || (cn)) {
380  if(sion_filedesc->collsize>0) sion_filedesc->usecoll=1;
381  if(sion_filedesc->collsize<0) sion_filedesc->usecoll=1;
382  if(sion_filedesc->collsize==0) sion_filedesc->usecoll=0;
383  }
384 
385 
386  DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, "usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->usecoll, sion_filedesc->collsize,sion_filedesc->ntasks,sion_filedesc->nfiles,sion_filedesc->colldebug));
387 
388  return (rc);
389 }
390 #undef DFUNCTION
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:764
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:141
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:241
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:219
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1008
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:166
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
sion_int32 fileptr_exported
char * _sion_getenv(const char *name)
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:466
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
#define DFUNCTION
checks if environment variables are set to use collective I/O SION_COLLSIZE = -1 -> number of collect...
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:585
Sion Time Stamp Header.
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:263
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82