SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
sion_mpi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
/*
 * Optional redirection of the MPI calls used in this file to their PMPI_
 * counterparts. The redirection is disabled by default: the macro defined
 * below is USE_PMPIno, so USE_PMPI stays undefined. Rename it to USE_PMPI to
 * enable the redirection.
 */
#define USE_PMPIno
#ifdef USE_PMPI
#define MPI_Comm_rank      PMPI_Comm_rank
#define MPI_Comm_size      PMPI_Comm_size
#define MPI_Gather         PMPI_Gather
#define MPI_Scatter        PMPI_Scatter
#define MPI_Bcast          PMPI_Bcast
#define MPI_Barrier        PMPI_Barrier
#define MPI_Comm_split     PMPI_Comm_split
#define MPI_Send           PMPI_Send
#define MPI_Recv           PMPI_Recv
/* MPI_Get_count is called by the scatter routine and has to be redirected too */
#define MPI_Get_count      PMPI_Get_count
#endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_error_handler.h"
45 #include "sion_internal.h"
46 #include "sion_fd.h"
47 #include "sion_filedesc.h"
48 #include "sion_printts.h"
49 
50 #include "sion_mpi_cb_gen.h"
51 
52 #ifdef SION_MPI
53 
54 #define DFUNCTION "_mpi_gather_execute_cb"
55 int _sion_mpi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
56  void *commdata, int collector, int range_start, int range_end, int sid,
57  int process_cb(const void *,sion_int64 *, int ) ) {
58  int rc=SION_STD_SUCCESS;
59  int size, rank, t, startsignal=1;
60  MPI_Status status;
61  char *p, *buffer;
62  sion_int64 bytestorecv, bytestosend, datasize;
63  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
64  MPI_Comm commp = sapi->comm;
65 
66 
67 
68  MPI_Comm_rank(commp, &rank);
69  MPI_Comm_size(commp, &size);
70 
71  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
72 
73  if(rank == collector) {
74  /* its the collector */
75 
76  /* allocate buffer */
77  buffer = (char *) malloc(fsblksize * sizeof(char));
78  if (buffer == NULL) {
79  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
80  (unsigned long) fsblksize * sizeof(char)));
81  }
82 
83  /* scan all other tasks */
84  for(t=range_start;t<=range_end;t++) {
85 
86  /* receive spec */
87  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
88  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
89  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
90  t, (long long) spec[0], (long long) spec[1]));
91 
92  /* send signal to send data */
93  if(spec[0]!=-1) { /* no error on sender */
94  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
95  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
96  }
97 
98  /* get and write data */
99  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
100  (long long) spec[1], (long long) spec[0]));
101 
102  bytestorecv=spec[1];
103 
104  /* loop over data parts */
105  while(bytestorecv>0) {
106  if(bytestorecv>fsblksize) datasize=fsblksize;
107  else datasize=bytestorecv;
108 
109  /* receive portion or all data */
110  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", t));
111  MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
112  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
113  t, (long long) datasize, (long long) bytestorecv));
114 
115  spec[1]=datasize; /* adjust size */
116 
117  /* process data with callback function */
118  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
119  (long long) spec[1], (long long) spec[0]));
120 
121  rc=process_cb(buffer,spec, sid);
122 
123  if(rc != SION_STD_SUCCESS) {
124  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
125  }
126 
127  /* advance counter */
128  bytestorecv-=datasize;spec[0]+=datasize;
129 
130  }
131 
132  }
133 
134  /* remove buffer */
135  if (buffer) free(buffer);
136 
137  } else {
138  if ( (rank>=range_start) && (rank<=range_end) ) {
139  /* its a sender */
140 
141  /* send spec to collector */
142  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
143  collector,(long long) spec[0], (long long) spec[1]));
144  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
145 
146  /* wait for start signal */
147  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
148  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
149  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
150 
151  /* send data in chunks of fsblksize */
152  bytestosend=spec[1];
153  p=(char *) indata;
154  while(bytestosend>0) {
155  if(bytestosend>fsblksize) datasize=fsblksize;
156  else datasize=bytestosend;
157  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", collector, (long long) datasize));
158  MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
159  bytestosend-=datasize;p+=datasize;
160  }
161  } else {
162  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "task take not part in collective operation\n"));
163  }
164  }
165 
166  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
167 
168  return rc;
169 }
170 #undef DFUNCTION
171 
172 
173 #define DFUNCTION "_mpi_process_scatter_cb"
174 int _sion_mpi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
175  void *commdata, int collector, int range_start, int range_end, int sid,
176  int process_cb(void *,sion_int64 *, int ) ) {
177  int rc=SION_STD_SUCCESS;
178  int size, rank, t, startsignal=1, count;
179  MPI_Status status;
180  char *p, *buffer;
181  sion_int64 bytestorecv, bytestosend, datasize;
182  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
183  MPI_Comm commp = sapi->comm;
184 
185 
186  MPI_Comm_rank(commp, &rank);
187  MPI_Comm_size(commp, &size);
188 
189  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
190 
191  if(rank == collector) {
192  /* its the collector */
193 
194  /* allocate buffer */
195  buffer = (char *) malloc(fsblksize * sizeof(char));
196  if (buffer == NULL) {
197  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
198  (unsigned long) fsblksize * sizeof(char)));
199  }
200 
201  /* scan all other tasks */
202  for(t=range_start;t<=range_end;t++) {
203 
204  /* receive spec */
205  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
206  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
207  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
208  t, (long long) spec[0], (long long) spec[1]));
209 
210  /* send signal to send data */
211  if(spec[0]!=-1) { /* sender waits for data */
212  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
213  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
214  }
215 
216 
217 
218  /* get and send data */
219  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
220  (long long) spec[1], (long long) spec[0]));
221 
222  bytestosend=spec[1];
223 
224  /* loop over data parts */
225  while(bytestosend>0) {
226 
227  if(bytestosend>fsblksize) datasize=fsblksize;
228  else datasize=bytestosend;
229 
230  spec[1]=datasize; /* adjust size */
231 
232  /* process data with callback function */
233  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
234  (long long) spec[1], (long long) spec[0]));
235 
236  rc=process_cb(buffer,spec, sid);
237 
238  if(rc != SION_STD_SUCCESS) {
239  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
240  }
241 
242  /* send portion or all data */
243  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", t));
244  MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
245  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
246  t, (long long) datasize, (long long) bytestosend));
247 
248  /* advance counter */
249  bytestosend-=datasize;spec[0]+=datasize;
250 
251  }
252 
253  }
254 
255  /* remove buffer */
256  if (buffer) free(buffer);
257 
258  } else {
259  if ( (rank>=range_start) && (rank<=range_end) ) {
260  /* its a sender */
261 
262  /* send spec to collector */
263  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
264  collector,(long long) spec[0], (long long) spec[1]));
265  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
266 
267  if(spec[0]!=-1) { /* no error in sion_feof */
268 
269  /* wait for start signal */
270  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
271  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
272  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
273 
274  /* send data in chunks of fsblksize */
275  bytestorecv=spec[1];
276  p=(char *) outdata;
277  while(bytestorecv>0) {
278  if(bytestorecv>fsblksize) datasize=fsblksize;
279  else datasize=bytestorecv;
280  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", collector, (long long) datasize));
281  MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
282  MPI_Get_count(&status,MPI_CHAR,&count);
283 
284  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", collector, (long long) datasize, count));
285  bytestorecv-=datasize;p+=datasize;
286 
287  }
288  }
289  } else {
290  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "task take not part in collective operation\n"));
291  }
292 
293  }
294 
295  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
296 
297  return rc;
298 }
299 #undef DFUNCTION
300 
301 
302 #define DFUNCTION "_sion_ompi_get_capability_cb"
303 int _sion_mpi_get_capability_cb(void *commdata )
304 {
305  int rc=SION_CAPABILITY_NONE;
306  ONLY_DEBUG(_mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;)
307 
309  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
310  return rc;
311 }
312 #undef DFUNCTION
313 
314 /* end of ifdef MPI */
315 #endif
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
Sion Time Stamp Header.