SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_ompi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
17 #define _XOPEN_SOURCE 700
18 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <stdarg.h>
22 #include <string.h>
23 #include <time.h>
24 
25 #include "mpi.h"
26 
27 #define USE_PMPIno
28 #ifdef USE_PMPI
29 #define MPI_Comm_rank PMPI_Comm_rank
30 #define MPI_Comm_size PMPI_Comm_size
31 #define MPI_Gather PMPI_Gather
32 #define MPI_Scatter PMPI_Scatter
33 #define MPI_Bcast PMPI_Bcast
34 #define MPI_Barrier PMPI_Barrier
35 #define MPI_Comm_split PMPI_Comm_split
36 #define MPI_Send PMPI_Send
37 #define MPI_Recv PMPI_Recv
38 #endif
39 
40 #include <sys/time.h>
41 
42 #include <sys/types.h>
43 #include <fcntl.h>
44 
45 #include <unistd.h>
46 
47 #include "sion.h"
48 #include "sion_debug.h"
49 #include "sion_internal.h"
50 #include "sion_fd.h"
51 #include "sion_filedesc.h"
52 #include "sion_printts.h"
53 
54 #include "sion_ompi_cb_gen.h"
55 #include "sion_ompi_internal_gen.h"
56 
57 #ifdef SION_OMPI
58 
59 #include "omp.h"
60 
61 
62 static void *__ompi_global_pointer;
63 static int _sion_opmi_grc=SION_SUCCESS;
64 
65 int _sion_ompi_size_of_dtype(int dtype);
66 void * __sion_ompi_share_ptr(void * in_ptr);
67 
68 
69 int _sion_register_callbacks_ompi(void) {
70  int aid=0;
71  aid=sion_generic_create_api("SIONlib_OMPI_API");
72 
73 
74  sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
75  sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
76 
77  sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
78  sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
79  sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
80  sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
81  sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
82  sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
83  sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
84  sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
85  sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
86 
87  return(aid);
88 }
89 
90 #define DFUNCTION "_sion_ompi_create_lcg_cb"
91 int _sion_ompi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
92  int grank, int gsize,
93  int lrank, int lsize,
94  int filenumber, int numfiles
95  )
96 {
97  int rc=SION_STD_SUCCESS;
98  _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
99  _ompi_api_commdata* commgroup=NULL;
100  int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
101 
102  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
103  grank, gsize, filenumber, numfiles, lrank, lsize));
104 #pragma omp master
105  {
106  _sion_opmi_grc=SION_STD_SUCCESS;
107  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, " I'm on master\n",rc));
108  }
109 
110  if(global_commgroup==NULL) {
111  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
112  return(SION_STD_NOT_SUCCESS);
113  }
114  if(*local_commgroup!=NULL) {
115  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
116  return(SION_STD_NOT_SUCCESS);
117  }
118 
119  /* is local commgroup already set by calling program? */
120  if(sapi_global->lcommgroup!=NULL) {
121  /* use this communicator */
122  if(sapi_global->lcommgroup->commset==0) {
123  *local_commgroup=sapi_global->lcommgroup;
124  create_lcomm=0;set_in_global=0; /* all is done */
125  sapi_global->lcommgroup->commset=1;
126  } else {
127  create_lcomm=1;set_in_global=0; /* another communicator will be created */
128  }
129  }
130 
131  if(create_lcomm) {
132 
133  /* create new communicator */
134  commgroup = (_ompi_api_commdata *) malloc(sizeof(_ompi_api_commdata));
135  if (commgroup == NULL) {
136  fprintf(stderr,"_ompi_create_lcg_cb: cannot allocate memory for commgroup of size %zu, aborting ...\n",
137  sizeof(_ompi_api_commdata));
138  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
139  }
140 
141  commgroup->commset=0; commgroup->lcommgroup=NULL;
142  commgroup->commcreated=0;
143  commgroup->rank=lrank;
144  commgroup->size=lsize;
145  commgroup->num_threads=sapi_global->num_threads;
146  commgroup->thread_num=sapi_global->thread_num;
147 
148 #pragma omp master
149  {
150  color=filenumber;
151  if(filenumber==-1) color=MPI_UNDEFINED;
152  _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
153  DPRINTFP((256, "_ompi_create_lcg_cb", grank, " rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
154  commgroup->local=1; commgroup->commset=1;
155  } /* omp master */
156 
157 
158  if(set_in_global) {
159  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
160  }
161 
162  *local_commgroup=commgroup;
163 
164  }
165 
166  {
167 #pragma omp barrier
168  }
169 
170 #pragma omp master
171  {
172  if(filenumber!=-1) {
173  MPI_Comm_rank(commgroup->comm, &mrank);
174  MPI_Comm_size(commgroup->comm, &msize);
175  }
176 
177  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " leave rc=%d rank %d of %d\n",rc,mrank,msize));
178  }
179 
180  rc=_sion_opmi_grc;
181  return rc;
182 }
183 #undef DFUNCTION
184 
185 #define DFUNCTION "_sion_ompi_free_lcg_cb"
186 int _sion_ompi_free_lcg_cb(void *local_commgroup) {
187  int rc=SION_STD_SUCCESS;
188  _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
189 #pragma omp master
190  {
191  _sion_opmi_grc=SION_STD_SUCCESS;
192 
193  if ( (commgroup->commset) && (commgroup->commcreated) ) {
194  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm\n"));
195  _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
196  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm rc=%d\n",_sion_opmi_grc));
197  }
198  free(commgroup);
199  }
200 
201  {
202 #pragma omp barrier
203  }
204  rc=_sion_opmi_grc;
205  return rc;
206 }
207 #undef DFUNCTION
208 
209 
210 #define DFUNCTION "_sion_ompi_barrier_cb"
211 int _sion_ompi_barrier_cb(void *commdata)
212 {
213  int rc=SION_STD_SUCCESS;
214  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
215  MPI_Comm commgroup;
216  {
217 #pragma omp barrier
218  }
219 #pragma omp master
220  {
221  commgroup = sapi->comm;
222  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " performing MPI barrier now\n"));
223  _sion_opmi_grc = MPI_Barrier(commgroup);
224  }
225 
226  {
227 #pragma omp barrier
228  }
229  rc=_sion_opmi_grc;
230  {
231 #pragma omp barrier
232  }
233  return rc;
234 }
235 #undef DFUNCTION
236 
237 
238 #define DFUNCTION "_sion_ompi_bcastr_cb"
239 int _sion_ompi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
240 {
241  int rc=SION_STD_SUCCESS;
242  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
243  MPI_Comm commgroup;
244  void *help;
245 
246  /* first distribute data to master threads (over MPI) */
247 #pragma omp master
248  {
249  commgroup = sapi->comm;
250  switch (dtype) {
251  case _SION_INT32:
252  _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
253  break;
254  case _SION_INT64:
255  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
256  break;
257  case _SION_CHAR:
258  _sion_opmi_grc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commgroup);
259  break;
260  default:
261  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
262  break;
263  }
264 
265  }
266 
267 
268  /* share data ptr among threads */
269  help=__sion_ompi_share_ptr((void *) data);
270 
271  /* copy data to local val on non-master threads */
272  if((omp_get_thread_num()!=root) && (help != NULL)) {
273  memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
274  }
275 
276  {
277 #pragma omp barrier
278  }
279  rc=_sion_opmi_grc;
280  {
281 #pragma omp barrier
282  }
283 
284 return rc;
285 }
286 #undef DFUNCTION
287 
288 
289 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
290 #define DFUNCTION "_sion_ompi_gatherr_cb"
291 int _sion_ompi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
292 {
293  int rc=SION_STD_SUCCESS;
294  int mroot;
295  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
296  MPI_Comm commgroup;
297  void *helpdata, *help;
298  ONLY_DEBUG(int rank=sapi->rank;)
299  ONLY_DEBUG(int size=sapi->size;)
300 
301  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
302 
303  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
304 
305  /* allocate temp buffer */
306 #pragma omp master
307  {
308  _sion_opmi_grc=SION_STD_SUCCESS;
309 
310  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
311  if (helpdata == NULL) {
312  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
313  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
314  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
315  }
316  }
317 
318  /* share data ptr among threads, internal barrier */
319  help=__sion_ompi_share_ptr((void *) helpdata);
320 
321  /* check return code from malloc */
322  if(_sion_opmi_grc) return(_sion_opmi_grc);
323 
324  /* copy data from indata on non-master threads */
325  memcpy((char *)help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* to */
326  indata, /* from */
327  nelem*_sion_ompi_size_of_dtype(dtype));
328 
329  {
330 #pragma omp barrier
331  }
332 
333  /* gather on MPI level on master thread */
334 #pragma omp master
335  {
336  commgroup = sapi->comm;
337  switch (dtype) {
338  case _SION_INT32:
339  _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
340  break;
341  case _SION_INT64:
342  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
343  break;
344  case _SION_CHAR:
345  _sion_opmi_grc = MPI_Gather((char *) help, sapi->num_threads*nelem, MPI_CHAR, (char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
346  break;
347  default:
348  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
349  break;
350  }
351 
352  free(helpdata);
353  }
354 
355  /* sychronize */
356  {
357 #pragma omp barrier
358  }
359  rc=_sion_opmi_grc;
360  {
361 #pragma omp barrier
362  }
363 
364  return rc;
365 }
366 #undef DFUNCTION
367 
368 /* indata (root) -> outdata */
369 #define DFUNCTION "_sion_ompi_scatterr_cb"
370 int _sion_ompi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
371 {
372  int rc=SION_STD_SUCCESS, mroot;
373  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
374  MPI_Comm commgroup;
375  void *helpdata, *help;
376  ONLY_DEBUG(int rank=sapi->rank;)
377  ONLY_DEBUG(int size=sapi->size;)
378 
379  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
380 
381  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
382 
383  /* allocate temp buffer */
384 #pragma omp master
385  {
386  _sion_opmi_grc=0;
387 
388  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
389  if (helpdata == NULL) {
390  fprintf(stderr,"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
391  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
392  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
393  }
394  }
395 
396  /* share data ptr among threads, internal barrier */
397  help=__sion_ompi_share_ptr((void *) helpdata);
398 
399  /* check return code from malloc */
400  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
401 
402  {
403 #pragma omp barrier
404  }
405 
406  /* scatter data on MPI level */
407 #pragma omp master
408  {
409  commgroup = sapi->comm;
410  switch (dtype) {
411  case _SION_INT32:
412  _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
413  break;
414  case _SION_INT64:
415  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
416  break;
417  case _SION_CHAR:
418  _sion_opmi_grc = MPI_Scatter((char *) indata, sapi->num_threads*nelem, MPI_CHAR, (char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
419  break;
420  default:
421  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
422  break;
423  }
424 
425  } /* omp master */
426 
427  /* synchronize */
428  {
429 #pragma omp barrier
430  }
431 
432  /* copy data from indata on non-master threads */
433  memcpy(outdata, /* to */
434  (char *)help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* from */
435  nelem*_sion_ompi_size_of_dtype(dtype));
436 
437  /* synchronize */
438  {
439 #pragma omp barrier
440  }
441 
442  /* free temp buffer on master */
443 #pragma omp master
444  {
445  free(helpdata);
446  }
447 
448  rc=_sion_opmi_grc;
449  {
450 #pragma omp barrier
451  }
452 
453  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
454 
455  return rc;
456 }
457 #undef DFUNCTION
458 
459 
460 
461 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
462 #define DFUNCTION "_sion_ompi_gathervr_cb"
463 int _sion_ompi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
464 {
465  int rc=SION_STD_SUCCESS;
466  int m, t, offset, mroot, mcount, toffset;
467  int *mcounts=NULL,*mdispls=NULL;
468  int *tcounts=NULL,*tdispls=NULL;
469  void *helpdata, *help;
470  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
471  MPI_Comm commgroup;
472  int rank=sapi->rank;
473  int size=sapi->size;
474 
475  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
476 
477  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
478 
479 
480  /* STEP1: collect counts on thread level */
481 #pragma omp master
482  {
483  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
484  if (helpdata == NULL) {
485  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
486  (size_t) sapi->num_threads * sizeof(int));
487  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
488  }
489  } /* omp master */
490 
491  /* share data ptr among threads, internal barrier */
492  tcounts=__sion_ompi_share_ptr((void *) helpdata);
493 
494  /* check return code from malloc */
495  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
496 
497  tcounts[sapi->thread_num]=nelem;
498 
499  /* STEP2: calculate offsets on thread level */
500 #pragma omp master
501  {
502  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
503  if (helpdata == NULL) {
504  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
505  (size_t) sapi->num_threads * sizeof(int));
506  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
507  }
508  } /* omp master */
509 
510  /* share data ptr among threads, internal barrier */
511  tdispls=__sion_ompi_share_ptr((void *) helpdata);
512 
513  /* check return code from malloc */
514  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
515 
516 #pragma omp master
517  {
518  offset=0;
519  for(t=0;t<size;t++) {
520  tdispls[t]=offset;
521  offset+=tcounts[t];
522  }
523  mcount=tdispls[size=1];
524  } /* omp master */
525 
526  /* synchronize */
527  {
528 #pragma omp barrier
529  }
530 
531  /* STEP3: get offset on thread level */
532  toffset=tdispls[sapi->thread_num];
533 
534 
535  /* STEP4: allocate temp buffer on master */
536 #pragma omp master
537  {
538  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
539  if (helpdata == NULL) {
540  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
541  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
542  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
543  }
544  } /* omp master */
545 
546  /* STEP5: gather data on thread level */
547 
548  /* share data ptr among threads, internal barrier */
549  help=__sion_ompi_share_ptr((void *) helpdata);
550 
551  /* check return code from malloc */
552  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
553 
554  /* copy data from indata on non-master threads */
555  memcpy((char *)help+toffset*_sion_ompi_size_of_dtype(dtype), /* to */
556  indata, /* from */
557  nelem*_sion_ompi_size_of_dtype(dtype));
558 
559 
560 
561  /* STEP6: gather data on MPI level */
562 #pragma omp master
563  {
564  /* allocate compute displs and counts */
565  if(rank==root) {
566 
567  mcounts = (int *) malloc(size * sizeof(int));
568  if (mcounts == NULL) {
569  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(size_t) size * sizeof(int));
570  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
571  }
572 
573  if(_sion_opmi_grc==SION_SUCCESS) {
574  mdispls = (int *) malloc(size * sizeof(int));
575  if (mdispls == NULL) {
576  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(size_t) size * sizeof(int));
577  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
578  }
579  }
580 
581  /* calculate counts and displs on MPI level */
582  if(_sion_opmi_grc==SION_SUCCESS) {
583  for(m=0;m<size;m++) {
584  mcounts[m]=0;
585  for(t=0;t<sapi->num_threads;t++) {
586  mcounts[m]+=counts[m*sapi->num_threads+t];
587  }
588  }
589 
590  offset=0;
591  for(m=0;m<size;m++) {
592  mdispls[m]=offset;
593  offset+=mcounts[m];
594  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
595  }
596  }
597 
598  }
599  } /* omp master */
600 
601  /* check return code from malloc */
602  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
603 
604 
605  /* call MPI gatherv */
606 #pragma omp master
607  {
608  commgroup = sapi->comm;
609  switch (dtype) {
610  case _SION_INT32:
611  _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
612  break;
613  case _SION_INT64:
614  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
615  break;
616  case _SION_CHAR:
617  _sion_opmi_grc = MPI_Gatherv((char *) help, mcount, MPI_CHAR, (sion_int32 *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
618  break;
619  default:
620  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
621  break;
622  }
623 
624  } /* omp master */
625 
626 #pragma omp master
627  {
628  if(tcounts) free(tcounts);
629  if(tdispls) free(tdispls);
630  if(help) free(help);
631 
632  if(rank==root) {
633  if(mcounts) free(mcounts);
634  if(mdispls) free(mdispls);
635  }
636  }
637 
638  /* synchronize */
639  {
640 #pragma omp barrier
641  }
642  rc=_sion_opmi_grc;
643  {
644 #pragma omp barrier
645  }
646 
647  return rc;
648 }
649 #undef DFUNCTION
650 
651 
652 
653 /* outdata (root) -> indata */
654 #define DFUNCTION "_sion_ompi_scatterr_cb"
655 int _sion_ompi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
656 {
657  int rc=SION_STD_SUCCESS;
658  int m, t, offset, mroot, mcount, toffset;
659  int *mcounts=NULL,*mdispls=NULL;
660  int *tcounts=NULL,*tdispls=NULL;
661  void *helpdata, *help;
662  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
663  MPI_Comm commgroup;
664  int rank=sapi->rank;
665  int size=sapi->size;
666 
667  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
668 
669  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
670 
671 
672  /* STEP1: collect counts on thread level */
673 #pragma omp master
674  {
675  _sion_opmi_grc=SION_STD_SUCCESS;
676 
677  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
678  if (helpdata == NULL) {
679  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
680  (size_t) sapi->num_threads * sizeof(int));
681  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
682  }
683  } /* omp master */
684 
685  /* share data ptr among threads, internal barrier */
686  tcounts=__sion_ompi_share_ptr((void *) helpdata);
687 
688  /* check return code from malloc */
689  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
690 
691  tcounts[sapi->thread_num]=nelem;
692 
693  /* STEP2: calculate offsets on thread level */
694 #pragma omp master
695  {
696  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
697  if (helpdata == NULL) {
698  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
699  (size_t) sapi->num_threads * sizeof(int));
700  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
701  }
702  } /* omp master */
703 
704  /* share data ptr among threads, internal barrier */
705  tdispls=__sion_ompi_share_ptr((void *) helpdata);
706 
707  /* check return code from malloc */
708  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
709 
710 #pragma omp master
711  {
712  offset=0;
713  for(t=0;t<size;t++) {
714  tdispls[t]=offset;
715  offset+=tcounts[t];
716  }
717  mcount=tdispls[size=1];
718  } /* omp master */
719 
720  /* synchronize */
721  {
722 #pragma omp barrier
723  }
724 
725  /* STEP3: get offset on thread level */
726  toffset=tdispls[sapi->thread_num];
727 
728 
729  /* STEP4: allocate temp buffer on master */
730 #pragma omp master
731  {
732  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
733  if (helpdata == NULL) {
734  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
735  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
736  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
737  }
738  } /* omp master */
739 
740  /* share data ptr among threads, internal barrier */
741  help=__sion_ompi_share_ptr((void *) helpdata);
742 
743  /* STEP5: scatter data on MPI level */
744 #pragma omp master
745  {
746  /* allocate compute displs and counts */
747  if(rank==root) {
748 
749  mcounts = (int *) malloc(size * sizeof(int));
750  if (mcounts == NULL) {
751  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(size_t) size * sizeof(int));
752  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
753  }
754 
755  if(_sion_opmi_grc==SION_SUCCESS) {
756  mdispls = (int *) malloc(size * sizeof(int));
757  if (mdispls == NULL) {
758  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(size_t) size * sizeof(int));
759  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
760  }
761  }
762 
763  /* calculate counts and displs on MPI level */
764  if(_sion_opmi_grc==SION_SUCCESS) {
765  for(m=0;m<size;m++) {
766  mcounts[m]=0;
767  for(t=0;t<sapi->num_threads;t++) {
768  mcounts[m]+=counts[m*sapi->num_threads+t];
769  }
770  }
771 
772  offset=0;
773  for(m=0;m<size;m++) {
774  mdispls[m]=offset;
775  offset+=mcounts[m];
776  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
777  }
778  }
779 
780  }
781  } /* omp master */
782 
783  /* check return code from malloc */
784  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
785 
786  /* call MPI scatterv */
787 #pragma omp master
788  {
789  commgroup = sapi->comm;
790  switch (dtype) {
791  case _SION_INT32:
792  _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
793  break;
794  case _SION_INT64:
795  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
796  break;
797  case _SION_CHAR:
798  _sion_opmi_grc = MPI_Scatterv((char *) outdata, mcounts, mdispls, MPI_CHAR, (sion_int32 *) help, mcount, MPI_CHAR, mroot, commgroup);
799  break;
800  default:
801  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
802  break;
803  }
804 
805  } /* omp master */
806 
807  /* check return code from MPI call */
808  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
809 
810  /* STEP6: scatterv data on thread level */
811 
812 
813  /* copy data from indata on non-master threads */
814  memcpy(indata, /* to */
815  (char *)help+toffset*_sion_ompi_size_of_dtype(dtype), /* from */
816  nelem*_sion_ompi_size_of_dtype(dtype));
817 
818 
819  /* cleanup */
820 #pragma omp master
821  {
822  if(tcounts) free(tcounts);
823  if(tdispls) free(tdispls);
824  if(help) free(help);
825 
826  if(rank==root) {
827  if(mcounts) free(mcounts);
828  if(mdispls) free(mdispls);
829  }
830  }
831 
832  /* synchronize */
833  {
834 #pragma omp barrier
835  }
836  rc=_sion_opmi_grc;
837  {
838 #pragma omp barrier
839  }
840 
841  return rc;
842 }
843 #undef DFUNCTION
844 
845 
846 
847 /* share in_ptr given on master with all other threads, return value is the shared ptr */
848 #define DFUNCTION "__sion_ompi_share_ptr"
849 void * __sion_ompi_share_ptr(void * in_ptr) {
850  void *out_ptr;
851 
852 #pragma omp master
853  __ompi_global_pointer = in_ptr;
854 
855 
856  {
857 #pragma omp barrier
858  }
859 
860  out_ptr=__ompi_global_pointer;
861 
862  return(out_ptr);
863 
864 }
865 #undef DFUNCTION
866 
867 int _sion_ompi_size_of_dtype(int dtype) {
868  switch (dtype) {
869  case _SION_INT32: return(sizeof(sion_int32)); break;
870  case _SION_INT64: return(sizeof(sion_int64)); break;
871  case _SION_CHAR: return(sizeof(char)); break;
872  default: return(sizeof(sion_int64));
873  }
874 }
875 
876 /* end of ifdef SION_OMPI */
877 #endif
Sion Time Stamp Header.