/* Route the MPI calls used in this file through their PMPI_* profiling
 * entry points (each #define maps MPI_X to PMPI_X), presumably so that a
 * user-installed MPI profiling wrapper does not intercept SIONlib-internal
 * traffic.
 * NOTE(review): MPI_Gatherv, MPI_Scatterv and MPI_Comm_free are used
 * below but are not among the remappings visible here -- confirm.
 *
 * __ompi_global_pointer (declared at the end of the next line) is the
 * file-scope slot that __sion_ompi_share_ptr uses to hand a pointer from
 * one thread to the others.
 * NOTE(review): identifiers starting with a double underscore are reserved
 * for the implementation in ISO C. */
27 #define MPI_Comm_rank PMPI_Comm_rank 28 #define MPI_Comm_size PMPI_Comm_size 29 #define MPI_Gather PMPI_Gather 30 #define MPI_Scatter PMPI_Scatter 31 #define MPI_Bcast PMPI_Bcast 32 #define MPI_Barrier PMPI_Barrier 33 #define MPI_Comm_split PMPI_Comm_split 34 #define MPI_Send PMPI_Send 35 #define MPI_Recv PMPI_Recv 40 #include <sys/types.h> 60 static void *__ompi_global_pointer;
/* File-scope return code written by the callbacks below (MPI results and
 * allocation failures) and returned by them on error.
 * NOTE(review): "opmi" looks like a typo for "ompi"; renaming needs a
 * whole-file change. It is initialised with SION_SUCCESS while the
 * callbacks use SION_STD_SUCCESS -- presumably equal, confirm. */
61 static int _sion_opmi_grc=SION_SUCCESS;
/* Size in bytes of one element of the given SION dtype. */
63 int _sion_ompi_size_of_dtype(
int dtype);
/* Returns the in_ptr passed by one thread to all threads of the team. */
64 void * __sion_ompi_share_ptr(
void * in_ptr);
/**
 * Create the SIONlib generic API "SIONlib_OMPI_API" and register the hybrid
 * MPI+OpenMP callbacks of this file with it.
 *
 * NOTE(review): the declaration of aid and the function's return statement
 * are not visible in this view; presumably aid is returned -- confirm.
 * _sion_ompi_gather_process_cb, _sion_ompi_process_scatter_cb and
 * _sion_ompi_get_capability_cb are defined outside this view.
 */
67 int _sion_register_callbacks_ompi() {
69 aid=sion_generic_create_api(
"SIONlib_OMPI_API");
/* local communicator group management */
72 sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
73 sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
/* collective operations */
75 sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
76 sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
77 sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
78 sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
79 sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
80 sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
81 sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
82 sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
83 sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
/**
 * Create or reuse the local communicator group used for one file.
 *
 * @param local_commgroup  out: receives the local _ompi_api_commdata; must
 *                         point to NULL on entry, otherwise the call fails
 * @param global_commgroup the global _ompi_api_commdata; must not be NULL
 * @param filenumber       file this task belongs to; -1 selects
 *                         MPI_UNDEFINED as the split color
 * @param numfiles         number of files (only printed in the visible lines)
 * @return SION_STD_NOT_SUCCESS from the argument checks shown; the final
 *         return is on a line not visible here
 *
 * If the global group already holds a local group that has not been handed
 * out yet (commset==0), that group is reused (create_lcomm=0). Otherwise a
 * new _ompi_api_commdata is allocated and its communicator is created with
 * MPI_Comm_split on the global communicator.
 *
 * NOTE(review): grank, gsize, lrank and lsize (used below) and the OpenMP
 * master/barrier structure are on lines not visible in this view.
 */
88 #define DFUNCTION "_sion_ompi_create_lcg_cb" 89 int _sion_ompi_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
92 int filenumber,
int numfiles
95 int rc=SION_STD_SUCCESS;
96 _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
97 _ompi_api_commdata* commgroup=NULL;
/* NOTE(review): color has no initializer; only the MPI_UNDEFINED case is
 * assigned in the visible lines -- confirm it is set for other filenumbers. */
98 int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
100 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
101 grank, gsize, filenumber, numfiles, lrank, lsize));
104 _sion_opmi_grc=SION_STD_SUCCESS;
105 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
" I'm on master\n",rc));
/* Argument validation. */
108 if(global_commgroup==NULL) {
109 fprintf(stderr,
"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
110 return(SION_STD_NOT_SUCCESS);
112 if(*local_commgroup!=NULL) {
113 fprintf(stderr,
"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
114 return(SION_STD_NOT_SUCCESS);
/* Reuse a not-yet-used local group stored in the global group, or request
 * creation of a new one without storing it in the global group. */
118 if(sapi_global->lcommgroup!=NULL) {
120 if(sapi_global->lcommgroup->commset==0) {
121 *local_commgroup=sapi_global->lcommgroup;
122 create_lcomm=0;set_in_global=0;
123 sapi_global->lcommgroup->commset=1;
125 create_lcomm=1;set_in_global=0;
/* Allocate and initialise a new local group.
 * NOTE(review): on allocation failure only _sion_opmi_grc is set; confirm
 * the code after this branch skips the dereferences below when commgroup
 * is NULL. */
132 commgroup = (_ompi_api_commdata *) malloc(
sizeof(_ompi_api_commdata));
133 if (commgroup == NULL) {
134 fprintf(stderr,
"_ompi_create_lcg_cb: cannot allocate memory for commgroup of size %zu, aborting ...\n",
135 sizeof(_ompi_api_commdata));
136 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
139 commgroup->commset=0; commgroup->lcommgroup=NULL;
140 commgroup->commcreated=0;
141 commgroup->rank=lrank;
142 commgroup->size=lsize;
143 commgroup->num_threads=sapi_global->num_threads;
144 commgroup->thread_num=sapi_global->thread_num;
/* Tasks without a file (filenumber==-1) do not join any new communicator. */
149 if(filenumber==-1) color=MPI_UNDEFINED;
150 _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
/* NOTE(review): this prints rc, but the MPI_Comm_split result is stored in
 * _sion_opmi_grc, so the printed value is not the split result. */
151 DPRINTFP((256,
"_ompi_create_lcg_cb", grank,
" rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
/* NOTE(review): commcreated stays 0 in the visible lines, yet
 * _sion_ompi_free_lcg_cb calls MPI_Comm_free only when commcreated is set;
 * verify the split communicator is not leaked. */
152 commgroup->local=1; commgroup->commset=1;
157 sapi_global->lcommgroup=commgroup;
160 *local_commgroup=commgroup;
/* Rank/size in the new communicator, only used for the debug output.
 * NOTE(review): with color MPI_UNDEFINED, MPI_Comm_split yields
 * MPI_COMM_NULL, on which MPI_Comm_rank/MPI_Comm_size are erroneous;
 * confirm the surrounding lines guard this case. */
171 MPI_Comm_rank(commgroup->comm, &mrank);
172 MPI_Comm_size(commgroup->comm, &msize);
175 DPRINTFP((256,
"_mpi_create_lcg_cb", grank,
" leave rc=%d rank %d of %d\n",rc,mrank,msize));
/**
 * Release the MPI communicator of a local communicator group.
 *
 * The communicator is freed only if it was both set (commset) and created
 * (commcreated); the MPI_Comm_free result is stored in _sion_opmi_grc.
 *
 * @param local_commgroup the _ompi_api_commdata to release
 *
 * NOTE(review): commgroup is dereferenced without a NULL check; freeing of
 * the struct itself and the return are on lines not visible here.
 */
183 #define DFUNCTION "_sion_ompi_free_lcg_cb" 184 int _sion_ompi_free_lcg_cb(
void *local_commgroup) {
185 int rc=SION_STD_SUCCESS;
186 _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
189 _sion_opmi_grc=SION_STD_SUCCESS;
191 if ( (commgroup->commset) && (commgroup->commcreated) ) {
192 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm\n"));
193 _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
194 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm rc=%d\n",_sion_opmi_grc));
/**
 * Barrier over the MPI communicator stored in the given _ompi_api_commdata.
 *
 * @param commdata the _ompi_api_commdata whose comm is used
 *
 * The MPI_Barrier result is stored in _sion_opmi_grc.
 * NOTE(review): the declaration of commgroup, any OpenMP synchronisation and
 * the return are on lines not visible here.
 */
208 #define DFUNCTION "_sion_ompi_barrier_cb" 209 int _sion_ompi_barrier_cb(
void *commdata)
211 int rc=SION_STD_SUCCESS;
212 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
219 commgroup = sapi->comm;
220 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" performing MPI barrier now\n"));
221 _sion_opmi_grc = MPI_Barrier(commgroup);
/**
 * Broadcast nelem elements of type dtype from root.
 *
 * @param data     buffer of nelem elements (source on root, target elsewhere)
 * @param commdata the _ompi_api_commdata whose comm is used
 * @param dtype    SION element type
 * @param nelem    number of elements
 * @param root     root rank
 *
 * Visible steps:
 *  1. MPI_Bcast over sapi->comm with the MPI type matching dtype: 32-bit int,
 *     64-bit int or char. The fourth call, presumably the default case, also
 *     uses 64-bit ints.
 *  2. The data pointer is shared among the threads. Every thread whose
 *     omp_get_thread_num() differs from root copies
 *     nelem*_sion_ompi_size_of_dtype(dtype) bytes from the shared buffer.
 *
 * NOTE(review): root is passed unchanged to MPI_Bcast and compared with the
 * OpenMP thread number, whereas the gather/scatter callbacks map it with
 * _sion_map_rank_ompi_to_mpi; confirm whether lines not visible here remap it.
 */
236 #define DFUNCTION "_sion_ompi_bcastr_cb" 237 int _sion_ompi_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
239 int rc=SION_STD_SUCCESS;
240 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
247 commgroup = sapi->comm;
250 _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
253 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
256 _sion_opmi_grc = MPI_Bcast((
char *) data, nelem, MPI_CHAR, root, commgroup);
259 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
/* Distribute the broadcast buffer from one thread to the whole team. */
267 help=__sion_ompi_share_ptr((
void *) data);
270 if((omp_get_thread_num()!=root) && (help != NULL)) {
271 memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
/**
 * Gather nelem elements of type dtype from every OpenMP thread of every MPI
 * task to root.
 *
 * @param indata   this thread's contribution (the memcpy source line is not
 *                 visible here -- TODO confirm)
 * @param outdata  receive buffer on the root task
 * @param commdata the _ompi_api_commdata (comm, num_threads, thread_num)
 * @param dtype    SION element type
 * @param nelem    elements contributed per thread
 * @param root     OpenMP-level root rank
 *
 * Each thread copies its nelem elements into slot thread_num of a shared
 * buffer of num_threads*nelem elements. The buffer is then sent with
 * MPI_Gather to the MPI task mroot = _sion_map_rank_ompi_to_mpi(root,
 * num_threads). Errors are returned through _sion_opmi_grc.
 */
288 #define DFUNCTION "_sion_ompi_gatherr_cb" 289 int _sion_ompi_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
291 int rc=SION_STD_SUCCESS;
293 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
295 void *helpdata, *help;
296 ONLY_DEBUG(
int rank=sapi->rank;)
297 ONLY_DEBUG(
int size=sapi->size;)
/* MPI task that hosts the OpenMP-level root. */
299 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
301 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
306 _sion_opmi_grc=SION_STD_SUCCESS;
/* Task-local buffer for all threads' data.
 * NOTE(review): the size is an int product and may overflow for large
 * inputs. The error message names _sion_ompi_gathervr_cb although this is
 * the gatherr callback. */
308 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
309 if (helpdata == NULL) {
310 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
311 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
312 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
317 help=__sion_ompi_share_ptr((
void *) helpdata);
320 if(_sion_opmi_grc)
return(_sion_opmi_grc);
/* Copy this thread's elements into its slot.
 * NOTE(review): arithmetic on void * is a GNU extension, not ISO C;
 * a (char *) cast would be portable. */
323 memcpy(help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
325 nelem*_sion_ompi_size_of_dtype(dtype));
334 commgroup = sapi->comm;
/* One MPI_Gather per dtype; the fourth, presumably the default case, uses
 * 64-bit ints. */
337 _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
340 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
343 _sion_opmi_grc = MPI_Gather((
char *) help, sapi->num_threads*nelem, MPI_CHAR, (
char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
346 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
/**
 * Scatter nelem elements of type dtype from root to every OpenMP thread of
 * every MPI task.
 *
 * @param indata   send buffer on the root task
 * @param outdata  this thread's receive buffer (the memcpy destination line
 *                 is not visible here -- TODO confirm)
 * @param commdata the _ompi_api_commdata (comm, num_threads, thread_num)
 * @param dtype    SION element type
 * @param nelem    elements received per thread
 * @param root     OpenMP-level root rank
 *
 * MPI_Scatter delivers num_threads*nelem elements per MPI task into a
 * shared buffer. Each thread then copies the nelem elements at slot
 * thread_num. Errors are returned through _sion_opmi_grc.
 * NOTE(review): the copy uses arithmetic on void *, a GNU extension, not
 * ISO C.
 */
367 #define DFUNCTION "_sion_ompi_scatterr_cb" 368 int _sion_ompi_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
370 int rc=SION_STD_SUCCESS, mroot;
371 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
373 void *helpdata, *help;
374 ONLY_DEBUG(
int rank=sapi->rank;)
375 ONLY_DEBUG(
int size=sapi->size;)
/* MPI task that hosts the OpenMP-level root. */
377 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
379 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
/* Task-local receive buffer for all threads' data, shared via
 * __sion_ompi_share_ptr. */
386 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
387 if (helpdata == NULL) {
388 fprintf(stderr,
"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
389 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
390 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
395 help=__sion_ompi_share_ptr((
void *) helpdata);
398 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
407 commgroup = sapi->comm;
/* One MPI_Scatter per dtype; the fourth, presumably the default case, uses
 * 64-bit ints. */
410 _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
413 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
416 _sion_opmi_grc = MPI_Scatter((
char *) indata, sapi->num_threads*nelem, MPI_CHAR, (
char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
419 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
432 help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
433 nelem*_sion_ompi_size_of_dtype(dtype));
451 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
/**
 * Variable-count gather: every OpenMP thread contributes nelem elements of
 * type dtype, and the root task receives all of them in outdata.
 *
 * @param indata   this thread's contribution (copy source line not visible)
 * @param outdata  receive buffer on the root task
 * @param commdata the _ompi_api_commdata (comm, num_threads, thread_num)
 * @param dtype    SION element type
 * @param counts   per-OpenMP-rank element counts, indexed
 *                 m*num_threads+t for MPI task m and thread t
 * @param nelem    number of elements contributed by this thread
 * @param root     OpenMP-level root rank
 *
 * Visible steps:
 *  1. Shared arrays tcounts/tdispls (num_threads ints) hold each thread's
 *     nelem and its offset in the task-local buffer.
 *  2. Each thread copies its data to offset toffset of a shared buffer of
 *     mcount elements.
 *  3. mcounts[m] is the sum of counts[] over the threads of MPI task m;
 *     mdispls is derived from it.
 *  4. MPI_Gatherv sends mcount elements per task to mroot.
 *
 * NOTE(review): the code mixes SION_SUCCESS and SION_STD_SUCCESS in its rc
 * checks -- presumably equal, confirm. The early return after the
 * mcounts/mdispls allocation leaks mcounts when only mdispls fails.
 */
460 #define DFUNCTION "_sion_ompi_gathervr_cb" 461 int _sion_ompi_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
463 int rc=SION_STD_SUCCESS;
464 int m, t, offset, mroot, mcount, toffset;
465 int *mcounts=NULL,*mdispls=NULL;
466 int *tcounts=NULL,*tdispls=NULL;
467 void *helpdata, *help;
468 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* MPI task that hosts the OpenMP-level root. */
473 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* NOTE(review): %x with pointer arguments is undefined behaviour; %p with
 * (void *) arguments is the correct conversion. */
475 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
/* Shared per-thread count array; each thread stores its own nelem. */
481 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
482 if (helpdata == NULL) {
483 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
484 (
size_t) sapi->num_threads *
sizeof(
int));
485 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
490 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
493 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
495 tcounts[sapi->thread_num]=nelem;
/* Shared per-thread displacement array. */
500 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
501 if (helpdata == NULL) {
502 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
503 (
size_t) sapi->num_threads *
sizeof(
int));
504 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
509 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
512 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Thread displacements in the task-local buffer (loop body not visible). */
517 for(t=0;t<size;t++) {
/* NOTE(review): "size=1" assigns 1 to size and reads tdispls[1]. This is
 * almost certainly a typo (e.g. size-1, possibly plus tcounts[size-1] to
 * get the total). size is reused below for the mcounts/mdispls allocation
 * and loops -- verify. */
521 mcount=tdispls[size=1];
530 toffset=tdispls[sapi->thread_num];
/* Shared task-local buffer of mcount elements holding all threads' data. */
536 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
537 if (helpdata == NULL) {
538 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
539 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
540 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
547 help=__sion_ompi_share_ptr((
void *) helpdata);
550 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Copy this thread's nelem elements to its offset.
 * NOTE(review): arithmetic on void * is a GNU extension, not ISO C. */
553 memcpy(help+toffset*_sion_ompi_size_of_dtype(dtype),
555 nelem*_sion_ompi_size_of_dtype(dtype));
/* Per-MPI-task counts and displacements for MPI_Gatherv. */
565 mcounts = (
int *) malloc(size *
sizeof(
int));
566 if (mcounts == NULL) {
567 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(
size_t) size *
sizeof(
int));
568 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
571 if(_sion_opmi_grc==SION_SUCCESS) {
572 mdispls = (
int *) malloc(size *
sizeof(
int));
573 if (mdispls == NULL) {
574 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(
size_t) size *
sizeof(
int));
575 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
580 if(_sion_opmi_grc==SION_SUCCESS) {
/* mcounts[m] = sum of the counts of the threads of MPI task m. */
581 for(m=0;m<size;m++) {
583 for(t=0;t<sapi->num_threads;t++) {
584 mcounts[m]+=counts[m*sapi->num_threads+t];
589 for(m=0;m<size;m++) {
592 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
600 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
606 commgroup = sapi->comm;
/* One MPI_Gatherv per dtype; the fourth, presumably the default case, uses
 * 64-bit ints. */
609 _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
612 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
/* NOTE(review): the char variant casts outdata to (sion_int32 *); this is
 * harmless because MPI_CHAR is the datatype, but (char *) would be
 * consistent. */
615 _sion_opmi_grc = MPI_Gatherv((
char *) help, mcount, MPI_CHAR, (sion_int32 *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
618 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
/* NOTE(review): tcounts/tdispls are shared among threads through
 * __sion_ompi_share_ptr; make sure only one thread frees them (the
 * surrounding pragmas are not visible). free(NULL) is a no-op, so the
 * guards are redundant. */
626 if(tcounts) free(tcounts);
627 if(tdispls) free(tdispls);
631 if(mcounts) free(mcounts);
632 if(mdispls) free(mdispls);
/**
 * Variable-count scatter: root distributes counts[] elements of type dtype
 * to every OpenMP thread; each thread receives nelem elements.
 *
 * @param indata   per-thread target (copy destination line not visible)
 * @param outdata  used as the MPI_Scatterv send buffer in the visible code
 * @param commdata the _ompi_api_commdata (comm, num_threads, thread_num)
 * @param dtype    SION element type
 * @param counts   per-OpenMP-rank element counts, indexed
 *                 m*num_threads+t for MPI task m and thread t
 * @param nelem    number of elements this thread receives
 * @param root     OpenMP-level root rank
 *
 * Mirrors _sion_ompi_gathervr_cb, with two differences: MPI_Scatterv fills
 * a shared buffer of mcount elements per task, and each thread then copies
 * its nelem elements from offset toffset.
 *
 * NOTE(review):
 * - DFUNCTION is defined as "_sion_ompi_scatterr_cb" for this scattervr
 *   callback, so debug output is mislabelled.
 * - The error messages name gathervr, and the debug text says "after
 *   MPI_Gather".
 * - _sion_ompi_scatterr_cb scatters from indata, while this scatters from
 *   outdata; confirm which buffer is intended.
 * - The rc checks mix SION_SUCCESS and SION_STD_SUCCESS.
 */
652 #define DFUNCTION "_sion_ompi_scatterr_cb" 653 int _sion_ompi_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
655 int rc=SION_STD_SUCCESS;
656 int m, t, offset, mroot, mcount, toffset;
657 int *mcounts=NULL,*mdispls=NULL;
658 int *tcounts=NULL,*tdispls=NULL;
659 void *helpdata, *help;
660 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* MPI task that hosts the OpenMP-level root. */
665 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* NOTE(review): %x with pointer arguments is undefined behaviour; use %p
 * with (void *) arguments. */
667 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
673 _sion_opmi_grc=SION_STD_SUCCESS;
/* Shared per-thread count array; each thread stores its own nelem. */
675 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
676 if (helpdata == NULL) {
677 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
678 (
size_t) sapi->num_threads *
sizeof(
int));
679 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
684 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
687 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
689 tcounts[sapi->thread_num]=nelem;
/* Shared per-thread displacement array. */
694 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
695 if (helpdata == NULL) {
696 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
697 (
size_t) sapi->num_threads *
sizeof(
int));
698 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
703 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
706 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Thread displacements in the task-local buffer (loop body not visible). */
711 for(t=0;t<size;t++) {
/* NOTE(review): "size=1" assigns 1 to size and reads tdispls[1]; almost
 * certainly a typo (e.g. size-1). size is reused below for the
 * mcounts/mdispls allocation and loops -- verify. */
715 mcount=tdispls[size=1];
724 toffset=tdispls[sapi->thread_num];
/* Shared task-local receive buffer of mcount elements. */
730 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
731 if (helpdata == NULL) {
732 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
733 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
734 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
739 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Per-MPI-task counts and displacements for MPI_Scatterv.
 * NOTE(review): unlike _sion_ompi_gathervr_cb, there is no visible
 * _sion_opmi_grc check after the help allocation, so mcounts is allocated
 * even if that failed. The early return below then leaks mcounts/mdispls. */
747 mcounts = (
int *) malloc(size *
sizeof(
int));
748 if (mcounts == NULL) {
749 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(
size_t) size *
sizeof(
int));
750 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
753 if(_sion_opmi_grc==SION_SUCCESS) {
754 mdispls = (
int *) malloc(size *
sizeof(
int));
755 if (mdispls == NULL) {
756 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(
size_t) size *
sizeof(
int));
757 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
762 if(_sion_opmi_grc==SION_SUCCESS) {
/* mcounts[m] = sum of the counts of the threads of MPI task m. */
763 for(m=0;m<size;m++) {
765 for(t=0;t<sapi->num_threads;t++) {
766 mcounts[m]+=counts[m*sapi->num_threads+t];
771 for(m=0;m<size;m++) {
774 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
782 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
787 commgroup = sapi->comm;
/* One MPI_Scatterv per dtype; the fourth, presumably the default case, uses
 * 64-bit ints. */
790 _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
793 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
/* NOTE(review): the char variant casts help to (sion_int32 *); harmless
 * with MPI_CHAR, but (char *) would be consistent. */
796 _sion_opmi_grc = MPI_Scatterv((
char *) outdata, mcounts, mdispls, MPI_CHAR, (sion_int32 *) help, mcount, MPI_CHAR, mroot, commgroup);
799 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
806 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Tail of the per-thread copy from help + toffset elements (void * arithmetic,
 * a GNU extension); the call's first line is not visible. */
813 help+toffset*_sion_ompi_size_of_dtype(dtype),
814 nelem*_sion_ompi_size_of_dtype(dtype));
/* NOTE(review): tcounts/tdispls are shared via __sion_ompi_share_ptr; make
 * sure only one thread frees them. */
820 if(tcounts) free(tcounts);
821 if(tdispls) free(tdispls);
825 if(mcounts) free(mcounts);
826 if(mdispls) free(mdispls);
/**
 * Hand a pointer from one OpenMP thread to all threads of the team.
 *
 * @param in_ptr pointer to publish
 *
 * The pointer is stored in the file-scope __ompi_global_pointer and then
 * read back into out_ptr, which is presumably returned.
 *
 * NOTE(review): the lines between the store and the load, and the return,
 * are not visible here. For a race-free handoff, a single thread must
 * store, with a barrier before the load and another before the slot is
 * reused -- confirm the missing lines do this.
 */
846 #define DFUNCTION "__sion_ompi_share_ptr" 847 void * __sion_ompi_share_ptr(
void * in_ptr) {
851 __ompi_global_pointer = in_ptr;
858 out_ptr=__ompi_global_pointer;
/**
 * Size in bytes of one element of a SION dtype.
 *
 * _SION_INT32 -> sizeof(sion_int32), _SION_INT64 -> sizeof(sion_int64),
 * _SION_CHAR -> sizeof(char). Any other value falls back to
 * sizeof(sion_int64), matching the 64-bit fallback of the collective
 * callbacks. The break statements after each return are unreachable.
 */
865 int _sion_ompi_size_of_dtype(
int dtype) {
867 case _SION_INT32:
return(
sizeof(sion_int32));
break;
868 case _SION_INT64:
return(
sizeof(sion_int64));
break;
869 case _SION_CHAR:
return(
sizeof(
char));
break;
870 default:
return(
sizeof(sion_int64));