SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_mpi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include "mpi.h"
23 
/*
 * Optional redirection of the MPI calls used in this file to their PMPI_
 * counterparts.  The macro defined below is "USE_PMPIno", not "USE_PMPI",
 * so the #ifdef section is inactive as written; renaming it to USE_PMPI
 * switches the redirection on.
 *
 * The list covers every MPI routine called in this file, including
 * MPI_Comm_free, MPI_Gatherv and MPI_Scatterv, so that enabling the
 * switch redirects all calls consistently.
 */
#define USE_PMPIno
#ifdef USE_PMPI
#define MPI_Comm_rank PMPI_Comm_rank
#define MPI_Comm_size PMPI_Comm_size
#define MPI_Gather PMPI_Gather
#define MPI_Scatter PMPI_Scatter
#define MPI_Bcast PMPI_Bcast
#define MPI_Barrier PMPI_Barrier
#define MPI_Comm_split PMPI_Comm_split
#define MPI_Send PMPI_Send
#define MPI_Recv PMPI_Recv
#define MPI_Comm_free PMPI_Comm_free
#define MPI_Gatherv PMPI_Gatherv
#define MPI_Scatterv PMPI_Scatterv
#endif
36 
37 #include <sys/time.h>
38 
39 #include <sys/types.h>
40 #include <fcntl.h>
41 
42 #include <unistd.h>
43 
44 #include "sion.h"
45 #include "sion_debug.h"
46 #include "sion_error_handler.h"
47 #include "sion_internal.h"
48 #include "sion_fd.h"
49 #include "sion_filedesc.h"
50 #include "sion_printts.h"
51 
52 #include "sion_mpi_cb_gen.h"
53 
54 #ifdef SION_MPI
55 
56 int _sion_register_callbacks_mpi(void) {
57  int aid=0;
58  aid=sion_generic_create_api("SIONlib_MPI_API");
59 
60 
61  sion_generic_register_create_local_commgroup_cb(aid,&_sion_mpi_create_lcg_cb);
62  sion_generic_register_free_local_commgroup_cb(aid,&_sion_mpi_free_lcg_cb);
63 
64  sion_generic_register_barrier_cb(aid,&_sion_mpi_barrier_cb);
65  sion_generic_register_bcastr_cb(aid,&_sion_mpi_bcastr_cb);
66  sion_generic_register_gatherr_cb(aid,&_sion_mpi_gatherr_cb);
67  sion_generic_register_scatterr_cb(aid,&_sion_mpi_scatterr_cb);
68  sion_generic_register_gathervr_cb(aid,&_sion_mpi_gathervr_cb);
69  sion_generic_register_scattervr_cb(aid,&_sion_mpi_scattervr_cb);
70  sion_generic_register_gather_and_execute_cb(aid,&_sion_mpi_gather_process_cb);
71  sion_generic_register_execute_and_scatter_cb(aid,&_sion_mpi_process_scatter_cb);
72  sion_generic_register_get_capability_cb(aid,&_sion_mpi_get_capability_cb);
73 
74  return(aid);
75 }
76 
77 int _sion_mpi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
78  int grank, int gsize,
79  int lrank, int lsize,
80  int filenumber, int numfiles
81  )
82 {
83  int rc=0;
84  _mpi_api_commdata* sapi_global = (_mpi_api_commdata *) global_commgroup;
85  _mpi_api_commdata* commgroup=NULL;
86  int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
87 
88  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " split now comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
89  grank, gsize, filenumber, numfiles, lrank, lsize));
90 
91  if(global_commgroup==NULL) {
92  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
93  return(-1);
94  }
95  if(*local_commgroup!=NULL) {
96  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
97  return(-1);
98  }
99 
100  /* is local commgroup already set by calling program? */
101  if(sapi_global->lcommgroup!=NULL) {
102  /* use this communicator */
103  if(sapi_global->lcommgroup->commset==0) {
104  *local_commgroup=sapi_global->lcommgroup;
105  create_lcomm=0;set_in_global=0; /* all is done */
106  sapi_global->lcommgroup->commset=1;
107  } else {
108  create_lcomm=1;set_in_global=0; /* another communicator will be created */
109  }
110  }
111 
112  if(create_lcomm) {
113 
114  /* create new communicator */
115  commgroup = (_mpi_api_commdata *) malloc(sizeof(_mpi_api_commdata));
116  if (commgroup == NULL) {
117  fprintf(stderr,"_mpi_create_lcg_cb: cannot allocate memory for local commgroup of size %lu, aborting ...\n",
118  (unsigned long) sizeof(_mpi_api_commdata));
119  return(-1);
120  }
121  color=filenumber;
122  if(filenumber==-1) color=MPI_UNDEFINED;
123 
124  rc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
125  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
126  commgroup->local=1; commgroup->commset=1; commgroup->lcommgroup=NULL;
127  commgroup->commcreated=1;
128  commgroup->rank=lrank;
129  commgroup->size=lsize;
130 
131  }
132 
133  if(set_in_global) {
134  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
135  }
136 
137  *local_commgroup=commgroup;
138 
139  if((filenumber!=-1) && commgroup) {
140  MPI_Comm_rank(commgroup->comm, &mrank);
141  MPI_Comm_size(commgroup->comm, &msize);
142  }
143 
144  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " leave rc=%d rank %d of %d\n",rc,mrank,msize));
145 
146  return rc;
147 }
148 
149 int _sion_mpi_free_lcg_cb(void *local_commgroup) {
150  int rc = 0;
151  _mpi_api_commdata* commgroup = (_mpi_api_commdata *) local_commgroup;
152 
153  if ( (commgroup->commset) && (commgroup->commcreated) ) {
154  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm\n"));
155  rc=MPI_Comm_free(&commgroup->comm);
156  DPRINTFP((256, "_mpi_free_lcg_cb", commgroup->rank, " free now comm rc=%d\n",rc));
157  }
158  free(commgroup);
159  return rc;
160 }
161 
162 int _sion_mpi_barrier_cb(void *commdata)
163 {
164  int rc;
165  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
166  MPI_Comm commp = sapi->comm;
167  rc = MPI_Barrier(commp);
168  return rc;
169 }
170 
171 int _sion_mpi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
172 {
173  int rc;
174  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
175  MPI_Comm commp = sapi->comm;
176  switch (dtype) {
177  case _SION_INT32:
178  rc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commp);
179  break;
180  case _SION_INT64:
181  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
182  break;
183  case _SION_CHAR:
184  rc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commp);
185  break;
186  default:
187  rc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commp);
188  break;
189  }
190  return rc;
191 }
192 
193 int _sion_mpi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
194 {
195  int rc;
196  int size, rank;
197  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
198  MPI_Comm commp = sapi->comm;
199 
200  MPI_Comm_rank(commp, &rank);
201  MPI_Comm_size(commp, &size);
202 
203  DPRINTFP((256, "_mpi_gatherr_cb", rank, " gatherr on %d of %d nelem=%d root=%d\n", rank, size, nelem, root));
204 
205  /* Dummy selects the type of gather */
206  switch (dtype) {
207  case _SION_INT32:
208  rc = MPI_Gather((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
209  break;
210  case _SION_INT64:
211  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
212  break;
213  case _SION_CHAR:
214  rc = MPI_Gather((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
215  break;
216  default:
217  rc = MPI_Gather((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
218  break;
219  }
220 
221 
222  return rc;
223 }
224 
225 int _sion_mpi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
226 {
227  int rc;
228  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
229  MPI_Comm commp = sapi->comm;
230  ONLY_DEBUG(int rank=sapi->rank);
231 
232  DPRINTFP((256, "_mpi_scatterr_cb", rank, " starting nelem=%d root=%d\n", nelem, root));
233 
234  switch (dtype) {
235  case _SION_INT32:
236  rc = MPI_Scatter((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, nelem, SION_MPI_INT32, root, commp);
237  break;
238  case _SION_INT64:
239  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
240  break;
241  case _SION_CHAR:
242  rc = MPI_Scatter((char *) indata, nelem, MPI_CHAR, (char *) outdata, nelem, MPI_CHAR, root, commp);
243  break;
244  default:
245  rc = MPI_Scatter((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, nelem, SION_MPI_INT64, root, commp);
246  break;
247  }
248 
249  return rc;
250 }
251 
252 int _sion_mpi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
253 {
254  int rc;
255  int size, rank, t, offset;
256  int *displs=NULL;
257  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
258  MPI_Comm commp = sapi->comm;
259 
260 
261  MPI_Comm_rank(commp, &rank);
262  MPI_Comm_size(commp, &size);
263 
264  DPRINTFP((256, "_mpi_gathervr_cb", rank, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
265 
266  /* compute displs and counts */
267  if(rank==root) {
268  displs = (int *) malloc(size * sizeof(int));
269  if (displs == NULL) {
270  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",(size_t) size * sizeof(int));
271  return(-1);
272  }
273  offset=0;
274  for(t=0;t<size;t++) {
275  displs[t]=offset;
276  offset+=counts[t];
277  /* DPRINTFP((256, "_mpi_gathervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t])); */
278  }
279  }
280 
281  /* Dummy selects the type of gather */
282  switch (dtype) {
283  case _SION_INT32:
284  rc = MPI_Gatherv((sion_int32 *) indata, nelem, SION_MPI_INT32, (sion_int32 *) outdata, counts, displs, SION_MPI_INT32, root, commp);
285  break;
286  case _SION_INT64:
287  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
288  break;
289  case _SION_CHAR:
290  rc = MPI_Gatherv((char *) indata, nelem, MPI_CHAR, (sion_int32 *) outdata, counts, displs, MPI_CHAR, root, commp);
291  break;
292  default:
293  rc = MPI_Gatherv((sion_int64 *) indata, nelem, SION_MPI_INT64, (sion_int64 *) outdata, counts, displs, SION_MPI_INT64, root, commp);
294  break;
295  }
296 
297  if(rank==root) {
298  if(displs) free(displs);
299  }
300  return rc;
301 }
302 
303 int _sion_mpi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
304 {
305  int rc;
306  int size, rank, t, offset;
307  int *displs=NULL;
308  _mpi_api_commdata* sapi= (_mpi_api_commdata *) commdata;
309  MPI_Comm commp = sapi->comm;
310 
311 
312  MPI_Comm_rank(commp, &rank);
313  MPI_Comm_size(commp, &size);
314 
315  DPRINTFP((256, "_mpi_scattervr_cb", rank, " input nelem=%d root=%d\n", nelem, root));
316 
317  /* compute offset */
318  if(rank==root) {
319  displs = (int *) malloc(size * sizeof(int));
320  if (displs == NULL) {
321  fprintf(stderr,"_mpi_scattervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",(size_t) size * sizeof(int));
322  return(-1);
323  }
324  offset=0;
325  for(t=0;t<size;t++) {
326  displs[t]=offset;
327  offset+=counts[t];
328  DPRINTFP((256, "_mpi_scattervr_cb", rank, " after MPI_Gather %2d -> dpls=%2d sendcounts=%d\n", t,displs[t],counts[t]));
329  }
330  }
331 
332  /* Dummy selects the type of gather */
333  switch (dtype) {
334  case _SION_INT32:
335  rc = MPI_Scatterv((sion_int32 *) outdata, counts, displs, SION_MPI_INT32, (sion_int32 *) indata, nelem, SION_MPI_INT32, root, commp);
336  break;
337  case _SION_INT64:
338  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
339  break;
340  case _SION_CHAR:
341  rc = MPI_Scatterv((char *) outdata, counts, displs, MPI_CHAR, (sion_int32 *) indata, nelem, MPI_CHAR, root, commp);
342  break;
343  default:
344  rc = MPI_Scatterv((sion_int64 *) outdata, counts, displs, SION_MPI_INT64, (sion_int64 *) indata, nelem, SION_MPI_INT64, root, commp);
345  break;
346  }
347 
348  if(rank==root) {
349  if(displs) free(displs);
350  }
351  return rc;
352 }
353 
354 /* end of ifdef MPI */
355 #endif
Sion Time Stamp Header.