SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_ompi_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
17 #include <stdlib.h>
18 #include <stdio.h>
19 #include <stdarg.h>
20 #include <string.h>
21 #include <time.h>
22 
23 #include "mpi.h"
24 
25 #define USE_PMPIno
26 #ifdef USE_PMPI
27 #define MPI_Comm_rank PMPI_Comm_rank
28 #define MPI_Comm_size PMPI_Comm_size
29 #define MPI_Gather PMPI_Gather
30 #define MPI_Scatter PMPI_Scatter
31 #define MPI_Bcast PMPI_Bcast
32 #define MPI_Barrier PMPI_Barrier
33 #define MPI_Comm_split PMPI_Comm_split
34 #define MPI_Send PMPI_Send
35 #define MPI_Recv PMPI_Recv
36 #endif
37 
38 #include <sys/time.h>
39 
40 #include <sys/types.h>
41 #include <fcntl.h>
42 
43 #include <unistd.h>
44 
45 #include "sion.h"
46 #include "sion_debug.h"
47 #include "sion_internal.h"
48 #include "sion_fd.h"
49 #include "sion_filedesc.h"
50 #include "sion_printts.h"
51 
52 #include "sion_ompi_cb_gen.h"
53 #include "sion_ompi_internal_gen.h"
54 
55 #ifdef SION_OMPI
56 
57 #include "omp.h"
58 
59 
60 static void *__ompi_global_pointer;
61 static int _sion_opmi_grc=SION_SUCCESS;
62 
63 int _sion_ompi_size_of_dtype(int dtype);
64 void * __sion_ompi_share_ptr(void * in_ptr);
65 
66 
67 int _sion_register_callbacks_ompi() {
68  int aid=0;
69  aid=sion_generic_create_api("SIONlib_OMPI_API");
70 
71 
72  sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
73  sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
74 
75  sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
76  sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
77  sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
78  sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
79  sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
80  sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
81  sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
82  sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
83  sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
84 
85  return(aid);
86 }
87 
88 #define DFUNCTION "_sion_ompi_create_lcg_cb"
89 int _sion_ompi_create_lcg_cb(void **local_commgroup, void *global_commgroup,
90  int grank, int gsize,
91  int lrank, int lsize,
92  int filenumber, int numfiles
93  )
94 {
95  int rc=SION_STD_SUCCESS;
96  _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
97  _ompi_api_commdata* commgroup=NULL;
98  int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
99 
100  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
101  grank, gsize, filenumber, numfiles, lrank, lsize));
102 #pragma omp master
103  {
104  _sion_opmi_grc=SION_STD_SUCCESS;
105  DPRINTFP((256, "_mpi_create_lcg_cb", _SION_DEFAULT_RANK, " I'm on master\n",rc));
106  }
107 
108  if(global_commgroup==NULL) {
109  fprintf(stderr,"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
110  return(SION_STD_NOT_SUCCESS);
111  }
112  if(*local_commgroup!=NULL) {
113  fprintf(stderr,"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
114  return(SION_STD_NOT_SUCCESS);
115  }
116 
117  /* is local commgroup already set by calling program? */
118  if(sapi_global->lcommgroup!=NULL) {
119  /* use this communicator */
120  if(sapi_global->lcommgroup->commset==0) {
121  *local_commgroup=sapi_global->lcommgroup;
122  create_lcomm=0;set_in_global=0; /* all is done */
123  sapi_global->lcommgroup->commset=1;
124  } else {
125  create_lcomm=1;set_in_global=0; /* another communicator will be created */
126  }
127  }
128 
129  if(create_lcomm) {
130 
131  /* create new communicator */
132  commgroup = (_ompi_api_commdata *) malloc(sizeof(_ompi_api_commdata));
133  if (commgroup == NULL) {
134  fprintf(stderr,"_ompi_create_lcg_cb: cannot allocate memory for commgroup of size %zu, aborting ...\n",
135  sizeof(_ompi_api_commdata));
136  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
137  }
138 
139  commgroup->commset=0; commgroup->lcommgroup=NULL;
140  commgroup->commcreated=0;
141  commgroup->rank=lrank;
142  commgroup->size=lsize;
143  commgroup->num_threads=sapi_global->num_threads;
144  commgroup->thread_num=sapi_global->thread_num;
145 
146 #pragma omp master
147  {
148  color=filenumber;
149  if(filenumber==-1) color=MPI_UNDEFINED;
150  _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
151  DPRINTFP((256, "_ompi_create_lcg_cb", grank, " rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
152  commgroup->local=1; commgroup->commset=1;
153  } /* omp master */
154 
155 
156  if(set_in_global) {
157  sapi_global->lcommgroup=commgroup; /* needed for collective calls */
158  }
159 
160  *local_commgroup=commgroup;
161 
162  }
163 
164  {
165 #pragma omp barrier
166  }
167 
168 #pragma omp master
169  {
170  if(filenumber!=-1) {
171  MPI_Comm_rank(commgroup->comm, &mrank);
172  MPI_Comm_size(commgroup->comm, &msize);
173  }
174 
175  DPRINTFP((256, "_mpi_create_lcg_cb", grank, " leave rc=%d rank %d of %d\n",rc,mrank,msize));
176  }
177 
178  rc=_sion_opmi_grc;
179  return rc;
180 }
181 #undef DFUNCTION
182 
183 #define DFUNCTION "_sion_ompi_free_lcg_cb"
184 int _sion_ompi_free_lcg_cb(void *local_commgroup) {
185  int rc=SION_STD_SUCCESS;
186  _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
187 #pragma omp master
188  {
189  _sion_opmi_grc=SION_STD_SUCCESS;
190 
191  if ( (commgroup->commset) && (commgroup->commcreated) ) {
192  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm\n"));
193  _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
194  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " free now comm rc=%d\n",_sion_opmi_grc));
195  }
196  free(commgroup);
197  }
198 
199  {
200 #pragma omp barrier
201  }
202  rc=_sion_opmi_grc;
203  return rc;
204 }
205 #undef DFUNCTION
206 
207 
208 #define DFUNCTION "_sion_ompi_barrier_cb"
209 int _sion_ompi_barrier_cb(void *commdata)
210 {
211  int rc=SION_STD_SUCCESS;
212  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
213  MPI_Comm commgroup;
214  {
215 #pragma omp barrier
216  }
217 #pragma omp master
218  {
219  commgroup = sapi->comm;
220  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " performing MPI barrier now\n"));
221  _sion_opmi_grc = MPI_Barrier(commgroup);
222  }
223 
224  {
225 #pragma omp barrier
226  }
227  rc=_sion_opmi_grc;
228  {
229 #pragma omp barrier
230  }
231  return rc;
232 }
233 #undef DFUNCTION
234 
235 
236 #define DFUNCTION "_sion_ompi_bcastr_cb"
237 int _sion_ompi_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
238 {
239  int rc=SION_STD_SUCCESS;
240  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
241  MPI_Comm commgroup;
242  void *help;
243 
244  /* first distribute data to master threads (over MPI) */
245 #pragma omp master
246  {
247  commgroup = sapi->comm;
248  switch (dtype) {
249  case _SION_INT32:
250  _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
251  break;
252  case _SION_INT64:
253  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
254  break;
255  case _SION_CHAR:
256  _sion_opmi_grc = MPI_Bcast((char *) data, nelem, MPI_CHAR, root, commgroup);
257  break;
258  default:
259  _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
260  break;
261  }
262 
263  }
264 
265 
266  /* share data ptr among threads */
267  help=__sion_ompi_share_ptr((void *) data);
268 
269  /* copy data to local val on non-master threads */
270  if((omp_get_thread_num()!=root) && (help != NULL)) {
271  memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
272  }
273 
274  {
275 #pragma omp barrier
276  }
277  rc=_sion_opmi_grc;
278  {
279 #pragma omp barrier
280  }
281 
282 return rc;
283 }
284 #undef DFUNCTION
285 
286 
287 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
288 #define DFUNCTION "_sion_ompi_gatherr_cb"
289 int _sion_ompi_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
290 {
291  int rc=SION_STD_SUCCESS;
292  int mroot;
293  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
294  MPI_Comm commgroup;
295  void *helpdata, *help;
296  ONLY_DEBUG(int rank=sapi->rank;)
297  ONLY_DEBUG(int size=sapi->size;)
298 
299  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
300 
301  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
302 
303  /* allocate temp buffer */
304 #pragma omp master
305  {
306  _sion_opmi_grc=SION_STD_SUCCESS;
307 
308  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
309  if (helpdata == NULL) {
310  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
311  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
312  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
313  }
314  }
315 
316  /* share data ptr among threads, internal barrier */
317  help=__sion_ompi_share_ptr((void *) helpdata);
318 
319  /* check return code from malloc */
320  if(_sion_opmi_grc) return(_sion_opmi_grc);
321 
322  /* copy data from indata on non-master threads */
323  memcpy(help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* to */
324  indata, /* from */
325  nelem*_sion_ompi_size_of_dtype(dtype));
326 
327  {
328 #pragma omp barrier
329  }
330 
331  /* gather on MPI level on master thread */
332 #pragma omp master
333  {
334  commgroup = sapi->comm;
335  switch (dtype) {
336  case _SION_INT32:
337  _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
338  break;
339  case _SION_INT64:
340  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
341  break;
342  case _SION_CHAR:
343  _sion_opmi_grc = MPI_Gather((char *) help, sapi->num_threads*nelem, MPI_CHAR, (char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
344  break;
345  default:
346  _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
347  break;
348  }
349 
350  free(helpdata);
351  }
352 
353  /* sychronize */
354  {
355 #pragma omp barrier
356  }
357  rc=_sion_opmi_grc;
358  {
359 #pragma omp barrier
360  }
361 
362  return rc;
363 }
364 #undef DFUNCTION
365 
366 /* indata (root) -> outdata */
367 #define DFUNCTION "_sion_ompi_scatterr_cb"
368 int _sion_ompi_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
369 {
370  int rc=SION_STD_SUCCESS, mroot;
371  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
372  MPI_Comm commgroup;
373  void *helpdata, *help;
374  ONLY_DEBUG(int rank=sapi->rank;)
375  ONLY_DEBUG(int size=sapi->size;)
376 
377  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
378 
379  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
380 
381  /* allocate temp buffer */
382 #pragma omp master
383  {
384  _sion_opmi_grc=0;
385 
386  helpdata = (int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
387  if (helpdata == NULL) {
388  fprintf(stderr,"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
389  (unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
390  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
391  }
392  }
393 
394  /* share data ptr among threads, internal barrier */
395  help=__sion_ompi_share_ptr((void *) helpdata);
396 
397  /* check return code from malloc */
398  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
399 
400  {
401 #pragma omp barrier
402  }
403 
404  /* scatter data on MPI level */
405 #pragma omp master
406  {
407  commgroup = sapi->comm;
408  switch (dtype) {
409  case _SION_INT32:
410  _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
411  break;
412  case _SION_INT64:
413  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
414  break;
415  case _SION_CHAR:
416  _sion_opmi_grc = MPI_Scatter((char *) indata, sapi->num_threads*nelem, MPI_CHAR, (char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
417  break;
418  default:
419  _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
420  break;
421  }
422 
423  } /* omp master */
424 
425  /* synchronize */
426  {
427 #pragma omp barrier
428  }
429 
430  /* copy data from indata on non-master threads */
431  memcpy(outdata, /* to */
432  help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype), /* from */
433  nelem*_sion_ompi_size_of_dtype(dtype));
434 
435  /* synchronize */
436  {
437 #pragma omp barrier
438  }
439 
440  /* free temp buffer on master */
441 #pragma omp master
442  {
443  free(helpdata);
444  }
445 
446  rc=_sion_opmi_grc;
447  {
448 #pragma omp barrier
449  }
450 
451  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
452 
453  return rc;
454 }
455 #undef DFUNCTION
456 
457 
458 
459 /* master receives data in outdata, number of OpenMP threads is equal on all tasks */
460 #define DFUNCTION "_sion_ompi_gathervr_cb"
461 int _sion_ompi_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
462 {
463  int rc=SION_STD_SUCCESS;
464  int m, t, offset, mroot, mcount, toffset;
465  int *mcounts=NULL,*mdispls=NULL;
466  int *tcounts=NULL,*tdispls=NULL;
467  void *helpdata, *help;
468  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
469  MPI_Comm commgroup;
470  int rank=sapi->rank;
471  int size=sapi->size;
472 
473  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
474 
475  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
476 
477 
478  /* STEP1: collect counts on thread level */
479 #pragma omp master
480  {
481  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
482  if (helpdata == NULL) {
483  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
484  (size_t) sapi->num_threads * sizeof(int));
485  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
486  }
487  } /* omp master */
488 
489  /* share data ptr among threads, internal barrier */
490  tcounts=__sion_ompi_share_ptr((void *) helpdata);
491 
492  /* check return code from malloc */
493  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
494 
495  tcounts[sapi->thread_num]=nelem;
496 
497  /* STEP2: calculate offsets on thread level */
498 #pragma omp master
499  {
500  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
501  if (helpdata == NULL) {
502  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
503  (size_t) sapi->num_threads * sizeof(int));
504  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
505  }
506  } /* omp master */
507 
508  /* share data ptr among threads, internal barrier */
509  tdispls=__sion_ompi_share_ptr((void *) helpdata);
510 
511  /* check return code from malloc */
512  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
513 
514 #pragma omp master
515  {
516  offset=0;
517  for(t=0;t<size;t++) {
518  tdispls[t]=offset;
519  offset+=tcounts[t];
520  }
521  mcount=tdispls[size=1];
522  } /* omp master */
523 
524  /* synchronize */
525  {
526 #pragma omp barrier
527  }
528 
529  /* STEP3: get offset on thread level */
530  toffset=tdispls[sapi->thread_num];
531 
532 
533  /* STEP4: allocate temp buffer on master */
534 #pragma omp master
535  {
536  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
537  if (helpdata == NULL) {
538  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
539  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
540  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
541  }
542  } /* omp master */
543 
544  /* STEP5: gather data on thread level */
545 
546  /* share data ptr among threads, internal barrier */
547  help=__sion_ompi_share_ptr((void *) helpdata);
548 
549  /* check return code from malloc */
550  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
551 
552  /* copy data from indata on non-master threads */
553  memcpy(help+toffset*_sion_ompi_size_of_dtype(dtype), /* to */
554  indata, /* from */
555  nelem*_sion_ompi_size_of_dtype(dtype));
556 
557 
558 
559  /* STEP6: gather data on MPI level */
560 #pragma omp master
561  {
562  /* allocate compute displs and counts */
563  if(rank==root) {
564 
565  mcounts = (int *) malloc(size * sizeof(int));
566  if (mcounts == NULL) {
567  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(size_t) size * sizeof(int));
568  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
569  }
570 
571  if(_sion_opmi_grc==SION_SUCCESS) {
572  mdispls = (int *) malloc(size * sizeof(int));
573  if (mdispls == NULL) {
574  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(size_t) size * sizeof(int));
575  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
576  }
577  }
578 
579  /* calculate counts and displs on MPI level */
580  if(_sion_opmi_grc==SION_SUCCESS) {
581  for(m=0;m<size;m++) {
582  mcounts[m]=0;
583  for(t=0;t<sapi->num_threads;t++) {
584  mcounts[m]+=counts[m*sapi->num_threads+t];
585  }
586  }
587 
588  offset=0;
589  for(m=0;m<size;m++) {
590  mdispls[m]=offset;
591  offset+=mcounts[m];
592  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
593  }
594  }
595 
596  }
597  } /* omp master */
598 
599  /* check return code from malloc */
600  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
601 
602 
603  /* call MPI gatherv */
604 #pragma omp master
605  {
606  commgroup = sapi->comm;
607  switch (dtype) {
608  case _SION_INT32:
609  _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
610  break;
611  case _SION_INT64:
612  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
613  break;
614  case _SION_CHAR:
615  _sion_opmi_grc = MPI_Gatherv((char *) help, mcount, MPI_CHAR, (sion_int32 *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
616  break;
617  default:
618  _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
619  break;
620  }
621 
622  } /* omp master */
623 
624 #pragma omp master
625  {
626  if(tcounts) free(tcounts);
627  if(tdispls) free(tdispls);
628  if(help) free(help);
629 
630  if(rank==root) {
631  if(mcounts) free(mcounts);
632  if(mdispls) free(mdispls);
633  }
634  }
635 
636  /* synchronize */
637  {
638 #pragma omp barrier
639  }
640  rc=_sion_opmi_grc;
641  {
642 #pragma omp barrier
643  }
644 
645  return rc;
646 }
647 #undef DFUNCTION
648 
649 
650 
651 /* outdata (root) -> indata */
652 #define DFUNCTION "_sion_ompi_scatterr_cb"
653 int _sion_ompi_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
654 {
655  int rc=SION_STD_SUCCESS;
656  int m, t, offset, mroot, mcount, toffset;
657  int *mcounts=NULL,*mdispls=NULL;
658  int *tcounts=NULL,*tdispls=NULL;
659  void *helpdata, *help;
660  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
661  MPI_Comm commgroup;
662  int rank=sapi->rank;
663  int size=sapi->size;
664 
665  mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
666 
667  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
668 
669 
670  /* STEP1: collect counts on thread level */
671 #pragma omp master
672  {
673  _sion_opmi_grc=SION_STD_SUCCESS;
674 
675  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
676  if (helpdata == NULL) {
677  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
678  (size_t) sapi->num_threads * sizeof(int));
679  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
680  }
681  } /* omp master */
682 
683  /* share data ptr among threads, internal barrier */
684  tcounts=__sion_ompi_share_ptr((void *) helpdata);
685 
686  /* check return code from malloc */
687  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
688 
689  tcounts[sapi->thread_num]=nelem;
690 
691  /* STEP2: calculate offsets on thread level */
692 #pragma omp master
693  {
694  helpdata = (int *) malloc(sapi->num_threads * sizeof(int));
695  if (helpdata == NULL) {
696  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
697  (size_t) sapi->num_threads * sizeof(int));
698  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
699  }
700  } /* omp master */
701 
702  /* share data ptr among threads, internal barrier */
703  tdispls=__sion_ompi_share_ptr((void *) helpdata);
704 
705  /* check return code from malloc */
706  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
707 
708 #pragma omp master
709  {
710  offset=0;
711  for(t=0;t<size;t++) {
712  tdispls[t]=offset;
713  offset+=tcounts[t];
714  }
715  mcount=tdispls[size=1];
716  } /* omp master */
717 
718  /* synchronize */
719  {
720 #pragma omp barrier
721  }
722 
723  /* STEP3: get offset on thread level */
724  toffset=tdispls[sapi->thread_num];
725 
726 
727  /* STEP4: allocate temp buffer on master */
728 #pragma omp master
729  {
730  helpdata = (int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
731  if (helpdata == NULL) {
732  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
733  (unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
734  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
735  }
736  } /* omp master */
737 
738  /* share data ptr among threads, internal barrier */
739  help=__sion_ompi_share_ptr((void *) helpdata);
740 
741  /* STEP5: scatter data on MPI level */
742 #pragma omp master
743  {
744  /* allocate compute displs and counts */
745  if(rank==root) {
746 
747  mcounts = (int *) malloc(size * sizeof(int));
748  if (mcounts == NULL) {
749  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(size_t) size * sizeof(int));
750  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
751  }
752 
753  if(_sion_opmi_grc==SION_SUCCESS) {
754  mdispls = (int *) malloc(size * sizeof(int));
755  if (mdispls == NULL) {
756  fprintf(stderr,"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(size_t) size * sizeof(int));
757  _sion_opmi_grc=SION_STD_NOT_SUCCESS;
758  }
759  }
760 
761  /* calculate counts and displs on MPI level */
762  if(_sion_opmi_grc==SION_SUCCESS) {
763  for(m=0;m<size;m++) {
764  mcounts[m]=0;
765  for(t=0;t<sapi->num_threads;t++) {
766  mcounts[m]+=counts[m*sapi->num_threads+t];
767  }
768  }
769 
770  offset=0;
771  for(m=0;m<size;m++) {
772  mdispls[m]=offset;
773  offset+=mcounts[m];
774  DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
775  }
776  }
777 
778  }
779  } /* omp master */
780 
781  /* check return code from malloc */
782  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
783 
784  /* call MPI scatterv */
785 #pragma omp master
786  {
787  commgroup = sapi->comm;
788  switch (dtype) {
789  case _SION_INT32:
790  _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
791  break;
792  case _SION_INT64:
793  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
794  break;
795  case _SION_CHAR:
796  _sion_opmi_grc = MPI_Scatterv((char *) outdata, mcounts, mdispls, MPI_CHAR, (sion_int32 *) help, mcount, MPI_CHAR, mroot, commgroup);
797  break;
798  default:
799  _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
800  break;
801  }
802 
803  } /* omp master */
804 
805  /* check return code from MPI call */
806  if(_sion_opmi_grc!=SION_STD_SUCCESS) return(_sion_opmi_grc);
807 
808  /* STEP6: scatterv data on thread level */
809 
810 
811  /* copy data from indata on non-master threads */
812  memcpy(indata, /* to */
813  help+toffset*_sion_ompi_size_of_dtype(dtype), /* from */
814  nelem*_sion_ompi_size_of_dtype(dtype));
815 
816 
817  /* cleanup */
818 #pragma omp master
819  {
820  if(tcounts) free(tcounts);
821  if(tdispls) free(tdispls);
822  if(help) free(help);
823 
824  if(rank==root) {
825  if(mcounts) free(mcounts);
826  if(mdispls) free(mdispls);
827  }
828  }
829 
830  /* synchronize */
831  {
832 #pragma omp barrier
833  }
834  rc=_sion_opmi_grc;
835  {
836 #pragma omp barrier
837  }
838 
839  return rc;
840 }
841 #undef DFUNCTION
842 
843 
844 
845 /* share in_ptr given on master with all other threads, return value is the shared ptr */
846 #define DFUNCTION "__sion_ompi_share_ptr"
847 void * __sion_ompi_share_ptr(void * in_ptr) {
848  void *out_ptr;
849 
850 #pragma omp master
851  __ompi_global_pointer = in_ptr;
852 
853 
854  {
855 #pragma omp barrier
856  }
857 
858  out_ptr=__ompi_global_pointer;
859 
860  return(out_ptr);
861 
862 }
863 #undef DFUNCTION
864 
865 int _sion_ompi_size_of_dtype(int dtype) {
866  switch (dtype) {
867  case _SION_INT32: return(sizeof(sion_int32)); break;
868  case _SION_INT64: return(sizeof(sion_int64)); break;
869  case _SION_CHAR: return(sizeof(char)); break;
870  default: return(sizeof(sion_int64));
871  }
872 }
873 
874 /* end of ifdef SION_OMPI */
875 #endif
Sion Time Stamp Header.