SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_generic_collective.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include <sys/time.h>
23 
24 #include <sys/types.h>
25 #include <fcntl.h>
26 
27 #include <unistd.h>
28 
29 #ifdef _SION_CUDA
30 #include <cuda_runtime.h>
31 #endif
32 
33 #include "sion.h"
34 #include "sion_debug.h"
35 #include "sion_error_handler.h"
36 #include "sion_internal.h"
37 #include "sion_fd.h"
38 #include "sion_filedesc.h"
39 #include "sion_printts.h"
40 #include "sion_generic_apidesc.h"
41 #include "sion_generic_buddy.h"
43 
44 /* collective I/O */
45 #define DFUNCTION "sion_coll_fwrite"
46 size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid) {
47  _sion_filedesc *sion_filedesc;
48  _sion_generic_gendata *sion_gendata;
49  _sion_generic_apidesc *sion_apidesc;
50  sion_int64 bwrote=0, spec[2], ownnewposition, items_wrote;
51  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS,rc_buddy=SION_STD_SUCCESS;
52  int collector, firstsender, lastsender;
53 
54  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
55  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
56  }
57  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
58  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
59 
60  sion_gendata=sion_filedesc->dataptr;
61  sion_apidesc=sion_gendata->apidesc;
62 
63  /* no collective mode */
64  if(!sion_filedesc->usecoll) {
65  return(sion_fwrite(data,size,nitems,sid));
66  }
67 
68  /* branch to merge mode if enabled */
69  if(sion_filedesc->collmergemode) {
70  return(_sion_coll_fwrite_merge(data,size,nitems,sid));
71  }
72 
73 
74  /* needed for avoiding subsequent non-collective calls */
75  sion_filedesc->collcmdused=1;
76 
77  /* check collsize */
78  if (sion_filedesc->collsize<=0) {
79  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
80  }
81 
82  /* parameter of callback function */
83  collector=(int) sion_filedesc->collector;
84  firstsender=collector+1;
85  lastsender=collector+sion_filedesc->collsize-1;
86  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
87  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector=%d collsize=%d firstsender=%d lastsender=%d\n",
88  collector, sion_filedesc->collsize, firstsender, lastsender));
89 
90  /* ensure free space for this block */
91  if(sion_ensure_free_space(sid,size*nitems) != SION_SUCCESS) {
92  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
93  spec[0]=spec[1]=-1; /* signaling error */
94  } else {
95  spec[0]=sion_filedesc->currentpos;
96  spec[1]=size*nitems;
97  }
98 
99  /* write own part */
100  if(sion_filedesc->rank == sion_filedesc->collector) {
101  rc_own=_sion_generic_collective_process_write(data,spec,sid);
102  }
103  ownnewposition=sion_filedesc->currentpos;
104 
105  /* collect and write parts of sender tasks via callback function */
106  if(!sion_apidesc->gather_execute_cb ) {
107  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
108  }
109  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
110  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
111  _sion_generic_collective_process_write);
112 
113  /* set own position to end of own block written in this call */
114  if(sion_filedesc->rank == sion_filedesc->collector) {
115  _sion_file_flush(sion_filedesc->fileptr);
116  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
117  }
118 
119  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
120  if(sion_filedesc->rank != sion_filedesc->collector) {
121  sion_filedesc->currentpos+=size*nitems;
122  if(sion_filedesc->fileptr_exported) {
123  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
124  }
125  }
126 
127  /* switch to buddy checkpointing, if enabled */
128  if(sion_filedesc->usebuddy ) {
129  rc_buddy=_sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
130  }
131 
132  /* return code */
133  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS)&& (rc_buddy == SION_STD_SUCCESS) ) {
134  bwrote=size*nitems;
135  } else {
136  bwrote=0;
137  }
138 
139  items_wrote = size ? bwrote / size : 0;
140 
141  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
142  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_wrote));
143 
144  return items_wrote;
145 }
146 #undef DFUNCTION
147 
148 #define DFUNCTION "sion_coll_fread"
149 size_t sion_coll_fread( void *data, size_t size, size_t nitems, int sid) {
150  _sion_filedesc *sion_filedesc;
151  _sion_generic_gendata *sion_gendata;
152  _sion_generic_apidesc *sion_apidesc;
153  sion_int64 bread=-1, spec[2], ownnewposition, items_read;
154  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
155  int collector, firstsender, lastsender;
156 
157  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
158  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: invalid sion_filedesc %d", sid));
159  }
160  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
161  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
162 
163  sion_gendata=sion_filedesc->dataptr;
164  sion_apidesc=sion_gendata->apidesc;
165 
166  /* no collective mode */
167  if(!sion_filedesc->usecoll) {
168  return(sion_fread(data,size,nitems,sid));
169  }
170 
171  /* switch to buddy checkpointing, if enabled */
172  if(sion_filedesc->usebuddy ) {
173  return( _sion_coll_fread_buddy(data, size, nitems, sid));
174  }
175 
176  /* needed for avoiding subsequent non-collective calls */
177  sion_filedesc->collcmdused=1;
178 
179  /* check collsize */
180  if (sion_filedesc->collsize<=0) {
181  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fread: collsize=%d <= 0, returning ...\n",
182  (int) sion_filedesc->collsize));
183  }
184 
185  /* parameter of callback function */
186  collector=(int) sion_filedesc->collector;
187  firstsender=collector+1;
188  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
189  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
190 
191  /* ensure to be at the beginning of the right block */
192  if(size*nitems>0) {
193  if(sion_feof(sid)) {
194  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"early eof found for this block, returning %d ...\n", sid);
195  spec[0]=spec[1]=-1; /* signaling that no data is requested */
196  } else {
197  /* specification of location in file */
198  spec[0]=sion_filedesc->currentpos;
199  spec[1]=size*nitems;
200  }
201  } else {
202  /* signaling that no data is requested */
203  spec[0]=spec[1]=-1;
204  }
205 
206  /* read own part */
207  if(sion_filedesc->rank == sion_filedesc->collector) {
208  rc_own=_sion_generic_collective_process_read(data,spec,sid);
209  }
210  ownnewposition=sion_filedesc->currentpos;
211 
212 
213  /* read parts and scatter these to sender tasks via callback function */
214  if(!sion_apidesc->execute_scatter_cb ) {
215  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
216  "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
217  }
218  rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->fsblksize,
219  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
220  _sion_generic_collective_process_read);
221 
222  /* set own position to end of own block read in this call */
223  if(sion_filedesc->rank == sion_filedesc->collector) {
224  _sion_file_flush(sion_filedesc->fileptr);
225  _sion_file_set_position(sion_filedesc->fileptr,ownnewposition);sion_filedesc->currentpos=ownnewposition;
226  }
227 
228  /* set file pointer in data structure and in file if it is exported and can be used without control of SIONlib */
229  if(sion_filedesc->rank != sion_filedesc->collector) {
230  sion_filedesc->currentpos+=size*nitems;
231  if(sion_filedesc->fileptr_exported) {
232  _sion_file_set_position(sion_filedesc->fileptr,sion_filedesc->currentpos);
233  }
234  }
235 
236  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
237  bread=size*nitems;
238  } else {
239  bread=0;
240  }
241 
242  items_read = size ? bread / size : 0;
243 
244  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
245  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,items_read));
246 
247  return items_read;
248 
249 }
250 #undef DFUNCTION
251 
252 
253 #define DFUNCTION "_sion_generic_collective_process_write"
254 int _sion_generic_collective_process_write( const void *data, sion_int64 *spec, int sid ) {
255  _sion_filedesc *sion_filedesc;
256  int rc=SION_STD_SUCCESS;
257  sion_int64 bwrote=0, destpos, bytestowrite;
258  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
259  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
260  }
261  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
262 
263  /* move file pointer */
264  destpos=spec[0];
265  bytestowrite=spec[1];
266  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "currentpos=%d destpos=%d\n", (int) sion_filedesc->currentpos,(int) destpos));
267  if(sion_filedesc->currentpos!=destpos) {
268  _sion_file_flush(sion_filedesc->fileptr);
269  _sion_file_set_position(sion_filedesc->fileptr,destpos);sion_filedesc->currentpos=destpos;
270  }
271 
272  /* get and write data */
273  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
274  (long long) bytestowrite, (long long) destpos));
275 #ifdef _SION_CUDA
276  struct cudaPointerAttributes attrs;
277  cudaError_t err = cudaPointerGetAttributes(&attrs, data);
278  if ((err == cudaSuccess) && _sion_cuda_ptr_is_device(attrs) ) {
279  char* buffer = malloc(sion_filedesc->fsblksize);
280  const char* data_ = data;
281  while (bwrote < bytestowrite) {
282  sion_int64 to_write = (bytestowrite - bwrote) > sion_filedesc->fsblksize ? sion_filedesc->fsblksize : (bytestowrite - bwrote);
283  cudaMemcpy(buffer, data_, to_write, cudaMemcpyDeviceToHost);
284  sion_int64 bwrote_ = _sion_file_write(buffer, to_write, sion_filedesc->fileptr);
285  if (bwrote_ != to_write) break;
286  bwrote += bwrote_;
287  data_ += bwrote_;
288  }
289  free(buffer);
290  } else {
291  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
292  }
293 #else
294  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
295 #endif
296  if(bwrote != bytestowrite) {
297  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
298  }
299 
300  /* update internal data */
301  sion_filedesc->currentpos+=bytestowrite;
302  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
303  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
304 
305 
306  return(rc);
307 
308 }
309 #undef DFUNCTION
310 
311 #define DFUNCTION "_sion_generic_collective_process_read"
312 int _sion_generic_collective_process_read( void *data, sion_int64 *spec, int sid ) {
313  _sion_filedesc *sion_filedesc;
314  int rc=SION_STD_SUCCESS;
315  sion_int64 bread=0, destpos, bytestoread;
316 
317  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
318  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
319  }
320  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
321 
322  /* move file pointer */
323  destpos=spec[0];
324  bytestoread=spec[1];
325  if(sion_filedesc->currentpos!=destpos) {
326  if(sion_filedesc->fileptr!=NULL) {
327  _sion_file_flush(sion_filedesc->fileptr);
328  _sion_file_set_position(sion_filedesc->fileptr,destpos);
329  }
330  sion_filedesc->currentpos=destpos;
331  }
332 
333  /* get and read data */
334  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to read data of size %lld at position %lld\n",
335  (long long) bytestoread, (long long) destpos));
336 #ifdef _SION_CUDA
337  struct cudaPointerAttributes attrs;
338  cudaError_t err = cudaPointerGetAttributes(&attrs, data);
339  if ((err == cudaSuccess) && _sion_cuda_ptr_is_device(attrs) ) {
340  char* buffer = malloc(sion_filedesc->fsblksize);
341  char* data_ = data;
342  while (bread < bytestoread) {
343  sion_int64 to_read = (bytestoread - bread) > sion_filedesc->fsblksize ? sion_filedesc->fsblksize : (bytestoread - bread);
344  sion_int64 bread_ = _sion_file_read(buffer, to_read, sion_filedesc->fileptr);
345  if (bread_ != to_read) break;
346  cudaMemcpy(data_, buffer, bread_, cudaMemcpyHostToDevice);
347  bread += bread_;
348  data_ += bread_;
349  }
350  free(buffer);
351  } else {
352  bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
353  }
354 #else
355  bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
356 #endif
357  if(bread != bytestoread) {
358  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_read: problems reading data ...\n"));
359  }
360 
361  {
362  ONLY_DEBUG(char *p=data;)
363  DPRINTFP((128, DFUNCTION, _SION_DEFAULT_RANK, "data[0]=%c data[%d]=%c\n",(char) p[0], (int) bytestoread, (char) p[bytestoread-1]));
364  }
365 
366  /* update internal data */
367  sion_filedesc->currentpos+=bytestoread;
368  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector read data block (bread=%d) of size %ld new pos %lld, %lld\n",
369  bread, (long) bytestoread, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
370 
371 
372  return(rc);
373 
374 }
375 #undef DFUNCTION
376 
377 
387 #define DFUNCTION "_sion_coll_check_env"
388 int _sion_coll_check_env(_sion_filedesc *sion_filedesc) {
389  const char *cs;
390  const char *cn;
391  const char *cd;
392  int rc = SION_SUCCESS, numcoll;
393 
394 
395  cd = _sion_getenv("SION_COLLDEBUG");
396  if(cd) {
397  sion_filedesc->colldebug=atoi(cd);
398  }
399 
400  cs = _sion_getenv("SION_COLLSIZE");
401  cn = _sion_getenv("SION_COLLNUM");
402  if(cs) {
403  sion_filedesc->collsize=atoi(cs);
404  if(sion_filedesc->collsize>sion_filedesc->ntasks) sion_filedesc->collsize=sion_filedesc->ntasks;
405  if(sion_filedesc->colldebug>=1) {
406  fprintf(stderr, "collective statistics: SION_COLLSIZE=%11d\n",sion_filedesc->collsize);
407  }
408  } else if(cn) {
409  numcoll=atoi(cn);
410  if(numcoll>0) {
411  if(numcoll>sion_filedesc->ntasks) numcoll=sion_filedesc->ntasks;
412  sion_filedesc->collsize=sion_filedesc->ntasks/numcoll;
413  if(sion_filedesc->ntasks%numcoll>0) sion_filedesc->collsize++;
414 
415  if(sion_filedesc->colldebug>=1) {
416  fprintf(stderr, "collective statistics: SION_COLLNUM=%11d\n",numcoll);
417  fprintf(stderr, "collective statistics: collsize=%11d\n",sion_filedesc->collsize);
418  }
419  }
420  }
421 
422  /* enable collective operation? */
423  if((cs) || (cn)) {
424  if(sion_filedesc->collsize>0) sion_filedesc->usecoll=1;
425  if(sion_filedesc->collsize<0) sion_filedesc->usecoll=1;
426  if(sion_filedesc->collsize==0) sion_filedesc->usecoll=0;
427  }
428 
429 
430  DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, "usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->usecoll, sion_filedesc->collsize,sion_filedesc->ntasks,sion_filedesc->nfiles,sion_filedesc->colldebug));
431 
432  return (rc);
433 }
434 #undef DFUNCTION
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
Definition: sion_common.c:609
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
Definition: sion_common.c:470
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1053
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
Definition: sion_common.c:809
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
Definition: sion_file.c:434
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:367
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get current position in file.
Definition: sion_file.c:401
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:221
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:257
#define DFUNCTION
checks if environment variables are set to use collective I/O SION_COLLSIZE = -1 -> number of collect...
char * _sion_getenv(const char *name)
Sion Time Stamp Header.
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
sion_int32 fileptr_exported
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82