/* File-scope setup: the MPI calls used in this file are redirected to their
 * PMPI_ profiling-interface entry points, and __ompi_global_pointer is the
 * slot __sion_ompi_share_ptr uses to hand a pointer from one thread to the
 * other threads of a task. */
27 #define MPI_Comm_rank PMPI_Comm_rank 28 #define MPI_Comm_size PMPI_Comm_size 29 #define MPI_Gather PMPI_Gather 30 #define MPI_Scatter PMPI_Scatter 31 #define MPI_Bcast PMPI_Bcast 32 #define MPI_Barrier PMPI_Barrier 33 #define MPI_Comm_split PMPI_Comm_split 34 #define MPI_Send PMPI_Send 35 #define MPI_Recv PMPI_Recv 40 #include <sys/types.h> 60 static void *__ompi_global_pointer;
/* Status of the most recent MPI call or allocation made by the callbacks in
 * this file. They assign SION_STD_SUCCESS / SION_STD_NOT_SUCCESS (or an MPI
 * return code) to it and return it on their early-exit paths, so it is
 * initialised in that same SION_STD_* convention.
 * NOTE(review): a single file-scope variable, presumably read and written by
 * all OpenMP threads of a task -- confirm the intended synchronization. */
61 static int _sion_opmi_grc=SION_STD_SUCCESS;
/* Size in bytes of one element of a SIONlib dtype (e.g. _SION_INT32). */
63 int _sion_ompi_size_of_dtype(
int dtype);
/* Stores in_ptr in __ompi_global_pointer and returns the value read back. */
64 void * __sion_ompi_share_ptr(
void * in_ptr);
/* Creates the generic SIONlib API "SIONlib_OMPI_API" and registers this
 * file's OpenMP+MPI callbacks with it: local commgroup create/free, barrier,
 * bcast, gather, scatter, gatherv, scatterv, gather-and-execute,
 * execute-and-scatter and get-capability.
 * NOTE(review): the declaration of aid and the return statement are not part
 * of this listing; presumably the API id aid is returned -- confirm. */
67 int _sion_register_callbacks_ompi() {
69 aid=sion_generic_create_api(
"SIONlib_OMPI_API");
/* Local communicator-group management. */
72 sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
73 sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
/* Collective operations and capability query. */
75 sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
76 sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
77 sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
78 sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
79 sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
80 sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
81 sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
82 sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
83 sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
/* Create (or reuse) the local communicator group used for one file.
 *
 *  local_commgroup  out: receives the local commgroup; *local_commgroup must
 *                   be NULL on entry.
 *  global_commgroup in:  the global _ompi_api_commdata; must not be NULL.
 *  filenumber       colour passed to MPI_Comm_split.
 *  numfiles         number of files (only printed in the visible lines).
 *
 * If sapi_global->lcommgroup is already set, it is reused. Otherwise a new
 * _ompi_api_commdata is malloc'ed, initialised from lrank/lsize and the global
 * num_threads/thread_num, and cached in sapi_global->lcommgroup. When the
 * commgroup has no communicator yet (commset==0), sapi_global->comm is split
 * by filenumber with lrank as key, and local/commset/commcreated are set to 1.
 * The MPI_Comm_split return code is stored in _sion_opmi_grc.
 *
 * Returns SION_STD_NOT_SUCCESS when global_commgroup is NULL or when
 * *local_commgroup is already set.
 *
 * NOTE(review): grank, gsize, lrank and lsize are not declared in the lines of
 * this listing; presumably they come from the global commgroup -- confirm.
 */
88 #define DFUNCTION "_sion_ompi_create_lcg_cb" 89 int _sion_ompi_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
92 int filenumber,
int numfiles
95 int rc=SION_STD_SUCCESS;
96 _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
97 _ompi_api_commdata* commgroup=NULL;
99 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
100 grank, gsize, filenumber, numfiles, lrank, lsize));
/* Reset the shared status. The debug text suggests this section runs on the
 * OpenMP master thread only; the directive itself is not in this listing. */
103 _sion_opmi_grc=SION_STD_SUCCESS;
104 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
" I'm on master\n",rc));
/* Argument validation. */
107 if(global_commgroup==NULL) {
108 fprintf(stderr,
"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
109 return(SION_STD_NOT_SUCCESS);
111 if(*local_commgroup!=NULL) {
112 fprintf(stderr,
"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
113 return(SION_STD_NOT_SUCCESS);
/* Reuse a local commgroup already cached on the global commgroup. */
117 if(sapi_global->lcommgroup!=NULL) {
119 commgroup=*local_commgroup=sapi_global->lcommgroup;
120 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
" local comm group is already set\n"));
/* Otherwise allocate a fresh local commgroup.
 * NOTE(review): on allocation failure only _sion_opmi_grc is set in the
 * visible lines; confirm the failure path cannot reach the commgroup->
 * assignments below with commgroup==NULL. */
124 commgroup = (_ompi_api_commdata *) malloc(
sizeof(_ompi_api_commdata));
125 if (commgroup == NULL) {
126 fprintf(stderr,
"_ompi_create_lcg_cb: cannot allocate memory for local commgroup of size %lu, aborting ...\n",
127 (
unsigned long)
sizeof(_ompi_api_commdata));
128 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* Initialise the new commgroup and cache it on the global commgroup. */
131 commgroup->commset=0; commgroup->lcommgroup=NULL;
132 commgroup->commcreated=0;
133 commgroup->rank=lrank;
134 commgroup->size=lsize;
135 commgroup->num_threads=sapi_global->num_threads;
136 commgroup->thread_num=sapi_global->thread_num;
138 *local_commgroup=commgroup;
139 sapi_global->lcommgroup=commgroup;
/* Create the per-file communicator only once per commgroup. */
144 if(commgroup->commset==0) {
/* The output-handle address is printed with %p, the conversion that matches
 * a pointer argument.
 * NOTE(review): sapi_global->comm is printed with %x; MPI_Comm is an integer
 * handle in some MPI implementations and a pointer in others. */
146 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
"MPI_Comm_split(%x,%d,%d,%p)\n",sapi_global->comm,filenumber,lrank,(void *) &commgroup->comm));
147 _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, filenumber, lrank, &commgroup->comm);
/* Report the MPI_Comm_split return code, which is held in _sion_opmi_grc. */
148 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
" rc=%d from MPI_Comm_split\n",_sion_opmi_grc));
149 commgroup->local=1; commgroup->commset=1; commgroup->commcreated=1;
/* Release the MPI communicator held by a local commgroup.
 * MPI_Comm_free is called only when both commset and commcreated are set, as
 * _sion_ompi_create_lcg_cb does after its MPI_Comm_split. The MPI_Comm_free
 * return code is stored in _sion_opmi_grc.
 * NOTE(review): whether the commgroup structure itself is freed, and what the
 * function returns, is not visible in this listing. */
161 #define DFUNCTION "_sion_ompi_free_lcg_cb" 162 int _sion_ompi_free_lcg_cb(
void *local_commgroup) {
163 int rc=SION_STD_SUCCESS;
164 _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
167 _sion_opmi_grc=SION_STD_SUCCESS;
/* Only free a communicator that this layer created. */
169 if ( (commgroup->commset) && (commgroup->commcreated) ) {
170 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm\n"));
171 _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
172 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm rc=%d\n",_sion_opmi_grc));
/* Barrier across the MPI communicator of the given commgroup (sapi->comm).
 * The MPI_Barrier return code is stored in _sion_opmi_grc.
 * NOTE(review): the declaration of commgroup and any OpenMP-level
 * synchronisation around the MPI call are not visible in this listing. */
186 #define DFUNCTION "_sion_ompi_barrier_cb" 187 int _sion_ompi_barrier_cb(
void *commdata)
189 int rc=SION_STD_SUCCESS;
190 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
197 commgroup = sapi->comm;
198 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" performing MPI barrier now\n"));
199 _sion_opmi_grc = MPI_Barrier(commgroup);
/* Broadcast nelem elements of type dtype from root.
 *
 * MPI_Bcast on sapi->comm uses the MPI datatype selected by dtype
 * (SION_MPI_INT32, SION_MPI_INT64 or MPI_CHAR; the fourth call, presumably the
 * default branch, also uses SION_MPI_INT64). The data pointer is then shared
 * through __sion_ompi_share_ptr. Every thread whose omp_get_thread_num()
 * differs from root, and that sees a non-NULL shared pointer, copies
 * nelem*_sion_ompi_size_of_dtype(dtype) bytes from it into its own data.
 *
 * NOTE(review): unlike the gather/scatter callbacks, root is passed to
 * MPI_Bcast without _sion_map_rank_ompi_to_mpi, and the same value is compared
 * with the OpenMP thread number -- confirm which numbering root uses here.
 */
214 #define DFUNCTION "_sion_ompi_bcastr_cb" 215 int _sion_ompi_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
217 int rc=SION_STD_SUCCESS;
218 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
225 commgroup = sapi->comm;
/* MPI broadcast with the datatype matching dtype (the dtype dispatch itself
 * is not part of this listing). */
228 _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
231 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
234 _sion_opmi_grc = MPI_Bcast((
char *) data, nelem, MPI_CHAR, root, commgroup);
237 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
/* Distribute the broadcast buffer to the other threads of this task. */
245 help=__sion_ompi_share_ptr((
void *) data);
248 if((omp_get_thread_num()!=root) && (help != NULL)) {
249 memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
/* Gather nelem elements of type dtype from every thread of every task to root.
 *
 *  indata  - presumably the calling thread's nelem elements (the memcpy source
 *            line is not part of this listing)
 *  outdata - MPI_Gather receive buffer (significant on the MPI root task)
 *  root    - OMPI rank of the root, mapped to an MPI rank with
 *            _sion_map_rank_ompi_to_mpi(root, sapi->num_threads)
 *
 * A temporary buffer of num_threads*nelem elements is malloc'ed and shared
 * among the threads via __sion_ompi_share_ptr. Each thread writes its nelem
 * elements at offset thread_num*nelem. MPI_Gather then sends num_threads*nelem
 * elements per task to mroot on sapi->comm.
 * Returns _sion_opmi_grc early when the temporary buffer cannot be allocated.
 *
 * NOTE(review): help+offset is arithmetic on a void pointer, which is a GNU
 * extension rather than ISO C.
 */
266 #define DFUNCTION "_sion_ompi_gatherr_cb" 267 int _sion_ompi_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
269 int rc=SION_STD_SUCCESS;
271 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
273 void *helpdata, *help;
274 ONLY_DEBUG(
int rank=sapi->rank;)
275 ONLY_DEBUG(
int size=sapi->size;)
277 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
279 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
284 _sion_opmi_grc=SION_STD_SUCCESS;
/* Temporary per-task buffer holding every thread's contribution. */
286 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
287 if (helpdata == NULL) {
288 fprintf(stderr,
"_sion_ompi_gatherr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
289 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
290 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
295 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Leave early if the shared buffer could not be allocated. */
298 if(_sion_opmi_grc)
return(_sion_opmi_grc);
/* Place this thread's elements in its slot of the shared buffer. */
301 memcpy(help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
303 nelem*_sion_ompi_size_of_dtype(dtype));
/* One MPI_Gather per task with the datatype matching dtype (the dtype
 * dispatch itself is not part of this listing). */
312 commgroup = sapi->comm;
315 _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
318 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
321 _sion_opmi_grc = MPI_Gather((
char *) help, sapi->num_threads*nelem, MPI_CHAR, (
char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
324 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
/* Scatter nelem elements of type dtype from root to every thread of every task.
 *
 * root is mapped to an MPI rank with _sion_map_rank_ompi_to_mpi. A temporary
 * buffer of num_threads*nelem elements is malloc'ed and shared among threads
 * via __sion_ompi_share_ptr. MPI_Scatter on sapi->comm sends
 * num_threads*nelem elements of indata from mroot into each task's shared
 * buffer. Each thread then copies the nelem elements at offset
 * thread_num*nelem (the copy destination is not part of this listing;
 * presumably outdata).
 * Returns _sion_opmi_grc early if the buffer cannot be allocated.
 */
345 #define DFUNCTION "_sion_ompi_scatterr_cb" 346 int _sion_ompi_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
348 int rc=SION_STD_SUCCESS, mroot;
349 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
351 void *helpdata, *help;
352 ONLY_DEBUG(
int rank=sapi->rank;)
353 ONLY_DEBUG(
int size=sapi->size;)
355 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
357 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
/* Temporary per-task receive buffer for all threads' elements. */
364 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
365 if (helpdata == NULL) {
366 fprintf(stderr,
"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
367 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
368 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
373 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Leave early if the shared buffer could not be allocated. */
376 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* One MPI_Scatter per task with the datatype matching dtype (the dtype
 * dispatch is not part of this listing). Afterwards each thread copies its
 * slice, starting at help+thread_num*nelem*size_of_dtype. */
385 commgroup = sapi->comm;
388 _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
391 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
394 _sion_opmi_grc = MPI_Scatter((
char *) indata, sapi->num_threads*nelem, MPI_CHAR, (
char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
397 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
410 help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
411 nelem*_sion_ompi_size_of_dtype(dtype));
/* NOTE(review): rc is printed here, while the MPI_Scatter result is kept in
 * _sion_opmi_grc -- confirm rc is updated in lines not shown. */
429 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
/* Variable-count gather of per-thread data to root.
 *
 *  indata  - presumably the calling thread's nelem elements (the memcpy source
 *            line is not part of this listing)
 *  outdata - MPI_Gatherv receive buffer (significant on the MPI root task)
 *  counts  - element counts indexed as counts[m*num_threads+t] for MPI task m
 *            and thread t
 *  nelem   - number of elements contributed by the calling thread
 *  root    - OMPI rank, mapped with _sion_map_rank_ompi_to_mpi
 *
 * Steps:
 *  1. Shared tcounts[] collects each thread's nelem.
 *  2. Shared tdispls[] holds per-thread offsets. mcount (elements for this
 *     task) and toffset (this thread's offset) are read from it.
 *  3. A shared buffer of mcount elements receives each thread's data at
 *     toffset.
 *  4. mcounts/mdispls (size entries) are built by summing counts over the
 *     threads of each task.
 *  5. MPI_Gatherv sends mcount elements per task to mroot on sapi->comm.
 * Returns _sion_opmi_grc early on allocation failure.
 */
438 #define DFUNCTION "_sion_ompi_gathervr_cb" 439 int _sion_ompi_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
441 int rc=SION_STD_SUCCESS;
442 int m, t, offset, mroot, mcount, toffset;
443 int *mcounts=NULL,*mdispls=NULL;
444 int *tcounts=NULL,*tdispls=NULL;
445 void *helpdata, *help;
446 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
451 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* %p is the conversion that matches the void * arguments. */
453 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%p, outdata=%p\n", nelem, root, indata, outdata));
/* Shared per-thread element counts. */
459 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
460 if (helpdata == NULL) {
461 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
462 (
unsigned long) sapi->num_threads *
sizeof(
int));
463 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
468 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
471 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
473 tcounts[sapi->thread_num]=nelem;
/* Shared per-thread displacements. */
478 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
479 if (helpdata == NULL) {
480 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tdispls), aborting ...\n",
481 (
unsigned long) sapi->num_threads *
sizeof(
int));
482 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
487 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
490 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* NOTE(review): the loop bound is size, while tcounts/tdispls were allocated
 * with num_threads entries -- confirm. */
495 for(t=0;t<size;t++) {
/* NOTE(review): "size=1" assigns 1 to size, which later sizes mcounts/mdispls
 * and bounds the per-task loops, and reads tdispls[1]. This looks like a typo.
 * The intended value is presumably this task's total element count -- confirm
 * against the full source and fix. */
499 mcount=tdispls[size=1];
508 toffset=tdispls[sapi->thread_num];
/* Shared buffer for all elements of this task. */
514 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
515 if (helpdata == NULL) {
516 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
517 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
518 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
525 help=__sion_ompi_share_ptr((
void *) helpdata);
528 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Place this thread's elements at its offset in the shared buffer
 * (arithmetic on void * is a GNU extension). */
531 memcpy(help+toffset*_sion_ompi_size_of_dtype(dtype),
533 nelem*_sion_ompi_size_of_dtype(dtype));
/* Per-task counts and displacements for MPI_Gatherv. */
543 mcounts = (
int *) malloc(size *
sizeof(
int));
544 if (mcounts == NULL) {
545 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (mcounts), aborting ...\n",(
unsigned long) size *
sizeof(
int));
546 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* _sion_opmi_grc holds SION_STD_* values throughout this file, so it is
 * tested against SION_STD_SUCCESS. */
549 if(_sion_opmi_grc==SION_STD_SUCCESS) {
550 mdispls = (
int *) malloc(size *
sizeof(
int));
551 if (mdispls == NULL) {
552 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (mdispls), aborting ...\n",(
unsigned long) size *
sizeof(
int));
553 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
558 if(_sion_opmi_grc==SION_STD_SUCCESS) {
/* NOTE(review): mcounts[m] is accumulated with +=; its zero-initialisation is
 * not visible in this listing. */
559 for(m=0;m<size;m++) {
561 for(t=0;t<sapi->num_threads;t++) {
562 mcounts[m]+=counts[m*sapi->num_threads+t];
567 for(m=0;m<size;m++) {
570 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
578 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* One MPI_Gatherv per task with the datatype matching dtype (the dtype
 * dispatch is not part of this listing). */
584 commgroup = sapi->comm;
587 _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
590 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
593 _sion_opmi_grc = MPI_Gatherv((
char *) help, mcount, MPI_CHAR, (char *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
596 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
/* NOTE(review): tcounts/tdispls are shared between threads via
 * __sion_ompi_share_ptr; confirm that only one thread frees them. Freeing of
 * the shared data buffer is not visible in this listing. */
604 if(tcounts) free(tcounts);
605 if(tdispls) free(tdispls);
609 if(mcounts) free(mcounts);
610 if(mdispls) free(mdispls);
/* Variable-count scatter from root to the threads of every task.
 *
 *  counts  - element counts indexed as counts[m*num_threads+t] for MPI task m
 *            and thread t
 *  nelem   - number of elements the calling thread receives
 *  root    - OMPI rank, mapped with _sion_map_rank_ompi_to_mpi
 *
 * Steps:
 *  1. Shared tcounts[] collects each thread's nelem.
 *  2. Shared tdispls[] holds per-thread offsets. mcount (elements for this
 *     task) and toffset (this thread's offset) are read from it.
 *  3. A shared buffer of mcount elements is allocated.
 *  4. mcounts/mdispls (size entries) are built by summing counts over the
 *     threads of each task.
 *  5. MPI_Scatterv on sapi->comm fills the shared buffer.
 *  6. Each thread copies its nelem elements starting at toffset.
 * Returns _sion_opmi_grc early on failure.
 */
630 #define DFUNCTION "_sion_ompi_scattervr_cb" 631 int _sion_ompi_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
633 int rc=SION_STD_SUCCESS;
634 int m, t, offset, mroot, mcount, toffset;
635 int *mcounts=NULL,*mdispls=NULL;
636 int *tcounts=NULL,*tdispls=NULL;
637 void *helpdata, *help;
638 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
643 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* %p is the conversion that matches the void * arguments. */
645 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%p, outdata=%p\n", nelem, root, indata, outdata));
651 _sion_opmi_grc=SION_STD_SUCCESS;
/* Shared per-thread element counts. */
653 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
654 if (helpdata == NULL) {
655 fprintf(stderr,
"_sion_ompi_scattervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
656 (
unsigned long) sapi->num_threads *
sizeof(
int));
657 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
662 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
665 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
667 tcounts[sapi->thread_num]=nelem;
/* Shared per-thread displacements. */
672 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
673 if (helpdata == NULL) {
674 fprintf(stderr,
"_sion_ompi_scattervr_cb: cannot allocate temporary memory of size %lu (tdispls), aborting ...\n",
675 (
unsigned long) sapi->num_threads *
sizeof(
int));
676 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
681 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
684 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* NOTE(review): the loop bound is size, while tcounts/tdispls were allocated
 * with num_threads entries -- confirm. */
689 for(t=0;t<size;t++) {
/* NOTE(review): "size=1" assigns 1 to size, which later sizes mcounts/mdispls
 * and bounds the per-task loops, and reads tdispls[1]. This looks like a typo.
 * The intended value is presumably this task's total element count -- confirm
 * against the full source and fix. */
693 mcount=tdispls[size=1];
702 toffset=tdispls[sapi->thread_num];
/* Shared buffer for all elements of this task. */
708 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
709 if (helpdata == NULL) {
710 fprintf(stderr,
"_sion_ompi_scattervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
711 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
712 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
717 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Per-task counts and displacements for MPI_Scatterv.
 * NOTE(review): no _sion_opmi_grc check after the buffer allocation above is
 * visible; a failure there is only caught at the check before the scatter. */
725 mcounts = (
int *) malloc(size *
sizeof(
int));
726 if (mcounts == NULL) {
727 fprintf(stderr,
"_sion_ompi_scattervr_cb: cannot allocate temporary memory of size %lu (mcounts), aborting ...\n",(
unsigned long) size *
sizeof(
int));
728 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* _sion_opmi_grc holds SION_STD_* values throughout this file, so it is
 * tested against SION_STD_SUCCESS. */
731 if(_sion_opmi_grc==SION_STD_SUCCESS) {
732 mdispls = (
int *) malloc(size *
sizeof(
int));
733 if (mdispls == NULL) {
734 fprintf(stderr,
"_sion_ompi_scattervr_cb: cannot allocate temporary memory of size %lu (mdispls), aborting ...\n",(
unsigned long) size *
sizeof(
int));
735 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
740 if(_sion_opmi_grc==SION_STD_SUCCESS) {
/* NOTE(review): mcounts[m] is accumulated with +=; its zero-initialisation is
 * not visible in this listing. */
741 for(m=0;m<size;m++) {
743 for(t=0;t<sapi->num_threads;t++) {
744 mcounts[m]+=counts[m*sapi->num_threads+t];
749 for(m=0;m<size;m++) {
752 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
760 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* One MPI_Scatterv per task with the datatype matching dtype (the dtype
 * dispatch is not part of this listing).
 * NOTE(review): the root's send buffer here is outdata, whereas
 * _sion_ompi_scatterr_cb scatters from indata -- confirm which argument
 * carries the root's data. */
765 commgroup = sapi->comm;
768 _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
771 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
774 _sion_opmi_grc = MPI_Scatterv((
char *) outdata, mcounts, mdispls, MPI_CHAR, (char *) help, mcount, MPI_CHAR, mroot, commgroup);
777 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
/* Leave on MPI failure; otherwise each thread copies its elements starting at
 * help+toffset*size_of_dtype (the copy destination is not in this listing). */
784 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
791 help+toffset*_sion_ompi_size_of_dtype(dtype),
792 nelem*_sion_ompi_size_of_dtype(dtype));
/* NOTE(review): tcounts/tdispls are shared between threads via
 * __sion_ompi_share_ptr; confirm that only one thread frees them. */
798 if(tcounts) free(tcounts);
799 if(tdispls) free(tdispls);
803 if(mcounts) free(mcounts);
804 if(mdispls) free(mdispls);
/* Hand a pointer from one thread to the other threads of the task.
 * in_ptr is stored in the file-scope __ompi_global_pointer, and the value read
 * back from it is returned. The callbacks above use it to share a single
 * malloc'ed buffer among threads.
 * NOTE(review): this listing omits three things: the OpenMP directives that
 * presumably restrict the store to one thread and order it before the reads,
 * the declaration of out_ptr, and the return statement. */
824 #define DFUNCTION "__sion_ompi_share_ptr" 825 void * __sion_ompi_share_ptr(
void * in_ptr) {
829 __ompi_global_pointer = in_ptr;
836 out_ptr=__ompi_global_pointer;
/* Size in bytes of one element of a SIONlib dtype:
 * sizeof(sion_int32) for _SION_INT32, sizeof(sion_int64) for _SION_INT64,
 * sizeof(char) for _SION_CHAR, and sizeof(sion_int64) for any other value.
 * (The switch header and closing braces are not part of this listing.) */
843 int _sion_ompi_size_of_dtype(
int dtype) {
845 case _SION_INT32:
return(
sizeof(sion_int32));
break;
846 case _SION_INT64:
return(
sizeof(sion_int64));
break;
847 case _SION_CHAR:
return(
sizeof(
char));
break;
/* Any other dtype is sized as a 64-bit integer. */
848 default:
return(
sizeof(sion_int64));