SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_ompi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
17 #include <stdlib.h>
18 #include <stdio.h>
19 #include <stdarg.h>
20 #include <string.h>
21 #include <time.h>
22 
23 #include "mpi.h"
24 
25 #define USE_PMPIno
26 #ifdef USE_PMPI
27 #define MPI_Comm_rank PMPI_Comm_rank
28 #define MPI_Comm_size PMPI_Comm_size
29 #define MPI_Gather PMPI_Gather
30 #define MPI_Scatter PMPI_Scatter
31 #define MPI_Bcast PMPI_Bcast
32 #define MPI_Barrier PMPI_Barrier
33 #define MPI_Comm_split PMPI_Comm_split
34 #define MPI_Send PMPI_Send
35 #define MPI_Recv PMPI_Recv
36 #endif
37 
38 #include <sys/time.h>
39 
40 #include <sys/types.h>
41 #include <fcntl.h>
42 
43 #include <unistd.h>
44 
45 #include "sion.h"
46 #include "sion_debug.h"
47 #include "sion_internal.h"
48 #include "sion_fd.h"
49 #include "sion_filedesc.h"
50 #include "sion_printts.h"
51 
52 #include "sion_ompi_cb_gen.h"
53 #include "sion_ompi_internal_gen.h"
54 
55 #ifdef SION_OMPI
56 
57 #include "omp.h"
58 
59 
60 static void *__ompi_global_pointer;
61 static int _sion_opmi_grc=SION_SUCCESS;
62 
63 int _sion_ompi_size_of_dtype(int dtype);
64 void * __sion_ompi_share_ptr(void * in_ptr);
65 
66 
67 int _sion_register_callbacks_ompi() {
68  int aid=-1;
69  aid=sion_generic_create_api("SIONlib_OMPI_API");
70 
71 
72  sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
73  sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
74 
75  sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
76  sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
77  sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
78  sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
79  sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
80  sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
81  sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
82  sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
83  sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
84 
85  return(aid);
86 }
87 
88 #define DFUNCTION "_sion_ompi_create_lcg_cb"
89 int _sion_ompi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
90  int grank, int gsize,
91  int lrank, int lsize,
92  int filenumber, int numfiles
93  )
94 {
95  int rc=SION_STD_SUCCESS;
96  _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
97  _ompi_api_commdata* commgroup=NULL;
98 
99  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
100  grank, gsize, filenumber, numfiles, lrank, lsize));
101 #pragma omp master
102  {
103  _sion_opmi_grc=SION_STD_SUCCESS;
104  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, " I'm on master\n",rc));
105  }
106 
107  if(global_commgroup==NULL) {
108  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
109  return(SION_STD_NOT_SUCCESS);
110  }
111  if(*local_commgroup!=NULL) {
112  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
113  return(SION_STD_NOT_SUCCESS);
114  }
115 
116  /* is local commgroup already set by calling program? */
117  if(sapi_global->lcommgroup!=NULL) {
118  /* use this communicator */
119  commgroup=*local_commgroup=sapi_global->lcommgroup;
120  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, " local comm group is already set\n"));
121  } else {
122 
123  /* create new communicator */
124  commgroup = (_ompi_api_commdata *) malloc(sizeof(_ompi_api_commdata));
125  if (commgroup == NULL) {
126  fprintf(stderr,"_ompi_create_lcg_cb: cannot allocate memory for local commgroup of size %lu, aborting ...\n",
127  (unsigned long) sizeof(_ompi_api_commdata));
128  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
129  }
130 
131  commgroup->commset=0; commgroup->lcommgroup=NULL;
132  commgroup->commcreated=0;
133  commgroup->rank=lrank;
134  commgroup->size=lsize;
135  commgroup->num_threads=sapi_global->num_threads;
136  commgroup->thread_num=sapi_global->thread_num;
137 
138  *local_commgroup=commgroup;
139  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
140  }
141 
142 #pragma omp master
143  {
144  if(commgroup->commset==0) {
145  /* */
146  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, "MPI_Comm_split(%x,%d,%d,%x)\n",sapi_global->comm,filenumber,lrank,&commgroup->comm));
147  _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, filenumber, lrank, &commgroup->comm);
148  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, " rc=%d from MPI_Comm_split\n",rc));
149  commgroup->local=1; commgroup->commset=1; commgroup->commcreated=1;
150  }
151  } /* omp master */
152 
153  {
154 #pragma omp barrier
155  }
156  rc=_sion_opmi_grc;
157  return rc;
158 }
159 #undef DFUNCTION
160 
161 #define DFUNCTION "_sion_ompi_free_lcg_cb"
162 int _sion_ompi_free_lcg_cb(void *local_commgroup) {
163  int rc=SION_STD_SUCCESS;
164  _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
165 #pragma omp master
166  {
167  _sion_opmi_grc=SION_STD_SUCCESS;
168 
169  if ( (commgroup->commset) && (commgroup->commcreated) ) {
170  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm\n"));
171  _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
172  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm rc=%d\n",_sion_opmi_grc));
173  }
174  free(commgroup);
175  }
176 
177  {
178 #pragma omp barrier
179  }
180  rc=_sion_opmi_grc;
181  return rc;
182 }
183 #undef DFUNCTION
184 
185 
186 #define DFUNCTION "_sion_ompi_barrier_cb"
187 int _sion_ompi_barrier_cb(void *commdata)
188 {
189  int rc=SION_STD_SUCCESS;
190  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
191  MPI_Comm commgroup;
192  {
193 #pragma omp barrier
194  }
195 #pragma omp master
196  {
197  commgroup = sapi->comm;
198  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " performing MPI barrier now\n"));
199  _sion_opmi_grc = MPI_Barrier(commgroup);
200  }
201 
202  {
203 #pragma omp barrier
204  }
205  rc=_sion_opmi_grc;
206  {
207 #pragma omp barrier
208  }
209  return rc;
210 }
211 #undef DFUNCTION
212 
213 
214 #define DFUNCTION "_sion_ompi_bcastr_cb"
215 int _sion_ompi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
216 {
217  int rc=SION_STD_SUCCESS;
218  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
219  MPI_Comm commgroup;
220  void *help;
221 
222  /* first distribute data to master threads (over MPI) */
223 #pragma omp master
224  {
225  commgroup = sapi->comm;
226  switch (dtype) {
227  case _SION_INT32:
228  _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
229  break;
230  case _SION_INT64:
231  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
232  break;
233  case _SION_CHAR:
234  _sion_opmi_grc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commgroup);
235  break;
236  default:
237  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
238  break;
239  }
240 
241  }
242 
243 
244  /* share data ptr among threads */
245  help=__sion_ompi_share_ptr((void *) data);
246 
247  /* copy data to local val on non-master threads */
248  if((omp_get_thread_num()!=root) && (help != NULL)) {
249  memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
250  }
251 
252  {
253 #pragma omp barrier
254  }
255  rc=_sion_opmi_grc;
256  {
257 #pragma omp barrier
258  }
259 
260 return rc;
261 }
262 #undef DFUNCTION
263 
264 
265 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
266 #define DFUNCTION "_sion_ompi_gatherr_cb"
267 int _sion_ompi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
268 {
269  int rc=SION_STD_SUCCESS;
270  int mroot;
271  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
272  MPI_Comm commgroup;
273  void *helpdata, *help;
274  ONLY_DEBUG(int rank=sapi->rank;)
275  ONLY_DEBUG(int size=sapi->size;)
276 
277  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
278 
279  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
280 
281  /* allocate temp buffer */
282 #pragma omp master
283  {
284  _sion_opmi_grc=SION_STD_SUCCESS;
285 
286  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
287  if (helpdata == NULL) {
288  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
289  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
290  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
291  }
292  }
293 
294  /* share data ptr among threads, internal barrier */
295  help=__sion_ompi_share_ptr((void *) helpdata);
296 
297  /* check return code from malloc */
298  if(_sion_opmi_grc) return(_sion_opmi_grc);
299 
300  /* copy data from indata on non-master threads */
301  memcpy(help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* to */
302  indata, /* from */
303  nelem*_sion_ompi_size_of_dtype(dtype));
304 
305  {
306 #pragma omp barrier
307  }
308 
309  /* gather on MPI level on master thread */
310 #pragma omp master
311  {
312  commgroup = sapi->comm;
313  switch (dtype) {
314  case _SION_INT32:
315  _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
316  break;
317  case _SION_INT64:
318  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
319  break;
320  case _SION_CHAR:
321  _sion_opmi_grc = MPI_Gather((char *) help, sapi->num_threads*nelem, MPI_CHAR, (char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
322  break;
323  default:
324  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
325  break;
326  }
327 
328  free(helpdata);
329  }
330 
331  /* sychronize */
332  {
333 #pragma omp barrier
334  }
335  rc=_sion_opmi_grc;
336  {
337 #pragma omp barrier
338  }
339 
340  return rc;
341 }
342 #undef DFUNCTION
343 
344 /* indata (root) -> outdata */
345 #define DFUNCTION "_sion_ompi_scatterr_cb"
346 int _sion_ompi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
347 {
348  int rc=SION_STD_SUCCESS, mroot;
349  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
350  MPI_Comm commgroup;
351  void *helpdata, *help;
352  ONLY_DEBUG(int rank=sapi->rank;)
353  ONLY_DEBUG(int size=sapi->size;)
354 
355  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
356 
357  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
358 
359  /* allocate temp buffer */
360 #pragma omp master
361  {
362  _sion_opmi_grc=0;
363 
364  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
365  if (helpdata == NULL) {
366  fprintf(stderr,"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
367  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
368  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
369  }
370  }
371 
372  /* share data ptr among threads, internal barrier */
373  help=__sion_ompi_share_ptr((void *) helpdata);
374 
375  /* check return code from malloc */
376  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
377 
378  {
379 #pragma omp barrier
380  }
381 
382  /* scatter data on MPI level */
383 #pragma omp master
384  {
385  commgroup = sapi->comm;
386  switch (dtype) {
387  case _SION_INT32:
388  _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
389  break;
390  case _SION_INT64:
391  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
392  break;
393  case _SION_CHAR:
394  _sion_opmi_grc = MPI_Scatter((char *) indata, sapi->num_threads*nelem, MPI_CHAR, (char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
395  break;
396  default:
397  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
398  break;
399  }
400 
401  } /* omp master */
402 
403  /* synchronize */
404  {
405 #pragma omp barrier
406  }
407 
408  /* copy data from indata on non-master threads */
409  memcpy(outdata, /* to */
410  help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* from */
411  nelem*_sion_ompi_size_of_dtype(dtype));
412 
413  /* synchronize */
414  {
415 #pragma omp barrier
416  }
417 
418  /* free temp buffer on master */
419 #pragma omp master
420  {
421  free(helpdata);
422  }
423 
424  rc=_sion_opmi_grc;
425  {
426 #pragma omp barrier
427  }
428 
429  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
430 
431  return rc;
432 }
433 #undef DFUNCTION
434 
435 
436 
437 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
438 #define DFUNCTION "_sion_ompi_gathervr_cb"
439 int _sion_ompi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
440 {
441  int rc=SION_STD_SUCCESS;
442  int m, t, offset, mroot, mcount, toffset;
443  int *mcounts=NULL,*mdispls=NULL;
444  int *tcounts=NULL,*tdispls=NULL;
445  void *helpdata, *help;
446  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
447  MPI_Comm commgroup;
448  int rank=sapi->rank;
449  int size=sapi->size;
450 
451  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
452 
453  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
454 
455 
456  /* STEP1: collect counts on thread level */
457 #pragma omp master
458  {
459  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
460  if (helpdata == NULL) {
461  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
462  (unsigned long) sapi->num_threads * sizeof(int));
463  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
464  }
465  } /* omp master */
466 
467  /* share data ptr among threads, internal barrier */
468  tcounts=__sion_ompi_share_ptr((void *) helpdata);
469 
470  /* check return code from malloc */
471  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
472 
473  tcounts[sapi->thread_num]=nelem;
474 
475  /* STEP2: calculate offsets on thread level */
476 #pragma omp master
477  {
478  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
479  if (helpdata == NULL) {
480  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
481  (unsigned long) sapi->num_threads * sizeof(int));
482  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
483  }
484  } /* omp master */
485 
486  /* share data ptr among threads, internal barrier */
487  tdispls=__sion_ompi_share_ptr((void *) helpdata);
488 
489  /* check return code from malloc */
490  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
491 
492 #pragma omp master
493  {
494  offset=0;
495  for(t=0;t<size;t++) {
496  tdispls[t]=offset;
497  offset+=tcounts[t];
498  }
499  mcount=tdispls[size=1];
500  } /* omp master */
501 
502  /* synchronize */
503  {
504 #pragma omp barrier
505  }
506 
507  /* STEP3: get offset on thread level */
508  toffset=tdispls[sapi->thread_num];
509 
510 
511  /* STEP4: allocate temp buffer on master */
512 #pragma omp master
513  {
514  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
515  if (helpdata == NULL) {
516  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
517  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
518  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
519  }
520  } /* omp master */
521 
522  /* STEP5: gather data on thread level */
523 
524  /* share data ptr among threads, internal barrier */
525  help=__sion_ompi_share_ptr((void *) helpdata);
526 
527  /* check return code from malloc */
528  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
529 
530  /* copy data from indata on non-master threads */
531  memcpy(help+toffset*_sion_ompi_size_of_dtype(dtype), /* to */
532  indata, /* from */
533  nelem*_sion_ompi_size_of_dtype(dtype));
534 
535 
536 
537  /* STEP6: gather data on MPI level */
538 #pragma omp master
539  {
540  /* allocate compute displs and counts */
541  if(rank==root) {
542 
543  mcounts = (int *) malloc(size * sizeof(int));
544  if (mcounts == NULL) {
545  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
546  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
547  }
548 
549  if(_sion_opmi_grc==SION_SUCCESS) {
550  mdispls = (int *) malloc(size * sizeof(int));
551  if (mdispls == NULL) {
552  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
553  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
554  }
555  }
556 
557  /* calculate counts and displs on MPI level */
558  if(_sion_opmi_grc==SION_SUCCESS) {
559  for(m=0;m<size;m++) {
560  mcounts[m]=0;
561  for(t=0;t<sapi->num_threads;t++) {
562  mcounts[m]+=counts[m*sapi->num_threads+t];
563  }
564  }
565 
566  offset=0;
567  for(m=0;m<size;m++) {
568  mdispls[m]=offset;
569  offset+=mcounts[m];
570  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
571  }
572  }
573 
574  }
575  } /* omp master */
576 
577  /* check return code from malloc */
578  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
579 
580 
581  /* call MPI gatherv */
582 #pragma omp master
583  {
584  commgroup = sapi->comm;
585  switch (dtype) {
586  case _SION_INT32:
587  _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
588  break;
589  case _SION_INT64:
590  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
591  break;
592  case _SION_CHAR:
593  _sion_opmi_grc = MPI_Gatherv((char *) help, mcount, MPI_CHAR, (sion_int32 *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
594  break;
595  default:
596  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
597  break;
598  }
599 
600  } /* omp master */
601 
602 #pragma omp master
603  {
604  if(tcounts) free(tcounts);
605  if(tdispls) free(tdispls);
606  if(help) free(help);
607 
608  if(rank==root) {
609  if(mcounts) free(mcounts);
610  if(mdispls) free(mdispls);
611  }
612  }
613 
614  /* synchronize */
615  {
616 #pragma omp barrier
617  }
618  rc=_sion_opmi_grc;
619  {
620 #pragma omp barrier
621  }
622 
623  return rc;
624 }
625 #undef DFUNCTION
626 
627 
628 
629 /* outdata (root) -> indata */
630 #define DFUNCTION "_sion_ompi_scatterr_cb"
631 int _sion_ompi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
632 {
633  int rc=SION_STD_SUCCESS;
634  int m, t, offset, mroot, mcount, toffset;
635  int *mcounts=NULL,*mdispls=NULL;
636  int *tcounts=NULL,*tdispls=NULL;
637  void *helpdata, *help;
638  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
639  MPI_Comm commgroup;
640  int rank=sapi->rank;
641  int size=sapi->size;
642 
643  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
644 
645  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
646 
647 
648  /* STEP1: collect counts on thread level */
649 #pragma omp master
650  {
651  _sion_opmi_grc=SION_STD_SUCCESS;
652 
653  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
654  if (helpdata == NULL) {
655  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
656  (unsigned long) sapi->num_threads * sizeof(int));
657  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
658  }
659  } /* omp master */
660 
661  /* share data ptr among threads, internal barrier */
662  tcounts=__sion_ompi_share_ptr((void *) helpdata);
663 
664  /* check return code from malloc */
665  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
666 
667  tcounts[sapi->thread_num]=nelem;
668 
669  /* STEP2: calculate offsets on thread level */
670 #pragma omp master
671  {
672  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
673  if (helpdata == NULL) {
674  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
675  (unsigned long) sapi->num_threads * sizeof(int));
676  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
677  }
678  } /* omp master */
679 
680  /* share data ptr among threads, internal barrier */
681  tdispls=__sion_ompi_share_ptr((void *) helpdata);
682 
683  /* check return code from malloc */
684  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
685 
686 #pragma omp master
687  {
688  offset=0;
689  for(t=0;t<size;t++) {
690  tdispls[t]=offset;
691  offset+=tcounts[t];
692  }
693  mcount=tdispls[size=1];
694  } /* omp master */
695 
696  /* synchronize */
697  {
698 #pragma omp barrier
699  }
700 
701  /* STEP3: get offset on thread level */
702  toffset=tdispls[sapi->thread_num];
703 
704 
705  /* STEP4: allocate temp buffer on master */
706 #pragma omp master
707  {
708  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
709  if (helpdata == NULL) {
710  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
711  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
712  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
713  }
714  } /* omp master */
715 
716  /* share data ptr among threads, internal barrier */
717  help=__sion_ompi_share_ptr((void *) helpdata);
718 
719  /* STEP5: scatter data on MPI level */
720 #pragma omp master
721  {
722  /* allocate compute displs and counts */
723  if(rank==root) {
724 
725  mcounts = (int *) malloc(size * sizeof(int));
726  if (mcounts == NULL) {
727  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
728  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
729  }
730 
731  if(_sion_opmi_grc==SION_SUCCESS) {
732  mdispls = (int *) malloc(size * sizeof(int));
733  if (mdispls == NULL) {
734  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",(unsigned long) size * sizeof(int));
735  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
736  }
737  }
738 
739  /* calculate counts and displs on MPI level */
740  if(_sion_opmi_grc==SION_SUCCESS) {
741  for(m=0;m<size;m++) {
742  mcounts[m]=0;
743  for(t=0;t<sapi->num_threads;t++) {
744  mcounts[m]+=counts[m*sapi->num_threads+t];
745  }
746  }
747 
748  offset=0;
749  for(m=0;m<size;m++) {
750  mdispls[m]=offset;
751  offset+=mcounts[m];
752  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
753  }
754  }
755 
756  }
757  } /* omp master */
758 
759  /* check return code from malloc */
760  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
761 
762  /* call MPI scatterv */
763 #pragma omp master
764  {
765  commgroup = sapi->comm;
766  switch (dtype) {
767  case _SION_INT32:
768  _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
769  break;
770  case _SION_INT64:
771  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
772  break;
773  case _SION_CHAR:
774  _sion_opmi_grc = MPI_Scatterv((char *) outdata, mcounts, mdispls, MPI_CHAR, (sion_int32 *) help, mcount, MPI_CHAR, mroot, commgroup);
775  break;
776  default:
777  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
778  break;
779  }
780 
781  } /* omp master */
782 
783  /* check return code from MPI call */
784  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
785 
786  /* STEP6: scatterv data on thread level */
787 
788 
789  /* copy data from indata on non-master threads */
790  memcpy(indata, /* to */
791  help+toffset*_sion_ompi_size_of_dtype(dtype), /* from */
792  nelem*_sion_ompi_size_of_dtype(dtype));
793 
794 
795  /* cleanup */
796 #pragma omp master
797  {
798  if(tcounts) free(tcounts);
799  if(tdispls) free(tdispls);
800  if(help) free(help);
801 
802  if(rank==root) {
803  if(mcounts) free(mcounts);
804  if(mdispls) free(mdispls);
805  }
806  }
807 
808  /* synchronize */
809  {
810 #pragma omp barrier
811  }
812  rc=_sion_opmi_grc;
813  {
814 #pragma omp barrier
815  }
816 
817  return rc;
818 }
819 #undef DFUNCTION
820 
821 
822 
823 /* share in_ptr given on master with all other threads, return value is the shared ptr */
824 #define DFUNCTION "__sion_ompi_share_ptr"
825 void * __sion_ompi_share_ptr(void * in_ptr) {
826  void *out_ptr;
827 
828 #pragma omp master
829  __ompi_global_pointer = in_ptr;
830 
831 
832  {
833 #pragma omp barrier
834  }
835 
836  out_ptr=__ompi_global_pointer;
837 
838  return(out_ptr);
839 
840 }
841 #undef DFUNCTION
842 
843 int _sion_ompi_size_of_dtype(int dtype) {
844  switch (dtype) {
845  case _SION_INT32: return(sizeof(sion_int32)); break;
846  case _SION_INT64: return(sizeof(sion_int64)); break;
847  case _SION_CHAR: return(sizeof(char)); break;
848  default: return(sizeof(sion_int64));
849  }
850 }
851 
852 /* end of ifdef SION_OMPI */
853 #endif
Sion Time Stamp Header.