SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_generic_collective.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include <sys/time.h>
21 
22 #include <sys/types.h>
23 #include <fcntl.h>
24 
25 #include <unistd.h>
26 
27 #include "sion.h"
28 #include "sion_debug.h"
29 #include "sion_error_handler.h"
30 #include "sion_internal.h"
31 #include "sion_fd.h"
32 #include "sion_filedesc.h"
33 #include "sion_printts.h"
34 #include "sion_generic_apidesc.h"
35 #include "sion_generic_buddy.h"
37 
38 /* collective I/O */
39 #define DFUNCTION "sion_coll_fwrite"
40 size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid) {
41  _sion_filedesc *sion_filedesc;
42  _sion_generic_gendata *sion_gendata;
43  _sion_generic_apidesc *sion_apidesc;
44  sion_int64 bwrote=0, spec[2], ownnewposition, items_wrote;
45  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS,rc_buddy=SION_STD_SUCCESS;
46  int collector, firstsender, lastsender;
47 
48  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
49  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
50  }
51  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
52  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
53 
54  sion_gendata=sion_filedesc->dataptr;
55  sion_apidesc=sion_gendata->apidesc;
56 
57  /* no collective mode */
58  if(!sion_filedesc->usecoll) {
59  return(sion_fwrite(data,size,nitems,sid));
60  }
61 
62  /* branch to merge mode if enabled */
63  if(sion_filedesc->collmergemode) {
64  return(_sion_coll_fwrite_merge(data,size,nitems,sid));
65  }
66 
67 
68  /* needed for avoiding subsequent non-collective calls */
69  sion_filedesc->collcmdused=1;
70 
71  /* check collsize */
72  if (sion_filedesc->collsize<=0) {
73  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
74  }
75 
76  /* parameter of callback function */
77  collector=(int) sion_filedesc->collector;
78  firstsender=collector+1;
79  lastsender=collector+sion_filedesc->collsize-1;
80  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
81  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n",
82  collector, sion_filedesc->collsize, firstsender, lastsender));
83 
84  /* ensure free space for this block */
85  if(sion_ensure_free_space(sid,size*nitems) != SION_SUCCESS) {
86  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
87  spec[0]=spec[1]=-1; /* signaling error */
88  } else {
89  spec[0]=sion_filedesc->currentpos;
90  spec[1]=size*nitems;
91  }
92 
93  /* write own part */
94  if(sion_filedesc->rank == sion_filedesc->collector) {
95  rc_own=_sion_generic_collective_process_write(data,spec,sid);
96  }
97  ownnewposition=sion_filedesc->currentpos;
98 
99  /* collect and write parts of sender tasks via callback function */
100  if(!sion_apidesc->gather_execute_cb ) {
101  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
102  }
103  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
104  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
105  _sion_generic_collective_process_write);
106 
107  /* set own position to end of own block written in this call */
108  if(sion_filedesc->rank == sion_filedesc->collector) {
109  _sion_file_flush(sion_filedesc->fileptr);
110  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
111  }
112 
113  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
114  if(sion_filedesc->rank != sion_filedesc->collector) {
115  sion_filedesc->currentpos+=size*nitems;
116  if(sion_filedesc->fileptr_exported) {
117  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
118  }
119  }
120 
121  /* switch to buddy checkpointing, if enabled */
122  if(sion_filedesc->usebuddy ) {
123  rc_buddy=_sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
124  }
125 
126  /* return code */
127  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS)&& (rc_buddy == SION_STD_SUCCESS) ) {
128  bwrote=size*nitems;
129  } else {
130  bwrote=0;
131  }
132 
133  items_wrote = size ? bwrote / size : 0;
134 
135  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
136  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_wrote));
137 
138  return items_wrote;
139 }
140 #undef DFUNCTION
141 
142 #define DFUNCTION "sion_coll_fread"
143 size_t sion_coll_fread( void *data, size_t size, size_t nitems, int sid) {
144  _sion_filedesc *sion_filedesc;
145  _sion_generic_gendata *sion_gendata;
146  _sion_generic_apidesc *sion_apidesc;
147  sion_int64 bread=-1, spec[2], ownnewposition, items_read;
148  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
149  int collector, firstsender, lastsender;
150 
151  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
152  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: invalid sion_filedesc %d", sid));
153  }
154  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
155  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
156 
157  sion_gendata=sion_filedesc->dataptr;
158  sion_apidesc=sion_gendata->apidesc;
159 
160  /* no collective mode */
161  if(!sion_filedesc->usecoll) {
162  return(sion_fread(data,size,nitems,sid));
163  }
164 
165  /* switch to buddy checkpointing, if enabled */
166  if(sion_filedesc->usebuddy ) {
167  return( _sion_coll_fread_buddy(data, size, nitems, sid));
168  }
169 
170  /* needed for avoiding subsequent non-collective calls */
171  sion_filedesc->collcmdused=1;
172 
173  /* check collsize */
174  if (sion_filedesc->collsize<=0) {
175  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: collsize=%d <= 0, returning ...\n",
176  (int) sion_filedesc->collsize));
177  }
178 
179  /* parameter of callback function */
180  collector=(int) sion_filedesc->collector;
181  firstsender=collector+1;
182  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
183  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
184 
185  /* ensure to be at the beginning of the right block */
186  if(size*nitems>0) {
187  if(sion_feof(sid)) {
188  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"early eof found for this block, returning %d ...\n", sid);
189  spec[0]=spec[1]=-1; /* signaling that no data is requested */
190  } else {
191  /* specification of location in file */
192  spec[0]=sion_filedesc->currentpos;
193  spec[1]=size*nitems;
194  }
195  } else {
196  /* signaling that no data is requested */
197  spec[0]=spec[1]=-1;
198  }
199 
200  /* read own part */
201  if(sion_filedesc->rank == sion_filedesc->collector) {
202  rc_own=_sion_generic_collective_process_read(data,spec,sid);
203  }
204  ownnewposition=sion_filedesc->currentpos;
205 
206 
207  /* read parts and scatter these to sender tasks via callback function */
208  if(!sion_apidesc->execute_scatter_cb ) {
209  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
210  "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
211  }
212  rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->fsblksize,
213  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
214  _sion_generic_collective_process_read);
215 
216  /* set own position to end of own block read in this call */
217  if(sion_filedesc->rank == sion_filedesc->collector) {
218  _sion_file_flush(sion_filedesc->fileptr);
219  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
220  }
221 
222  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
223  if(sion_filedesc->rank != sion_filedesc->collector) {
224  sion_filedesc->currentpos+=size*nitems;
225  if(sion_filedesc->fileptr_exported) {
226  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
227  }
228  }
229 
230  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
231  bread=size*nitems;
232  } else {
233  bread=0;
234  }
235 
236  items_read = size ? bread / size : 0;
237 
238  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
239  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_read));
240 
241  return items_read;
242 
243 }
244 #undef DFUNCTION
245 
246 
247 #define DFUNCTION "_sion_generic_collective_process_write"
248 int _sion_generic_collective_process_write( const void *data, sion_int64 *spec, int sid ) {
249  _sion_filedesc *sion_filedesc;
250  int rc=SION_STD_SUCCESS;
251  sion_int64 bwrote=0, destpos, bytestowrite;
252  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
253  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
254  }
255  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
256 
257  /* move file pointer */
258  destpos=spec[0];
259  bytestowrite=spec[1];
260  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "currentpos=%d destpos=%d\n", (int) sion_filedesc->currentpos,(int) destpos));
261  if(sion_filedesc->currentpos!=destpos) {
262  _sion_file_flush(sion_filedesc->fileptr);
263  _sion_file_set_position(sion_filedesc->fileptr,destpos);sion_filedesc->currentpos=destpos;
264  }
265 
266  /* get and write data */
267  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
268  (long long) bytestowrite, (long long) destpos));
269  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
270  if(bwrote != bytestowrite) {
271  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
272  }
273 
274  /* update internal data */
275  sion_filedesc->currentpos+=bytestowrite;
276  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
277  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
278 
279 
280  return(rc);
281 
282 }
283 #undef DFUNCTION
284 
285 #define DFUNCTION "_sion_generic_collective_process_read"
286 int _sion_generic_collective_process_read( void *data, sion_int64 *spec, int sid ) {
287  _sion_filedesc *sion_filedesc;
288  int rc=SION_STD_SUCCESS;
289  sion_int64 bread=0, destpos, bytestoread;
290 
291  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
292  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
293  }
294  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
295 
296  /* move file pointer */
297  destpos=spec[0];
298  bytestoread=spec[1];
299  if(sion_filedesc->currentpos!=destpos) {
300  if(sion_filedesc->fileptr!=NULL) {
301  _sion_file_flush(sion_filedesc->fileptr);
302  _sion_file_set_position(sion_filedesc->fileptr,destpos);
303  }
304  sion_filedesc->currentpos=destpos;
305  }
306 
307  /* get and read data */
308  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to read data of size %lld at position %lld\n",
309  (long long) bytestoread, (long long) destpos));
310  bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
311  if(bread != bytestoread) {
312  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: problems reading data ...\n"));
313  }
314 
315  {
316  ONLY_DEBUG(char *p=data;)
317  DPRINTFP((128, DFUNCTION, _SION_DEFAULT_RANK, "data[0]=%c data[%d]=%c\n",(char) p[0], (int) bytestoread, (char) p[bytestoread-1]));
318  }
319 
320  /* update internal data */
321  sion_filedesc->currentpos+=bytestoread;
322  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector read data block (bread=%d) of size %ld new pos %lld, %lld\n",
323  bread, (long) bytestoread, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
324 
325 
326  return(rc);
327 
328 }
329 #undef DFUNCTION
330 
331 
341 #define DFUNCTION "_sion_coll_check_env"
342 int _sion_coll_check_env(_sion_filedesc *sion_filedesc) {
343  const char *cs;
344  const char *cn;
345  const char *cd;
346  int rc = SION_SUCCESS, numcoll;
347 
348 
349  cd = _sion_getenv("SION_COLLDEBUG");
350  if(cd) {
351  sion_filedesc->colldebug=atoi(cd);
352  }
353 
354  cs = _sion_getenv("SION_COLLSIZE");
355  cn = _sion_getenv("SION_COLLNUM");
356  if(cs) {
357  sion_filedesc->collsize=atoi(cs);
358  if(sion_filedesc->collsize>sion_filedesc->ntasks) sion_filedesc->collsize=sion_filedesc->ntasks;
359  if(sion_filedesc->colldebug>=1) {
360  fprintf(stderr, "collective statistics: SION_COLLSIZE=%11d\n",sion_filedesc->collsize);
361  }
362  } else if(cn) {
363  numcoll=atoi(cn);
364  if(numcoll>0) {
365  if(numcoll>sion_filedesc->ntasks) numcoll=sion_filedesc->ntasks;
366  sion_filedesc->collsize=sion_filedesc->ntasks/numcoll;
367  if(sion_filedesc->ntasks%numcoll>0) sion_filedesc->collsize++;
368 
369  if(sion_filedesc->colldebug>=1) {
370  fprintf(stderr, "collective statistics: SION_COLLNUM=%11d\n",numcoll);
371  fprintf(stderr, "collective statistics: collsize=%11d\n",sion_filedesc->collsize);
372  }
373  }
374  }
375 
376  /* enable collective operation? */
377  if((cs) || (cn)) {
378  if(sion_filedesc->collsize>0) sion_filedesc->usecoll=1;
379  if(sion_filedesc->collsize<0) sion_filedesc->usecoll=1;
380  if(sion_filedesc->collsize==0) sion_filedesc->usecoll=0;
381  }
382 
383 
384  DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, "usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->usecoll, sion_filedesc->collsize,sion_filedesc->ntasks,sion_filedesc->nfiles,sion_filedesc->colldebug));
385 
386  return (rc);
387 }
388 #undef DFUNCTION
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:770
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:148
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:272
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:239
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1014
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:169
int _sion_vcdtype(int sid)
Definition: sion_fd.c:56
sion_int32 fileptr_exported
char * _sion_getenv(const char *name)
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:472
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:51
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
#define DFUNCTION
checks if environment variables are set to use collective I/O SION_COLLSIZE = -1 -> number of collect...
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:591
Sion Time Stamp Header.
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:304
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80