SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_mpi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #define USE_PMPIno
23 #ifdef USE_PMPI
24 #define MPI_Comm_rank PMPI_Comm_rank
25 #define MPI_Comm_size PMPI_Comm_size
26 #define MPI_Gather PMPI_Gather
27 #define MPI_Scatter PMPI_Scatter
28 #define MPI_Bcast PMPI_Bcast
29 #define MPI_Barrier PMPI_Barrier
30 #define MPI_Comm_split PMPI_Comm_split
31 #define MPI_Send PMPI_Send
32 #define MPI_Recv PMPI_Recv
33 #endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_internal.h"
45 #include "sion_fd.h"
46 #include "sion_filedesc.h"
47 #include "sion_printts.h"
48 
49 #include "sion_mpi_cb_gen.h"
50 
51 #ifdef SION_MPI
52 
53 int _sion_register_callbacks_mpi() {
54  int aid=-1;
55  aid=sion_generic_create_api("SIONlib_MPI_API");
56 
57 
58  sion_generic_register_create_local_commgroup_cb(aid,&_sion_mpi_create_lcg_cb);
59  sion_generic_register_free_local_commgroup_cb(aid,&_sion_mpi_free_lcg_cb);
60 
61  sion_generic_register_barrier_cb(aid,&_sion_mpi_barrier_cb);
62  sion_generic_register_bcastr_cb(aid,&_sion_mpi_bcastr_cb);
63  sion_generic_register_gatherr_cb(aid,&_sion_mpi_gatherr_cb);
64  sion_generic_register_scatterr_cb(aid,&_sion_mpi_scatterr_cb);
65  sion_generic_register_gathervr_cb(aid,&_sion_mpi_gathervr_cb);
66  sion_generic_register_scattervr_cb(aid,&_sion_mpi_scattervr_cb);
67  sion_generic_register_gather_and_execute_cb(aid,&_sion_mpi_gather_process_cb);
68  sion_generic_register_execute_and_scatter_cb(aid,&_sion_mpi_process_scatter_cb);
69  sion_generic_register_get_capability_cb(aid,&_sion_mpi_get_capability_cb);
70 
71  return(aid);
72 }
73 
74 int _sion_mpi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
75  int grank, int gsize,
76  int lrank, int lsize,
77  int filenumber, int numfiles
78  )
79 {
80  int rc=0;
81  _mpi_api_commdata* sapi_global = (_mpi_api_commdata *) global_commgroup;
82  _mpi_api_commdata* commgroup=NULL;
83 
84  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " split now comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
85  grank, gsize, filenumber, numfiles, lrank, lsize));
86 
87  if(global_commgroup==NULL) {
88  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
89  return(-1);
90  }
91  if(*local_commgroup!=NULL) {
92  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
93  return(-1);
94  }
95 
96  /* is local commgroup already set by calling program? */
97  if(sapi_global->lcommgroup!=NULL) {
98  /* use this communicator */
99  *local_commgroup=sapi_global->lcommgroup;
100  } else {
101 
102  /* create new communicator */
103  commgroup = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
104  if (commgroup == NULL) {
105  fprintf(stderr,"_mpi_create_lcg_cb: cannot allocate memory for local commgroup of size %lu, aborting ...\n",
106  (unsigned long) sizeof(_mpi_api_commdata));
107  return(-1);
108  }
109 
110  rc = MPI_Comm_split(sapi_global->comm, filenumber, lrank, &commgroup->comm);
111  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " rc=%d from MPI_Comm_split\n",rc));
112  commgroup->local=1; commgroup->commset=1; commgroup->lcommgroup=NULL;
113  commgroup->commcreated=1;
114  commgroup->rank=lrank;
115  commgroup->size=lsize;
116 
117  *local_commgroup=commgroup;
118  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
119 
120  }
121  return rc;
122 }
123 
124 int _sion_mpi_free_lcg_cb(void *local_commgroup) {
125  int rc;
126  _mpi_api_commdata* commgroup = (_mpi_api_commdata *) local_commgroup;
127 
128  if ( (commgroup->commset) && (commgroup->commcreated) ) {
129  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm\n"));
130  rc=MPI_Comm_free(&commgroup->comm);
131  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm rc=%d\n",rc));
132  }
133  free(commgroup);
134  return rc;
135 }
136 
137 int _sion_mpi_barrier_cb(void *commdata)
138 {
139  int rc;
140  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
141  MPI_Comm commp = sapi->comm;
142  rc = MPI_Barrier(commp);
143  return rc;
144 }
145 
146 int _sion_mpi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
147 {
148  int rc;
149  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
150  MPI_Comm commp = sapi->comm;
151  switch (dtype) {
152  case _SION_INT32:
153  rc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commp);
154  break;
155  case _SION_INT64:
156  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
157  break;
158  case _SION_CHAR:
159  rc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commp);
160  break;
161  default:
162  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
163  break;
164  }
165  return rc;
166 }
167 
168 int _sion_mpi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
169 {
170  int rc;
171  int size, rank;
172  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
173  MPI_Comm commp = sapi->comm;
174 
175  MPI_Comm_rank(commp, &rank);
176  MPI_Comm_size(commp, &size);
177 
178  DPRINTFP((256, "_mpi_gatherr_cb", rank, " gatherr on %d of %d nelem=%d root=%d\n", rank, size, nelem, root));
179 
180  /* Dummy selects the type of gather */
181  switch (dtype) {
182  case _SION_INT32:
183  rc = MPI_Gather((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
184  break;
185  case _SION_INT64:
186  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
187  break;
188  case _SION_CHAR:
189  rc = MPI_Gather((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
190  break;
191  default:
192  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
193  break;
194  }
195 
196 
197  return rc;
198 }
199 
200 int _sion_mpi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
201 {
202  int rc, t;
203  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
204  MPI_Comm commp = sapi->comm;
205  int rank=sapi->rank;
206  int size=sapi->size;
207 
208  DPRINTFP((256, "_mpi_scatterr_cb", rank, " starting nelem=%d root=%d\n", nelem, root));
209 
210  if(rank==root) {
211  for(t=0;t<nelem*size;t++) {
212  DPRINTFP((256, "_mpi_scatterr_cb", rank, " before scatter: %d -> %d\n", t,(int) ((int *) indata)[t]));
213  }
214  }
215 
216  switch (dtype) {
217  case _SION_INT32:
218  rc = MPI_Scatter((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
219  break;
220  case _SION_INT64:
221  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
222  break;
223  case _SION_CHAR:
224  rc = MPI_Scatter((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
225  break;
226  default:
227  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
228  break;
229  }
230 
231  return rc;
232 }
233 
234 int _sion_mpi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
235 {
236  int rc;
237  int size, rank, t, offset;
238  int *displs=NULL;
239  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
240  MPI_Comm commp = sapi->comm;
241 
242 
243  MPI_Comm_rank(commp, &rank);
244  MPI_Comm_size(commp, &size);
245 
246  DPRINTFP((256, "_mpi_gathervr_cb", rank, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
247 
248  /* compute displs and counts */
249  if(rank==root) {
250  displs = (int *) malloc(size * sizeof(int));
251  if (displs == NULL) {
252  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
253  return(-1);
254  }
255  offset=0;
256  for(t=0;t<size;t++) {
257  displs[t]=offset;
258  offset+=counts[t];
259  DPRINTFP((256, "_mpi_gathervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
260  }
261  }
262 
263  /* Dummy selects the type of gather */
264  switch (dtype) {
265  case _SION_INT32:
266  rc = MPI_Gatherv((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, counts, displs, SION_MPI_INT32, root, commp);
267  break;
268  case _SION_INT64:
269  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
270  break;
271  case _SION_CHAR:
272  rc = MPI_Gatherv((char *) indata, nelem, MPI_CHAR, (sion_int32 *) outdata, counts, displs, MPI_CHAR, root, commp);
273  break;
274  default:
275  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
276  break;
277  }
278 
279  if(rank==root) {
280  if(displs) free(displs);
281  }
282  return rc;
283 }
284 
285 int _sion_mpi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
286 {
287  int rc;
288  int size, rank, t, offset;
289  int *displs=NULL;
290  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
291  MPI_Comm commp = sapi->comm;
292 
293 
294  MPI_Comm_rank(commp, &rank);
295  MPI_Comm_size(commp, &size);
296 
297  DPRINTFP((256, "_mpi_scattervr_cb", rank, " input nelem=%d root=%d\n", nelem, root));
298 
299  /* compute offset */
300  if(rank==root) {
301  displs = (int *) malloc(size * sizeof(int));
302  if (displs == NULL) {
303  fprintf(stderr,"_mpi_scattervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
304  return(-1);
305  }
306  offset=0;
307  for(t=0;t<size;t++) {
308  displs[t]=offset;
309  offset+=counts[t];
310  DPRINTFP((256, "_mpi_scattervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d sendcounts=%d\n", t,displs[t],counts[t]));
311  }
312  }
313 
314  /* Dummy selects the type of gather */
315  switch (dtype) {
316  case _SION_INT32:
317  rc = MPI_Scatterv((sion_int32 *) outdata, counts, displs, SION_MPI_INT32, (sion_int32 *) indata, nelem, SION_MPI_INT32, root, commp);
318  break;
319  case _SION_INT64:
320  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
321  break;
322  case _SION_CHAR:
323  rc = MPI_Scatterv((char *) outdata, counts, displs, MPI_CHAR, (sion_int32 *) indata, nelem, MPI_CHAR, root, commp);
324  break;
325  default:
326  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
327  break;
328  }
329 
330  if(rank==root) {
331  if(displs) free(displs);
332  }
333  return rc;
334 }
335 
336 /* end of ifdef MPI */
337 #endif
Sion Time Stamp Header.