SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_mpi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include "mpi.h"
23 
24 #define USE_PMPIno
25 #ifdef USE_PMPI
26 #define MPI_Comm_rank PMPI_Comm_rank
27 #define MPI_Comm_size PMPI_Comm_size
28 #define MPI_Gather PMPI_Gather
29 #define MPI_Scatter PMPI_Scatter
30 #define MPI_Bcast PMPI_Bcast
31 #define MPI_Barrier PMPI_Barrier
32 #define MPI_Comm_split PMPI_Comm_split
33 #define MPI_Send PMPI_Send
34 #define MPI_Recv PMPI_Recv
35 #endif
36 
37 #include <sys/time.h>
38 
39 #include <sys/types.h>
40 #include <fcntl.h>
41 
42 #include <unistd.h>
43 
44 #include "sion.h"
45 #include "sion_debug.h"
46 #include "sion_error_handler.h"
47 #include "sion_internal.h"
48 #include "sion_fd.h"
49 #include "sion_filedesc.h"
50 #include "sion_printts.h"
51 
52 #include "sion_mpi_cb_gen.h"
53 
54 #ifdef SION_MPI
55 
56 #define DFUNCTION "_mpi_gather_execute_cb"
57 int _sion_mpi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
58  void *commdata, int collector, int range_start, int range_end, int sid,
59  int process_cb(const void *,sion_int64 *, int ) ) {
60  int rc=SION_STD_SUCCESS;
61  int size, rank, t, startsignal=1;
62  MPI_Status status;
63  char *p, *buffer;
64  sion_int64 bytestorecv, bytestosend, datasize;
65  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
66  MPI_Comm commp = sapi->comm;
67 
68 
69 
70  MPI_Comm_rank(commp, &rank);
71  MPI_Comm_size(commp, &size);
72 
73  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
74 
75  if(rank == collector) {
76  /* its the collector */
77 
78  /* allocate buffer */
79  buffer = (char *) malloc(fsblksize * sizeof(char));
80  if (buffer == NULL) {
81  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
82  (unsigned long) fsblksize * sizeof(char)));
83  }
84 
85  /* scan all other tasks */
86  for(t=range_start;t<=range_end;t++) {
87 
88  /* receive spec */
89  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
90  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
91  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
92  t, (long long) spec[0], (long long) spec[1]));
93 
94  /* send signal to send data */
95  if(spec[0]!=-1) { /* no error on sender */
96  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
97  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
98  }
99 
100  /* get and write data */
101  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
102  (long long) spec[1], (long long) spec[0]));
103 
104  bytestorecv=spec[1];
105 
106  /* loop over data parts */
107  while(bytestorecv>0) {
108  if(bytestorecv>fsblksize) datasize=fsblksize;
109  else datasize=bytestorecv;
110 
111  /* receive portion or all data */
112  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", t));
113  MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
114  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
115  t, (long long) datasize, (long long) bytestorecv));
116 
117  spec[1]=datasize; /* adjust size */
118 
119  /* process data with callback function */
120  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
121  (long long) spec[1], (long long) spec[0]));
122 
123  rc=process_cb(buffer,spec, sid);
124 
125  if(rc != SION_STD_SUCCESS) {
126  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
127  }
128 
129  /* advance counter */
130  bytestorecv-=datasize;spec[0]+=datasize;
131 
132  }
133 
134  }
135 
136  /* remove buffer */
137  if (buffer) free(buffer);
138 
139  } else {
140  if ( (rank>=range_start) && (rank<=range_end) ) {
141  /* its a sender */
142 
143  /* send spec to collector */
144  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
145  collector,(long long) spec[0], (long long) spec[1]));
146  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
147 
148  /* wait for start signal */
149  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
150  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
151  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
152 
153  /* send data in chunks of fsblksize */
154  bytestosend=spec[1];
155  p=(char *) indata;
156  while(bytestosend>0) {
157  if(bytestosend>fsblksize) datasize=fsblksize;
158  else datasize=bytestosend;
159  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", collector, (long long) datasize));
160  MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
161  bytestosend-=datasize;p+=datasize;
162  }
163  } else {
164  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "task take not part in collective operation\n"));
165  }
166  }
167 
168  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
169 
170  return rc;
171 }
172 #undef DFUNCTION
173 
174 
175 #define DFUNCTION "_mpi_process_scatter_cb"
176 int _sion_mpi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
177  void *commdata, int collector, int range_start, int range_end, int sid,
178  int process_cb(void *,sion_int64 *, int ) ) {
179  int rc=SION_STD_SUCCESS;
180  int size, rank, t, startsignal=1, count;
181  MPI_Status status;
182  char *p, *buffer;
183  sion_int64 bytestorecv, bytestosend, datasize;
184  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
185  MPI_Comm commp = sapi->comm;
186 
187 
188  MPI_Comm_rank(commp, &rank);
189  MPI_Comm_size(commp, &size);
190 
191  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
192 
193  if(rank == collector) {
194  /* its the collector */
195 
196  /* allocate buffer */
197  buffer = (char *) malloc(fsblksize * sizeof(char));
198  if (buffer == NULL) {
199  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
200  (unsigned long) fsblksize * sizeof(char)));
201  }
202 
203  /* scan all other tasks */
204  for(t=range_start;t<=range_end;t++) {
205 
206  /* receive spec */
207  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
208  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
209  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
210  t, (long long) spec[0], (long long) spec[1]));
211 
212  /* send signal to send data */
213  if(spec[0]!=-1) { /* sender waits for data */
214  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
215  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
216  }
217 
218 
219 
220  /* get and send data */
221  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
222  (long long) spec[1], (long long) spec[0]));
223 
224  bytestosend=spec[1];
225 
226  /* loop over data parts */
227  while(bytestosend>0) {
228 
229  if(bytestosend>fsblksize) datasize=fsblksize;
230  else datasize=bytestosend;
231 
232  spec[1]=datasize; /* adjust size */
233 
234  /* process data with callback function */
235  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
236  (long long) spec[1], (long long) spec[0]));
237 
238  rc=process_cb(buffer,spec, sid);
239 
240  if(rc != SION_STD_SUCCESS) {
241  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
242  }
243 
244  /* send portion or all data */
245  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", t));
246  MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
247  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
248  t, (long long) datasize, (long long) bytestosend));
249 
250  /* advance counter */
251  bytestosend-=datasize;spec[0]+=datasize;
252 
253  }
254 
255  }
256 
257  /* remove buffer */
258  if (buffer) free(buffer);
259 
260  } else {
261  if ( (rank>=range_start) && (rank<=range_end) ) {
262  /* its a sender */
263 
264  /* send spec to collector */
265  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
266  collector,(long long) spec[0], (long long) spec[1]));
267  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
268 
269  if(spec[0]!=-1) { /* no error in sion_feof */
270 
271  /* wait for start signal */
272  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
273  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
274  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
275 
276  /* send data in chunks of fsblksize */
277  bytestorecv=spec[1];
278  p=(char *) outdata;
279  while(bytestorecv>0) {
280  if(bytestorecv>fsblksize) datasize=fsblksize;
281  else datasize=bytestorecv;
282  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", collector, (long long) datasize));
283  MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
284  MPI_Get_count(&status,MPI_CHAR,&count);
285 
286  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", collector, (long long) datasize, count));
287  bytestorecv-=datasize;p+=datasize;
288 
289  }
290  }
291  } else {
292  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "task take not part in collective operation\n"));
293  }
294 
295  }
296 
297  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
298 
299  return rc;
300 }
301 #undef DFUNCTION
302 
303 
304 #define DFUNCTION "_sion_ompi_get_capability_cb"
305 int _sion_mpi_get_capability_cb(void *commdata )
306 {
307  int rc=SION_CAPABILITY_NONE;
308  ONLY_DEBUG(_mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;)
309 
311  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
312  return rc;
313 }
314 #undef DFUNCTION
315 
316 /* end of ifdef MPI */
317 #endif
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:56
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:54
Sion Time Stamp Header.