SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_generic_collective_merge.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include <sys/time.h>
21 
22 #include <sys/types.h>
23 #include <fcntl.h>
24 
25 #include <unistd.h>
26 
27 #include "sion.h"
28 #include "sion_debug.h"
29 #include "sion_internal.h"
30 #include "sion_fd.h"
31 #include "sion_filedesc.h"
32 #include "sion_printts.h"
33 #include "sion_generic_apidesc.h"
35 
36 /* callback function: handed to gather_execute_cb to write each gathered data block */
37 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid );
38 
39 
40 /* collective I/O */
41 #define DFUNCTION "_sion_coll_fwrite_merge"
42 size_t _sion_coll_fwrite_merge(const void *data, size_t size, size_t nitems, int sid) {
43  _sion_filedesc *sion_filedesc;
44  _sion_generic_gendata *sion_gendata;
45  _sion_generic_apidesc *sion_apidesc;
46  sion_int64 bwrote=0, spec[2];
47  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
48  int collector, firstsender, lastsender;
49 
50  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
51  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
52  }
53  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
54  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
55 
56  sion_gendata=sion_filedesc->dataptr;
57  sion_apidesc=sion_gendata->apidesc;
58 
59  /* needed for avoiding subsequent non-collective calls */
60  sion_filedesc->collcmdused=1;
61 
62  /* check collsize */
63  if (sion_filedesc->collsize<=0) {
64  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
65  }
66 
67  /* parameter of callback function */
68  collector=(int) sion_filedesc->collector;
69  firstsender=collector+1;
70  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
71  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
72 
73  spec[0]=-2; /* signaling that data should be written in chunk of collector */
74  spec[1]=size*nitems;
75 
76  /* write own part */
77  if(sion_filedesc->rank == sion_filedesc->collector) {
78  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "COLLECTOR write data of size %ld at current position\n", (long) spec[1]));
79  rc_own=_sion_generic_collective_process_write_merge(data,spec,sid);
80  } else {
81  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "SENDER send data of size %ld at current position\n", (long) spec[1]));
82  }
83 
84 
85  /* collect and write parts of sender tasks via callback function */
86  if(!sion_apidesc->gather_execute_cb ) {
87  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
88  }
89  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
90  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
91  _sion_generic_collective_process_write_merge);
92 
93  /* return code */
94  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
95  bwrote=size*nitems;
96  } else {
97  bwrote=0;
98  }
99 
100  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
101  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,bwrote));
102 
103  return(bwrote);
104 }
105 #undef DFUNCTION
106 
107 
108 #define DFUNCTION "_sion_generic_collective_process_write_merge"
109 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid ) {
110  _sion_filedesc *sion_filedesc;
111  int rc=SION_STD_SUCCESS;
112  sion_int64 bwrote=0, bytestowrite;
113  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
114  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
115  }
116  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
117 
118  /* move file pointer */
119  bytestowrite=spec[1];
120 
121  /* ensure free space for this block */
122  if(sion_ensure_free_space(sid,bytestowrite) != SION_SUCCESS) {
123  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
124  }
125 
126  /* get and write data */
127  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
128  (long long) bytestowrite, (long long) sion_filedesc->currentpos));
129  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
130  if(bwrote != bytestowrite) {
131  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
132  }
133 
134  /* update internal data */
135  sion_filedesc->currentpos+=bytestowrite;
136  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
137  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
138 
139 
140  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave spec[0]=%d spec[1]=%d sid=%d rc=%d\n", (int) spec[0],(int) spec[1],sid,rc));
141  return(rc);
142 
143 }
144 #undef DFUNCTION
145 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:147
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:268
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:976
int _sion_vcdtype(int sid)
Definition: sion_fd.c:56
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:51
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
Sion Time Stamp Header.
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80