SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_mpi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #define USE_PMPIno
23 #ifdef USE_PMPI
24 #define MPI_Comm_rank PMPI_Comm_rank
25 #define MPI_Comm_size PMPI_Comm_size
26 #define MPI_Gather PMPI_Gather
27 #define MPI_Scatter PMPI_Scatter
28 #define MPI_Bcast PMPI_Bcast
29 #define MPI_Barrier PMPI_Barrier
30 #define MPI_Comm_split PMPI_Comm_split
31 #define MPI_Send PMPI_Send
32 #define MPI_Recv PMPI_Recv
33 #endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_error_handler.h"
45 #include "sion_internal.h"
46 #include "sion_fd.h"
47 #include "sion_filedesc.h"
48 #include "sion_printts.h"
49 
50 #include "sion_mpi_cb_gen.h"
51 
52 #ifdef SION_MPI
53 
54 int _sion_register_callbacks_mpi() {
55  int aid=0;
56  aid=sion_generic_create_api("SIONlib_MPI_API");
57 
58 
59  sion_generic_register_create_local_commgroup_cb(aid,&_sion_mpi_create_lcg_cb);
60  sion_generic_register_free_local_commgroup_cb(aid,&_sion_mpi_free_lcg_cb);
61 
62  sion_generic_register_barrier_cb(aid,&_sion_mpi_barrier_cb);
63  sion_generic_register_bcastr_cb(aid,&_sion_mpi_bcastr_cb);
64  sion_generic_register_gatherr_cb(aid,&_sion_mpi_gatherr_cb);
65  sion_generic_register_scatterr_cb(aid,&_sion_mpi_scatterr_cb);
66  sion_generic_register_gathervr_cb(aid,&_sion_mpi_gathervr_cb);
67  sion_generic_register_scattervr_cb(aid,&_sion_mpi_scattervr_cb);
68  sion_generic_register_gather_and_execute_cb(aid,&_sion_mpi_gather_process_cb);
69  sion_generic_register_execute_and_scatter_cb(aid,&_sion_mpi_process_scatter_cb);
70  sion_generic_register_get_capability_cb(aid,&_sion_mpi_get_capability_cb);
71 
72  return(aid);
73 }
74 
75 int _sion_mpi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
76  int grank, int gsize,
77  int lrank, int lsize,
78  int filenumber, int numfiles
79  )
80 {
81  int rc=0;
82  _mpi_api_commdata* sapi_global = (_mpi_api_commdata *) global_commgroup;
83  _mpi_api_commdata* commgroup=NULL;
84  int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
85 
86  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " split now comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
87  grank, gsize, filenumber, numfiles, lrank, lsize));
88 
89  if(global_commgroup==NULL) {
90  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
91  return(-1);
92  }
93  if(*local_commgroup!=NULL) {
94  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
95  return(-1);
96  }
97 
98  /* is local commgroup already set by calling program? */
99  if(sapi_global->lcommgroup!=NULL) {
100  /* use this communicator */
101  if(sapi_global->lcommgroup->commset==0) {
102  *local_commgroup=sapi_global->lcommgroup;
103  create_lcomm=0;set_in_global=0; /* all is done */
104  sapi_global->lcommgroup->commset=1;
105  } else {
106  create_lcomm=1;set_in_global=0; /* another communicator will be created */
107  }
108  }
109 
110  if(create_lcomm) {
111 
112  /* create new communicator */
113  commgroup = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
114  if (commgroup == NULL) {
115  fprintf(stderr,"_mpi_create_lcg_cb: cannot allocate memory for local commgroup of size %lu, aborting ...\n",
116  (unsigned long) sizeof(_mpi_api_commdata));
117  return(-1);
118  }
119  color=filenumber;
120  if(filenumber==-1) color=MPI_UNDEFINED;
121 
122  rc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
123  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
124  commgroup->local=1; commgroup->commset=1; commgroup->lcommgroup=NULL;
125  commgroup->commcreated=1;
126  commgroup->rank=lrank;
127  commgroup->size=lsize;
128 
129  }
130 
131  if(set_in_global) {
132  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
133  }
134 
135  *local_commgroup=commgroup;
136 
137  if((filenumber!=-1) && commgroup) {
138  MPI_Comm_rank(commgroup->comm, &mrank);
139  MPI_Comm_size(commgroup->comm, &msize);
140  }
141 
142  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " leave rc=%d rank %d of %d\n",rc,mrank,msize));
143 
144  return rc;
145 }
146 
147 int _sion_mpi_free_lcg_cb(void *local_commgroup) {
148  int rc = 0;
149  _mpi_api_commdata* commgroup = (_mpi_api_commdata *) local_commgroup;
150 
151  if ( (commgroup->commset) && (commgroup->commcreated) ) {
152  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm\n"));
153  rc=MPI_Comm_free(&commgroup->comm);
154  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm rc=%d\n",rc));
155  }
156  free(commgroup);
157  return rc;
158 }
159 
160 int _sion_mpi_barrier_cb(void *commdata)
161 {
162  int rc;
163  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
164  MPI_Comm commp = sapi->comm;
165  rc = MPI_Barrier(commp);
166  return rc;
167 }
168 
169 int _sion_mpi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
170 {
171  int rc;
172  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
173  MPI_Comm commp = sapi->comm;
174  switch (dtype) {
175  case _SION_INT32:
176  rc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commp);
177  break;
178  case _SION_INT64:
179  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
180  break;
181  case _SION_CHAR:
182  rc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commp);
183  break;
184  default:
185  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
186  break;
187  }
188  return rc;
189 }
190 
191 int _sion_mpi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
192 {
193  int rc;
194  int size, rank;
195  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
196  MPI_Comm commp = sapi->comm;
197 
198  MPI_Comm_rank(commp, &rank);
199  MPI_Comm_size(commp, &size);
200 
201  DPRINTFP((256, "_mpi_gatherr_cb", rank, " gatherr on %d of %d nelem=%d root=%d\n", rank, size, nelem, root));
202 
203  /* Dummy selects the type of gather */
204  switch (dtype) {
205  case _SION_INT32:
206  rc = MPI_Gather((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
207  break;
208  case _SION_INT64:
209  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
210  break;
211  case _SION_CHAR:
212  rc = MPI_Gather((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
213  break;
214  default:
215  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
216  break;
217  }
218 
219 
220  return rc;
221 }
222 
223 int _sion_mpi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
224 {
225  int rc;
226  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
227  MPI_Comm commp = sapi->comm;
228  ONLY_DEBUG(int rank=sapi->rank);
229 
230  DPRINTFP((256, "_mpi_scatterr_cb", rank, " starting nelem=%d root=%d\n", nelem, root));
231 
232  switch (dtype) {
233  case _SION_INT32:
234  rc = MPI_Scatter((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
235  break;
236  case _SION_INT64:
237  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
238  break;
239  case _SION_CHAR:
240  rc = MPI_Scatter((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
241  break;
242  default:
243  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
244  break;
245  }
246 
247  return rc;
248 }
249 
250 int _sion_mpi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
251 {
252  int rc;
253  int size, rank, t, offset;
254  int *displs=NULL;
255  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
256  MPI_Comm commp = sapi->comm;
257 
258 
259  MPI_Comm_rank(commp, &rank);
260  MPI_Comm_size(commp, &size);
261 
262  DPRINTFP((256, "_mpi_gathervr_cb", rank, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
263 
264  /* compute displs and counts */
265  if(rank==root) {
266  displs = (int *) malloc(size * sizeof(int));
267  if (displs == NULL) {
268  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",(size_t) size * sizeof(int));
269  return(-1);
270  }
271  offset=0;
272  for(t=0;t<size;t++) {
273  displs[t]=offset;
274  offset+=counts[t];
275  /* DPRINTFP((256, "_mpi_gathervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t])); */
276  }
277  }
278 
279  /* Dummy selects the type of gather */
280  switch (dtype) {
281  case _SION_INT32:
282  rc = MPI_Gatherv((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, counts, displs, SION_MPI_INT32, root, commp);
283  break;
284  case _SION_INT64:
285  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
286  break;
287  case _SION_CHAR:
288  rc = MPI_Gatherv((char *) indata, nelem, MPI_CHAR, (sion_int32 *) outdata, counts, displs, MPI_CHAR, root, commp);
289  break;
290  default:
291  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
292  break;
293  }
294 
295  if(rank==root) {
296  if(displs) free(displs);
297  }
298  return rc;
299 }
300 
301 int _sion_mpi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
302 {
303  int rc;
304  int size, rank, t, offset;
305  int *displs=NULL;
306  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
307  MPI_Comm commp = sapi->comm;
308 
309 
310  MPI_Comm_rank(commp, &rank);
311  MPI_Comm_size(commp, &size);
312 
313  DPRINTFP((256, "_mpi_scattervr_cb", rank, " input nelem=%d root=%d\n", nelem, root));
314 
315  /* compute offset */
316  if(rank==root) {
317  displs = (int *) malloc(size * sizeof(int));
318  if (displs == NULL) {
319  fprintf(stderr,"_mpi_scattervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",(size_t) size * sizeof(int));
320  return(-1);
321  }
322  offset=0;
323  for(t=0;t<size;t++) {
324  displs[t]=offset;
325  offset+=counts[t];
326  DPRINTFP((256, "_mpi_scattervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d sendcounts=%d\n", t,displs[t],counts[t]));
327  }
328  }
329 
330  /* Dummy selects the type of gather */
331  switch (dtype) {
332  case _SION_INT32:
333  rc = MPI_Scatterv((sion_int32 *) outdata, counts, displs, SION_MPI_INT32, (sion_int32 *) indata, nelem, SION_MPI_INT32, root, commp);
334  break;
335  case _SION_INT64:
336  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
337  break;
338  case _SION_CHAR:
339  rc = MPI_Scatterv((char *) outdata, counts, displs, MPI_CHAR, (sion_int32 *) indata, nelem, MPI_CHAR, root, commp);
340  break;
341  default:
342  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
343  break;
344  }
345 
346  if(rank==root) {
347  if(displs) free(displs);
348  }
349  return rc;
350 }
351 
352 /* end of ifdef MPI */
353 #endif
Sion Time Stamp Header.