SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_generic_collective.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include <sys/time.h>
21 
22 #include <sys/types.h>
23 #include <fcntl.h>
24 
25 #include <unistd.h>
26 
27 #include "sion.h"
28 #include "sion_debug.h"
29 #include "sion_internal.h"
30 #include "sion_fd.h"
31 #include "sion_filedesc.h"
32 #include "sion_printts.h"
33 #include "sion_generic_apidesc.h"
35 
36 /* callback functions */
37 int _sion_generic_collective_process_write( const void *data, sion_int64 *spec, int sid );
38 int _sion_generic_collective_process_read( void *data, sion_int64 *spec, int sid );
39 
40 /* collective I/O */
41 #define DFUNCTION "sion_coll_fwrite"
42 size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid) {
43  _sion_filedesc *sion_filedesc;
44  _sion_generic_gendata *sion_gendata;
45  _sion_generic_apidesc *sion_apidesc;
46  sion_int64 bwrote=0, spec[2], ownnewposition, items_wrote;
47  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
48  int collector, firstsender, lastsender;
49 
50  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
51  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
52  }
53  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
54  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
55 
56  sion_gendata=sion_filedesc->dataptr;
57  sion_apidesc=sion_gendata->apidesc;
58 
59  /* no collective mode */
60  if(!sion_filedesc->usecoll) {
61  return(sion_fwrite(data,size,nitems,sid));
62  }
63 
64  /* branch to merge mode if enabled */
65  if(sion_filedesc->collmergemode) {
66  return(_sion_coll_fwrite_merge(data,size,nitems,sid));
67  }
68 
69 
70  /* needed for avoiding subsequent non-collective calls */
71  sion_filedesc->collcmdused=1;
72 
73  /* check collsize */
74  if (sion_filedesc->collsize<=0) {
75  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
76  }
77 
78  /* parameter of callback function */
79  collector=(int) sion_filedesc->collector;
80  firstsender=collector+1;
81  lastsender=collector+sion_filedesc->collsize-1;
82  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
83  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n",
84  collector, sion_filedesc->collsize, firstsender, lastsender));
85 
86  /* ensure free space for this block */
87  if(sion_ensure_free_space(sid,size*nitems) != SION_SUCCESS) {
88  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
89  spec[0]=spec[1]=-1; /* signaling error */
90  } else {
91  spec[0]=sion_filedesc->currentpos;
92  spec[1]=size*nitems;
93  }
94 
95  /* write own part */
96  if(sion_filedesc->rank == sion_filedesc->collector) {
97  rc_own=_sion_generic_collective_process_write(data,spec,sid);
98  }
99  ownnewposition=sion_filedesc->currentpos;
100 
101  /* collect and write parts of sender tasks via callback function */
102  if(!sion_apidesc->gather_execute_cb ) {
103  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
104  }
105  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
106  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
107  _sion_generic_collective_process_write);
108 
109  /* set own position to end of own block written in this call */
110  if(sion_filedesc->rank == sion_filedesc->collector) {
111  _sion_file_flush(sion_filedesc->fileptr);
112  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
113  }
114 
115  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
116  if(sion_filedesc->rank != sion_filedesc->collector) {
117  sion_filedesc->currentpos+=size*nitems;
118  if(sion_filedesc->fileptr_exported) {
119  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
120  }
121  }
122 
123  /* return code */
124  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
125  bwrote=size*nitems;
126  } else {
127  bwrote=0;
128  }
129 
130  items_wrote = size ? bwrote / size : 0;
131 
132  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
133  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_wrote));
134 
135  return items_wrote;
136 }
137 #undef DFUNCTION
138 
139 #define DFUNCTION "sion_coll_fread"
140 size_t sion_coll_fread( void *data, size_t size, size_t nitems, int sid) {
141  _sion_filedesc *sion_filedesc;
142  _sion_generic_gendata *sion_gendata;
143  _sion_generic_apidesc *sion_apidesc;
144  sion_int64 bread=-1, spec[2], ownnewposition, items_read;
145  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
146  int collector, firstsender, lastsender;
147 
148  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
149  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: invalid sion_filedesc %d", sid));
150  }
151  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
152  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
153 
154  sion_gendata=sion_filedesc->dataptr;
155  sion_apidesc=sion_gendata->apidesc;
156 
157  /* no collective mode */
158  if(!sion_filedesc->usecoll) {
159  return(sion_fread(data,size,nitems,sid));
160  }
161 
162  /* needed for avoiding subsequent non-collective calls */
163  sion_filedesc->collcmdused=1;
164 
165  /* check collsize */
166  if (sion_filedesc->collsize<=0) {
167  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: collsize=%d <= 0, returning ...\n",
168  (int) sion_filedesc->collsize));
169  }
170 
171  /* parameter of callback function */
172  collector=(int) sion_filedesc->collector;
173  firstsender=collector+1;
174  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
175  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
176 
177  /* ensure to be at the beginning of the right block */
178  if(size*nitems>0) {
179  if(sion_feof(sid)) {
180  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"early eof found for this block, returning %d ...\n", sid);
181  spec[0]=spec[1]=-1; /* signaling that no data is requested */
182  } else {
183  /* specification of location in file */
184  spec[0]=sion_filedesc->currentpos;
185  spec[1]=size*nitems;
186  }
187  } else {
188  /* signaling that no data is requested */
189  spec[0]=spec[1]=-1;
190  }
191 
192  /* read own part */
193  if(sion_filedesc->rank == sion_filedesc->collector) {
194  rc_own=_sion_generic_collective_process_read(data,spec,sid);
195  }
196  ownnewposition=sion_filedesc->currentpos;
197 
198 
199  /* read parts and scatter these to sender tasks via callback function */
200  if(!sion_apidesc->execute_scatter_cb ) {
201  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
202  "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
203  }
204  rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->fsblksize,
205  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
206  _sion_generic_collective_process_read);
207 
208  /* set own position to end of own block read in this call */
209  if(sion_filedesc->rank == sion_filedesc->collector) {
210  _sion_file_flush(sion_filedesc->fileptr);
211  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
212  }
213 
214  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
215  if(sion_filedesc->rank != sion_filedesc->collector) {
216  sion_filedesc->currentpos+=size*nitems;
217  if(sion_filedesc->fileptr_exported) {
218  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
219  }
220  }
221 
222  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
223  bread=size*nitems;
224  } else {
225  bread=0;
226  }
227 
228  items_read = size ? bread / size : 0;
229 
230  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
231  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_read));
232 
233  return items_read;
234 
235 }
236 #undef DFUNCTION
237 
238 
239 #define DFUNCTION "_sion_generic_collective_process_write"
240 int _sion_generic_collective_process_write( const void *data, sion_int64 *spec, int sid ) {
241  _sion_filedesc *sion_filedesc;
242  int rc=SION_STD_SUCCESS;
243  sion_int64 bwrote=0, destpos, bytestowrite;
244  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
245  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
246  }
247  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
248 
249  /* move file pointer */
250  destpos=spec[0];
251  bytestowrite=spec[1];
252  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "currentpos=%d destpos=%d\n", (int) sion_filedesc->currentpos,(int) destpos));
253  if(sion_filedesc->currentpos!=destpos) {
254  _sion_file_flush(sion_filedesc->fileptr);
255  _sion_file_set_position(sion_filedesc->fileptr,destpos);sion_filedesc->currentpos=destpos;
256  }
257 
258  /* get and write data */
259  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
260  (long long) bytestowrite, (long long) destpos));
261  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
262  if(bwrote != bytestowrite) {
263  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
264  }
265 
266  /* update internal data */
267  sion_filedesc->currentpos+=bytestowrite;
268  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
269  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
270 
271 
272  return(rc);
273 
274 }
275 #undef DFUNCTION
276 
277 #define DFUNCTION "_sion_generic_collective_process_read"
278 int _sion_generic_collective_process_read( void *data, sion_int64 *spec, int sid ) {
279  _sion_filedesc *sion_filedesc;
280  int rc=SION_STD_SUCCESS;
281  sion_int64 bread=0, destpos, bytestoread;
282 
283  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
284  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
285  }
286  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
287 
288  /* move file pointer */
289  destpos=spec[0];
290  bytestoread=spec[1];
291  if(sion_filedesc->currentpos!=destpos) {
292  _sion_file_flush(sion_filedesc->fileptr);
293  _sion_file_set_position(sion_filedesc->fileptr,destpos);sion_filedesc->currentpos=destpos;
294  }
295 
296  /* get and read data */
297  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to read data of size %lld at position %lld\n",
298  (long long) bytestoread, (long long) destpos));
299  bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
300  if(bread != bytestoread) {
301  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: problems reading data ...\n"));
302  }
303 
304  {
305  ONLY_DEBUG(char *p=data;)
306  DPRINTFP((128, DFUNCTION, _SION_DEFAULT_RANK, "data[0]=%c data[%d]=%c\n",(char) p[0], (int) bytestoread, (char) p[bytestoread-1]));
307  }
308 
309  /* update internal data */
310  sion_filedesc->currentpos+=bytestoread;
311  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector read data block (bread=%d) of size %ld new pos %lld, %lld\n",
312  bread, (long) bytestoread, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
313 
314 
315  return(rc);
316 
317 }
318 #undef DFUNCTION
319 
320 
330 #define DFUNCTION "_sion_coll_check_env"
331 int _sion_coll_check_env(_sion_filedesc *sion_filedesc) {
332  const char *cs;
333  const char *cn;
334  const char *cd;
335  int rc = SION_SUCCESS, numcoll;
336 
337 
338  cd = _sion_getenv("SION_COLLDEBUG");
339  if(cd) {
340  sion_filedesc->colldebug=atoi(cd);
341  }
342 
343  cs = _sion_getenv("SION_COLLSIZE");
344  cn = _sion_getenv("SION_COLLNUM");
345  if(cs) {
346  sion_filedesc->collsize=atoi(cs);
347  if(sion_filedesc->collsize>sion_filedesc->ntasks) sion_filedesc->collsize=sion_filedesc->ntasks;
348  if(sion_filedesc->colldebug>=1) {
349  fprintf(stderr, "collective statistics: SION_COLLSIZE=%11d\n",sion_filedesc->collsize);
350  }
351  } else if(cn) {
352  numcoll=atoi(cn);
353  if(numcoll>0) {
354  if(numcoll>sion_filedesc->ntasks) numcoll=sion_filedesc->ntasks;
355  sion_filedesc->collsize=sion_filedesc->ntasks/numcoll;
356  if(sion_filedesc->ntasks%numcoll>0) sion_filedesc->collsize++;
357 
358  if(sion_filedesc->colldebug>=1) {
359  fprintf(stderr, "collective statistics: SION_COLLNUM=%11d\n",numcoll);
360  fprintf(stderr, "collective statistics: collsize=%11d\n",sion_filedesc->collsize);
361  }
362  }
363  }
364 
365  /* enable collective operation? */
366  if((cs) || (cn)) {
367  if(sion_filedesc->collsize>0) sion_filedesc->usecoll=1;
368  if(sion_filedesc->collsize<0) sion_filedesc->usecoll=1;
369  if(sion_filedesc->collsize==0) sion_filedesc->usecoll=0;
370  }
371 
372 
373  DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, "usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->usecoll, sion_filedesc->collsize,sion_filedesc->ntasks,sion_filedesc->nfiles,sion_filedesc->colldebug));
374 
375  return (rc);
376 }
377 #undef DFUNCTION
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:732
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:147
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get current position in file.
Definition: sion_file.c:268
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:238
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:976
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:168
int _sion_vcdtype(int sid)
Definition: sion_fd.c:56
sion_int32 fileptr_exported
char * _sion_getenv(const char *name)
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:433
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:51
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
#define DFUNCTION
checks if environment variables are set to use collective I/O SION_COLLSIZE = -1 -> number of collect...
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:553
Sion Time Stamp Header.
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:298
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80