SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_mpi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #define USE_PMPIno
23 #ifdef USE_PMPI
24 #define MPI_Comm_rank PMPI_Comm_rank
25 #define MPI_Comm_size PMPI_Comm_size
26 #define MPI_Gather PMPI_Gather
27 #define MPI_Scatter PMPI_Scatter
28 #define MPI_Bcast PMPI_Bcast
29 #define MPI_Barrier PMPI_Barrier
30 #define MPI_Comm_split PMPI_Comm_split
31 #define MPI_Send PMPI_Send
32 #define MPI_Recv PMPI_Recv
33 #endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_internal.h"
45 #include "sion_fd.h"
46 #include "sion_filedesc.h"
47 #include "sion_printts.h"
48 
49 #include "sion_mpi_cb_gen.h"
50 
51 #ifdef SION_MPI
52 
53 #define DFUNCTION "_mpi_gather_execute_cb"
54 int _sion_mpi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
55  void *commdata, int collector, int range_start, int range_end, int sid,
56  int process_cb(const void *,sion_int64 *, int ) ) {
57  int rc=SION_STD_SUCCESS;
58  int size, rank, t, startsignal=1;
59  MPI_Status status;
60  char *p, *buffer;
61  sion_int64 bytestorecv, bytestosend, datasize;
62  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
63  MPI_Comm commp = sapi->comm;
64 
65 
66 
67  MPI_Comm_rank(commp, &rank);
68  MPI_Comm_size(commp, &size);
69 
70  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
71 
72  if(rank == collector) {
73  /* its the collector */
74 
75  /* allocate buffer */
76  buffer = (char *) malloc(fsblksize * sizeof(char));
77  if (buffer == NULL) {
78  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
79  (unsigned long) fsblksize * sizeof(char)));
80  }
81 
82  /* scan all other tasks */
83  for(t=range_start;t<=range_end;t++) {
84 
85  /* receive spec */
86  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
87  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
88  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
89  t, (long long) spec[0], (long long) spec[1]));
90 
91  /* send signal to send data */
92  if(spec[0]!=-1) { /* no error on sender */
93  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
94  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
95  }
96 
97  /* get and write data */
98  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to proces data of size %lld at offset %lld\n",
99  (long long) spec[1], (long long) spec[0]));
100 
101  bytestorecv=spec[1];
102 
103  /* loop over data parts */
104  while(bytestorecv>0) {
105  if(bytestorecv>fsblksize) datasize=fsblksize;
106  else datasize=bytestorecv;
107 
108  /* receive portion or all data */
109  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", t));
110  MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
111  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
112  t, (long long) datasize, (long long) bytestorecv));
113 
114  spec[1]=datasize; /* adjust size */
115 
116  /* process data with callback function */
117  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
118  (long long) spec[1], (long long) spec[0]));
119 
120  rc=process_cb(buffer,spec, sid);
121 
122  if(rc != SION_STD_SUCCESS) {
123  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
124  }
125 
126  /* advance counter */
127  bytestorecv-=datasize;spec[0]+=datasize;
128 
129  }
130 
131  }
132 
133  /* remove buffer */
134  if (buffer) free(buffer);
135 
136  } else {
137  /* its a sender */
138 
139  /* send spec to collector */
140  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
141  collector,(long long) spec[0], (long long) spec[1]));
142  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
143 
144  /* wait for start signal */
145  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
146  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
147  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
148 
149  /* send data in chunks of fsblksize */
150  bytestosend=spec[1];
151  p=(char *) indata;
152  while(bytestosend>0) {
153  if(bytestosend>fsblksize) datasize=fsblksize;
154  else datasize=bytestosend;
155  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", collector, (long long) datasize));
156  MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
157  bytestosend-=datasize;p+=datasize;
158  }
159 
160  }
161 
162  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
163 
164  return rc;
165 }
166 #undef DFUNCTION
167 
168 
169 #define DFUNCTION "_mpi_process_scatter_cb"
170 int _sion_mpi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
171  void *commdata, int collector, int range_start, int range_end, int sid,
172  int process_cb(void *,sion_int64 *, int ) ) {
173  int rc=SION_STD_SUCCESS;
174  int size, rank, t, startsignal=1, count;
175  MPI_Status status;
176  char *p, *buffer;
177  sion_int64 bytestorecv, bytestosend, datasize;
178  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
179  MPI_Comm commp = sapi->comm;
180 
181 
182  MPI_Comm_rank(commp, &rank);
183  MPI_Comm_size(commp, &size);
184 
185  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
186 
187  if(rank == collector) {
188  /* its the collector */
189 
190  /* allocate buffer */
191  buffer = (char *) malloc(fsblksize * sizeof(char));
192  if (buffer == NULL) {
193  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
194  (unsigned long) fsblksize * sizeof(char)));
195  }
196 
197  /* scan all other tasks */
198  for(t=range_start;t<=range_end;t++) {
199 
200  /* receive spec */
201  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
202  MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
203  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
204  t, (long long) spec[0], (long long) spec[1]));
205 
206  /* send signal to send data */
207  if(spec[0]!=-1) { /* sender waits for data */
208  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
209  MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
210  }
211 
212 
213 
214  /* get and send data */
215  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to proces data of size %lld at offset %lld\n",
216  (long long) spec[1], (long long) spec[0]));
217 
218  bytestosend=spec[1];
219 
220  /* loop over data parts */
221  while(bytestosend>0) {
222 
223  if(bytestosend>fsblksize) datasize=fsblksize;
224  else datasize=bytestosend;
225 
226  spec[1]=datasize; /* adjust size */
227 
228  /* process data with callback function */
229  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
230  (long long) spec[1], (long long) spec[0]));
231 
232  rc=process_cb(buffer,spec, sid);
233 
234  if(rc != SION_STD_SUCCESS) {
235  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: problems writing data ...\n"));
236  }
237 
238  /* send portion or all data */
239  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", t));
240  MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
241  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
242  t, (long long) datasize, (long long) bytestosend));
243 
244  /* advance counter */
245  bytestosend-=datasize;spec[0]+=datasize;
246 
247  }
248 
249  }
250 
251  /* remove buffer */
252  if (buffer) free(buffer);
253 
254  } else {
255  /* its a sender */
256 
257  /* send spec to collector */
258  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
259  collector,(long long) spec[0], (long long) spec[1]));
260  MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
261 
262  if(spec[0]!=-1) { /* no error in sion_feof */
263 
264  /* wait for start signal */
265  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
266  MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
267  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
268 
269  /* send data in chunks of fsblksize */
270  bytestorecv=spec[1];
271  p=(char *) outdata;
272  while(bytestorecv>0) {
273  if(bytestorecv>fsblksize) datasize=fsblksize;
274  else datasize=bytestorecv;
275  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", collector, (long long) datasize));
276  MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
277  MPI_Get_count(&status,MPI_CHAR,&count);
278 
279  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", collector, (long long) datasize, count));
280  bytestorecv-=datasize;p+=datasize;
281 
282  }
283  }
284  }
285 
286  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
287 
288  return rc;
289 }
290 #undef DFUNCTION
291 
292 
293 #define DFUNCTION "_sion_ompi_get_capability_cb"
294 int _sion_mpi_get_capability_cb(void *commdata )
295 {
296  int rc=SION_CAPABILITY_NONE;
297  ONLY_DEBUG(_mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;)
298 
300  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
301  return rc;
302 }
303 #undef DFUNCTION
304 
305 /* end of ifdef MPI */
306 #endif
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
Sion Time Stamp Header.