17 #define _XOPEN_SOURCE 700
/*
 * Map each MPI_ name listed here onto the PMPI_ name of the same routine, so
 * that later uses of these MPI_ names expand to the PMPI_ variants.
 * NOTE(review): MPI_Comm_free, MPI_Gatherv and MPI_Scatterv are also called
 * further down but have no mapping among the visible lines; this listing
 * skips lines, so confirm against the complete file.
 */
29 #define MPI_Comm_rank PMPI_Comm_rank
30 #define MPI_Comm_size PMPI_Comm_size
31 #define MPI_Gather PMPI_Gather
32 #define MPI_Scatter PMPI_Scatter
33 #define MPI_Bcast PMPI_Bcast
34 #define MPI_Barrier PMPI_Barrier
35 #define MPI_Comm_split PMPI_Comm_split
36 #define MPI_Send PMPI_Send
37 #define MPI_Recv PMPI_Recv
42 #include <sys/types.h>
/* File-scope pointer that __sion_ompi_share_ptr stores into and reads back from. */
62 static void *__ompi_global_pointer;
/*
 * File-scope status word. The callbacks in this file store MPI return codes
 * and SION_STD_SUCCESS / SION_STD_NOT_SUCCESS in it and test it for early returns.
 * NOTE(review): initialized with SION_SUCCESS, whereas the callbacks assign
 * SION_STD_SUCCESS; confirm both constants are meant to be interchangeable.
 */
63 static int _sion_opmi_grc=SION_SUCCESS;
/* Forward declarations of the two helpers defined further down in this file. */
65 int _sion_ompi_size_of_dtype(
int dtype);
66 void * __sion_ompi_share_ptr(
void * in_ptr);
/*
 * Creates a generic API instance named "SIONlib_OMPI_API" and registers the
 * _sion_ompi_* callbacks with it: local commgroup create/free, barrier,
 * bcastr, gatherr, scatterr, gathervr, scattervr, gather-and-execute,
 * execute-and-scatter and get-capability.
 * Takes no arguments.
 * NOTE(review): the declaration of `aid`, the return statement and the
 * closing brace are on lines not visible in this listing.
 */
69 int _sion_register_callbacks_ompi(
void) {
/* Handle of the new API instance; passed to every register call below. */
71 aid=sion_generic_create_api(
"SIONlib_OMPI_API");
/* Lifecycle of the per-file (local) communication group. */
74 sion_generic_register_create_local_commgroup_cb(aid,&_sion_ompi_create_lcg_cb);
75 sion_generic_register_free_local_commgroup_cb(aid,&_sion_ompi_free_lcg_cb);
/* Collective communication callbacks. */
77 sion_generic_register_barrier_cb(aid,&_sion_ompi_barrier_cb);
78 sion_generic_register_bcastr_cb(aid,&_sion_ompi_bcastr_cb);
79 sion_generic_register_gatherr_cb(aid,&_sion_ompi_gatherr_cb);
80 sion_generic_register_scatterr_cb(aid,&_sion_ompi_scatterr_cb);
81 sion_generic_register_gathervr_cb(aid,&_sion_ompi_gathervr_cb);
82 sion_generic_register_scattervr_cb(aid,&_sion_ompi_scattervr_cb);
/* Callbacks whose implementations are not part of the visible lines. */
83 sion_generic_register_gather_and_execute_cb(aid,&_sion_ompi_gather_process_cb);
84 sion_generic_register_execute_and_scatter_cb(aid,&_sion_ompi_process_scatter_cb);
85 sion_generic_register_get_capability_cb(aid,&_sion_ompi_get_capability_cb);
/*
 * Callback that provides the local communication group for one file.
 *
 * local_commgroup   out: receives a pointer to an _ompi_api_commdata; the
 *                   pointed-to slot must be NULL on entry.
 * global_commgroup  _ompi_api_commdata describing the global communicator;
 *                   NULL is rejected.
 * filenumber        -1 selects MPI_UNDEFINED as the split color.
 * numfiles          in the visible lines only used in debug output.
 *
 * Returns SION_STD_NOT_SUCCESS from the two argument checks; the remaining
 * return paths are not visible.
 * NOTE(review): this listing omits lines of the function. grank, gsize, lrank
 * and lsize are used below, but their declarations (presumably further
 * parameters) are not visible.
 */
90 #define DFUNCTION "_sion_ompi_create_lcg_cb"
91 int _sion_ompi_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
94 int filenumber,
int numfiles
97 int rc=SION_STD_SUCCESS;
98 _ompi_api_commdata* sapi_global = (_ompi_api_commdata *) global_commgroup;
99 _ompi_api_commdata* commgroup=NULL;
/* create_lcomm / set_in_global steer whether a new group is created and whether it is stored in the global group. */
100 int create_lcomm=1, set_in_global=1, mrank=0, msize=1, color;
102 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" split now comm on master: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
103 grank, gsize, filenumber, numfiles, lrank, lsize));
/* Clear the file-scope status before any work is done. */
106 _sion_opmi_grc=SION_STD_SUCCESS;
/* NOTE(review): rc is passed although the format string contains no conversion. */
107 DPRINTFP((256,
"_mpi_create_lcg_cb", _SION_DEFAULT_RANK,
" I'm on master\n",rc));
/* Argument check: a global commgroup is mandatory. */
110 if(global_commgroup==NULL) {
111 fprintf(stderr,
"_mpi_create_lcg_cb: error no global commgroup given, aborting ...\n");
112 return(SION_STD_NOT_SUCCESS);
/* Argument check: the output slot must not already hold a commgroup. */
114 if(*local_commgroup!=NULL) {
115 fprintf(stderr,
"_mpi_create_lcg_cb: error local commgroup already initialized, aborting ...\n");
116 return(SION_STD_NOT_SUCCESS);
/*
 * If the global group already carries a local commgroup that has not been
 * handed out yet (commset==0), hand it out, mark it as set and skip creation.
 */
120 if(sapi_global->lcommgroup!=NULL) {
122 if(sapi_global->lcommgroup->commset==0) {
123 *local_commgroup=sapi_global->lcommgroup;
124 create_lcomm=0;set_in_global=0;
125 sapi_global->lcommgroup->commset=1;
/* Presumably the else branch (already handed out): create a new group but do not store it in the global one — branch head not visible. */
127 create_lcomm=1;set_in_global=0;
/* Allocate the descriptor for a new local commgroup; a failure is recorded in the status word. */
134 commgroup = (_ompi_api_commdata *) malloc(
sizeof(_ompi_api_commdata));
135 if (commgroup == NULL) {
136 fprintf(stderr,
"_ompi_create_lcg_cb: cannot allocate memory for commgroup of size %zu, aborting ...\n",
137 sizeof(_ompi_api_commdata));
138 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* Initialize the descriptor: local rank/size from lrank/lsize, thread layout copied from the global group. */
141 commgroup->commset=0; commgroup->lcommgroup=NULL;
142 commgroup->commcreated=0;
143 commgroup->rank=lrank;
144 commgroup->size=lsize;
145 commgroup->num_threads=sapi_global->num_threads;
146 commgroup->thread_num=sapi_global->thread_num;
/* filenumber -1 maps to color MPI_UNDEFINED; the color assignment for other file numbers is not visible. */
151 if(filenumber==-1) color=MPI_UNDEFINED;
/* Split the global communicator by color, using lrank as ordering key. */
152 _sion_opmi_grc = MPI_Comm_split(sapi_global->comm, color, lrank, &commgroup->comm);
/* NOTE(review): prints rc, but the MPI_Comm_split result was stored in _sion_opmi_grc. */
153 DPRINTFP((256,
"_ompi_create_lcg_cb", grank,
" rc=%d from MPI_Comm_split(comm,%d,%d,&newcomm)\n",rc,color,lrank));
/*
 * NOTE(review): commcreated was set to 0 above and no visible line sets it to
 * 1, while _sion_ompi_free_lcg_cb only frees the communicator when
 * commcreated is nonzero; confirm against the complete file.
 */
154 commgroup->local=1; commgroup->commset=1;
/* Remember the new group in the global group (presumably guarded by set_in_global; guard not visible). */
159 sapi_global->lcommgroup=commgroup;
/* Hand the new group to the caller. */
162 *local_commgroup=commgroup;
/* Query rank and size in the resulting communicator for the closing debug message. */
173 MPI_Comm_rank(commgroup->comm, &mrank);
174 MPI_Comm_size(commgroup->comm, &msize);
177 DPRINTFP((256,
"_mpi_create_lcg_cb", grank,
" leave rc=%d rank %d of %d\n",rc,mrank,msize));
/*
 * Callback that releases the MPI communicator of a local commgroup.
 *
 * local_commgroup  pointer interpreted as _ompi_api_commdata.
 *
 * The communicator is released with MPI_Comm_free only when both commset and
 * commcreated are nonzero; the MPI result is stored in _sion_opmi_grc.
 * NOTE(review): no NULL check of local_commgroup is visible before it is
 * dereferenced. The rest of the body (release of the descriptor, return
 * value) is on lines not visible in this listing.
 */
185 #define DFUNCTION "_sion_ompi_free_lcg_cb"
186 int _sion_ompi_free_lcg_cb(
void *local_commgroup) {
187 int rc=SION_STD_SUCCESS;
188 _ompi_api_commdata* commgroup = (_ompi_api_commdata *) local_commgroup;
/* Clear the file-scope status before any work is done. */
191 _sion_opmi_grc=SION_STD_SUCCESS;
/* Free only a communicator that was both set and created. */
193 if ( (commgroup->commset) && (commgroup->commcreated) ) {
194 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm\n"));
195 _sion_opmi_grc=MPI_Comm_free(&commgroup->comm);
196 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" free now comm rc=%d\n",_sion_opmi_grc));
/*
 * Barrier callback.
 *
 * commdata  pointer interpreted as _ompi_api_commdata; its comm member is the
 *           communicator the barrier runs on.
 *
 * Calls MPI_Barrier on that communicator and stores the MPI result in
 * _sion_opmi_grc.
 * NOTE(review): the declaration of commgroup, any thread-level
 * synchronization and the return statement are on lines not visible here.
 */
210 #define DFUNCTION "_sion_ompi_barrier_cb"
211 int _sion_ompi_barrier_cb(
void *commdata)
213 int rc=SION_STD_SUCCESS;
214 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* Communicator taken from the commgroup descriptor. */
221 commgroup = sapi->comm;
222 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" performing MPI barrier now\n"));
223 _sion_opmi_grc = MPI_Barrier(commgroup);
/*
 * Broadcast callback.
 *
 * data      buffer holding nelem elements; MPI_Bcast operates on it in place.
 * commdata  pointer interpreted as _ompi_api_commdata (supplies the communicator).
 * dtype     element type code, also passed to _sion_ompi_size_of_dtype.
 * nelem     number of elements.
 * root      passed to MPI_Bcast as root and compared with the OpenMP thread number.
 *
 * After the MPI broadcast the data pointer is published through
 * __sion_ompi_share_ptr and copied into the buffers of the other threads.
 * NOTE(review): this listing omits lines (declarations of commgroup and help,
 * the construct selecting the MPI_Bcast variant, the return statement).
 */
238 #define DFUNCTION "_sion_ompi_bcastr_cb"
239 int _sion_ompi_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
241 int rc=SION_STD_SUCCESS;
242 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
249 commgroup = sapi->comm;
/*
 * One MPI_Bcast per element type: sion_int32, sion_int64, char and again
 * sion_int64. The selection on dtype is not visible — presumably a switch
 * whose default case is the last call.
 */
252 _sion_opmi_grc = MPI_Bcast((sion_int32 *) data, nelem, SION_MPI_INT32, root, commgroup);
255 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
258 _sion_opmi_grc = MPI_Bcast((
char *) data, nelem, MPI_CHAR, root, commgroup);
261 _sion_opmi_grc = MPI_Bcast((sion_int64 *) data, nelem, SION_MPI_INT64, root, commgroup);
/* Publish the data pointer so the other threads can read the broadcast result. */
269 help=__sion_ompi_share_ptr((
void *) data);
/*
 * Every thread whose OpenMP thread number differs from root copies the nelem
 * elements from the shared pointer into its own buffer.
 * NOTE(review): root is compared directly with the thread number and passed
 * unmapped to MPI_Bcast, whereas the gather/scatter callbacks first map root
 * with _sion_map_rank_ompi_to_mpi; confirm this is intended.
 */
272 if((omp_get_thread_num()!=root) && (help != NULL)) {
273 memcpy(data,help,nelem*_sion_ompi_size_of_dtype(dtype));
/*
 * Fixed-count gather callback.
 *
 * indata    per-thread input (its use is on a line not visible here).
 * outdata   receive buffer passed to MPI_Gather.
 * commdata  pointer interpreted as _ompi_api_commdata.
 * dtype     element type code.
 * nelem     number of elements contributed by each thread.
 * root      root rank; mapped to an MPI rank with _sion_map_rank_ompi_to_mpi.
 *
 * Visible flow: allocate a staging buffer for num_threads*nelem elements,
 * share it between threads, let every thread fill its own slot, then gather
 * num_threads*nelem elements per process to mroot.
 * NOTE(review): this listing omits lines (declarations of mroot and
 * commgroup, the dtype selection, cleanup and return).
 */
290 #define DFUNCTION "_sion_ompi_gatherr_cb"
291 int _sion_ompi_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
293 int rc=SION_STD_SUCCESS;
295 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
297 void *helpdata, *help;
298 ONLY_DEBUG(
int rank=sapi->rank;)
299 ONLY_DEBUG(
int size=sapi->size;)
/* Translate the root rank into the MPI rank used by MPI_Gather. */
301 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
303 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
308 _sion_opmi_grc=SION_STD_SUCCESS;
/* Staging buffer with one slot of nelem elements per thread. */
310 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
311 if (helpdata == NULL) {
/* NOTE(review): the message names _sion_ompi_gathervr_cb although this is _sion_ompi_gatherr_cb. */
312 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
313 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
314 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* Make the staging buffer pointer available to the calling threads. */
319 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Leave early when the status word is nonzero. */
322 if(_sion_opmi_grc)
return(_sion_opmi_grc);
/* Copy nelem elements into this thread's slot; the source argument is on a line not visible here (presumably indata). */
325 memcpy((
char *)help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
327 nelem*_sion_ompi_size_of_dtype(dtype));
336 commgroup = sapi->comm;
/*
 * One MPI_Gather per element type: sion_int32, sion_int64, char and again
 * sion_int64; the selection on dtype is not visible (presumably a switch with
 * the last call as default). Each process sends num_threads*nelem elements.
 */
339 _sion_opmi_grc = MPI_Gather((sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) outdata, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
342 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
345 _sion_opmi_grc = MPI_Gather((
char *) help, sapi->num_threads*nelem, MPI_CHAR, (
char *) outdata, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
348 _sion_opmi_grc = MPI_Gather((sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) outdata, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
/*
 * Fixed-count scatter callback.
 *
 * indata    send buffer passed to MPI_Scatter.
 * outdata   per-thread output (its use is on a line not visible here).
 * commdata  pointer interpreted as _ompi_api_commdata.
 * dtype     element type code.
 * nelem     number of elements delivered to each thread.
 * root      root rank; mapped to an MPI rank with _sion_map_rank_ompi_to_mpi.
 *
 * Visible flow: allocate a staging buffer for num_threads*nelem elements,
 * share it between threads, scatter num_threads*nelem elements per process
 * into it, then address this thread's slot of the buffer for a copy.
 * NOTE(review): this listing omits lines (declaration of commgroup, the dtype
 * selection, the head of the final copy call, cleanup and return).
 */
369 #define DFUNCTION "_sion_ompi_scatterr_cb"
370 int _sion_ompi_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
372 int rc=SION_STD_SUCCESS, mroot;
373 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
375 void *helpdata, *help;
376 ONLY_DEBUG(
int rank=sapi->rank;)
377 ONLY_DEBUG(
int size=sapi->size;)
/* Translate the root rank into the MPI rank used by MPI_Scatter. */
379 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
381 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK, " starting on %d of %d nelem=%d root=%d (MPI: %d)\n", rank, size, nelem, root, mroot));
/* Staging buffer with one slot of nelem elements per thread. */
388 helpdata = (
int *) malloc(sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
389 if (helpdata == NULL) {
390 fprintf(stderr,
"_sion_ompi_scatterr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
391 (
unsigned long) sapi->num_threads * nelem * _sion_ompi_size_of_dtype(dtype));
392 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* Make the staging buffer pointer available to the calling threads. */
397 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Leave early when the status word signals a failure. */
400 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
409 commgroup = sapi->comm;
/*
 * One MPI_Scatter per element type: sion_int32, sion_int64, char and again
 * sion_int64; the selection on dtype is not visible (presumably a switch with
 * the last call as default). Each process receives num_threads*nelem elements.
 */
412 _sion_opmi_grc = MPI_Scatter((sion_int32 *) indata, sapi->num_threads*nelem, SION_MPI_INT32, (sion_int32 *) help, sapi->num_threads*nelem, SION_MPI_INT32, mroot, commgroup);
415 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
418 _sion_opmi_grc = MPI_Scatter((
char *) indata, sapi->num_threads*nelem, MPI_CHAR, (
char *) help, sapi->num_threads*nelem, MPI_CHAR, mroot, commgroup);
421 _sion_opmi_grc = MPI_Scatter((sion_int64 *) indata, sapi->num_threads*nelem, SION_MPI_INT64, (sion_int64 *) help, sapi->num_threads*nelem, SION_MPI_INT64, mroot, commgroup);
/*
 * Tail of a call whose head is not visible (presumably a memcpy into
 * outdata): it addresses this thread's slot of the staging buffer and passes
 * the byte length of nelem elements.
 */
434 (
char *)help+sapi->thread_num*nelem*_sion_ompi_size_of_dtype(dtype),
435 nelem*_sion_ompi_size_of_dtype(dtype));
453 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" leaving nelem=%d root=%d, rc=%d\n", nelem, root, rc));
/*
 * Variable-count gather callback.
 *
 * indata    per-thread input (its use is on a line not visible here).
 * outdata   receive buffer passed to MPI_Gatherv.
 * commdata  pointer interpreted as _ompi_api_commdata.
 * dtype     element type code.
 * counts    per-task element counts, indexed as counts[m*num_threads+t].
 * nelem     number of elements contributed by the calling thread.
 * root      root rank; mapped to an MPI rank with _sion_map_rank_ompi_to_mpi.
 *
 * Visible flow: build shared per-thread count and displacement arrays, stage
 * all thread contributions of this process in one shared buffer, build the
 * per-process counts and displacements, call MPI_Gatherv, free the arrays.
 * NOTE(review): this listing omits lines (declarations of size and commgroup,
 * loop bodies, the dtype selection, thread-level guards, the return).
 */
462 #define DFUNCTION "_sion_ompi_gathervr_cb"
463 int _sion_ompi_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
465 int rc=SION_STD_SUCCESS;
466 int m, t, offset, mroot, mcount, toffset;
467 int *mcounts=NULL,*mdispls=NULL;
468 int *tcounts=NULL,*tdispls=NULL;
469 void *helpdata, *help;
470 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* Translate the root rank into the MPI rank used by MPI_Gatherv. */
475 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* NOTE(review): the pointers indata and outdata are printed with %x; %p is the matching conversion for pointers. */
477 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
/* Shared array with one element count per thread. */
483 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
484 if (helpdata == NULL) {
485 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
486 (
size_t) sapi->num_threads *
sizeof(
int));
487 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
492 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
495 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Every thread records its own element count. */
497 tcounts[sapi->thread_num]=nelem;
/* Shared array with one displacement per thread. */
502 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
503 if (helpdata == NULL) {
504 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
505 (
size_t) sapi->num_threads *
sizeof(
int));
506 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
511 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
514 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/*
 * Loop over t; its body is not visible (presumably it fills tdispls from
 * tcounts). NOTE(review): the bound is size, while tcounts and tdispls were
 * allocated with num_threads entries; confirm which variable size refers to.
 */
519 for(t=0;t<size;t++) {
/*
 * NOTE(review): `size=1` is an assignment, so this sets size to 1 and reads
 * tdispls[1]. Presumably a different index expression was intended; later
 * uses of size in this function would see the value 1 if they refer to the
 * same variable. Confirm against the complete file.
 */
523 mcount=tdispls[size=1];
/* Displacement of this thread's contribution inside the staging buffer. */
532 toffset=tdispls[sapi->thread_num];
/* Staging buffer for mcount elements, shared between the threads. */
538 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
539 if (helpdata == NULL) {
540 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
541 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
542 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
549 help=__sion_ompi_share_ptr((
void *) helpdata);
552 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Copy nelem elements to offset toffset of the staging buffer; the source argument is on a line not visible here (presumably indata). */
555 memcpy((
char *)help+toffset*_sion_ompi_size_of_dtype(dtype),
557 nelem*_sion_ompi_size_of_dtype(dtype));
/* Per-process receive counts for MPI_Gatherv. */
567 mcounts = (
int *) malloc(size *
sizeof(
int));
568 if (mcounts == NULL) {
569 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(
size_t) size *
sizeof(
int));
570 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* NOTE(review): compares against SION_SUCCESS, while the other checks in this function use SION_STD_SUCCESS. */
573 if(_sion_opmi_grc==SION_SUCCESS) {
/* Per-process receive displacements for MPI_Gatherv. */
574 mdispls = (
int *) malloc(size *
sizeof(
int));
575 if (mdispls == NULL) {
576 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(
size_t) size *
sizeof(
int));
577 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
582 if(_sion_opmi_grc==SION_SUCCESS) {
/* Sum the counts of all threads of process m into mcounts[m]; the initialization of mcounts[m] is not visible. */
583 for(m=0;m<size;m++) {
585 for(t=0;t<sapi->num_threads;t++) {
586 mcounts[m]+=counts[m*sapi->num_threads+t];
/* Second pass over the processes; apart from the debug output its body is not visible (presumably it fills mdispls). */
591 for(m=0;m<size;m++) {
594 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
602 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
608 commgroup = sapi->comm;
/*
 * One MPI_Gatherv per element type: sion_int32, sion_int64, char and again
 * sion_int64; the selection on dtype is not visible (presumably a switch with
 * the last call as default).
 * NOTE(review): the char variant casts outdata to sion_int32 * although the
 * datatype is MPI_CHAR.
 */
611 _sion_opmi_grc = MPI_Gatherv((sion_int32 *) help, mcount, SION_MPI_INT32, (sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, mroot, commgroup);
614 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
617 _sion_opmi_grc = MPI_Gatherv((
char *) help, mcount, MPI_CHAR, (sion_int32 *) outdata, mcounts, mdispls, MPI_CHAR, mroot, commgroup);
620 _sion_opmi_grc = MPI_Gatherv((sion_int64 *) help, mcount, SION_MPI_INT64, (sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, mroot, commgroup);
/*
 * Release the helper arrays. tcounts and tdispls are pointers obtained from
 * __sion_ompi_share_ptr; presumably only one thread executes these frees
 * (guard not visible). No free of the staging buffer is visible.
 */
628 if(tcounts) free(tcounts);
629 if(tdispls) free(tdispls);
633 if(mcounts) free(mcounts);
634 if(mdispls) free(mdispls);
/*
 * Variable-count scatter callback.
 *
 * indata    not referenced in the visible lines.
 * outdata   passed to MPI_Scatterv as the send buffer.
 * commdata  pointer interpreted as _ompi_api_commdata.
 * dtype     element type code.
 * counts    per-task element counts, indexed as counts[m*num_threads+t].
 * nelem     number of elements for the calling thread.
 * root      root rank; mapped to an MPI rank with _sion_map_rank_ompi_to_mpi.
 *
 * Visible flow: build shared per-thread count and displacement arrays,
 * allocate a shared staging buffer, build the per-process counts and
 * displacements, call MPI_Scatterv into the staging buffer, address this
 * thread's part of it for a copy, free the arrays.
 * NOTE(review): DFUNCTION below names _sion_ompi_scatterr_cb although the
 * function is _sion_ompi_scattervr_cb, so debug output carries the wrong name.
 * NOTE(review): this listing omits lines (declarations of size and commgroup,
 * loop bodies, the dtype selection, thread-level guards, the return).
 */
654 #define DFUNCTION "_sion_ompi_scatterr_cb"
655 int _sion_ompi_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
657 int rc=SION_STD_SUCCESS;
658 int m, t, offset, mroot, mcount, toffset;
659 int *mcounts=NULL,*mdispls=NULL;
660 int *tcounts=NULL,*tdispls=NULL;
661 void *helpdata, *help;
662 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* Translate the root rank into the MPI rank used by MPI_Scatterv. */
667 mroot=_sion_map_rank_ompi_to_mpi(root,sapi->num_threads);
/* NOTE(review): the pointers indata and outdata are printed with %x; %p is the matching conversion for pointers. */
669 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" input nelem=%d root=%d indata=%x, outdata=%x\n", nelem, root, indata, outdata));
675 _sion_opmi_grc=SION_STD_SUCCESS;
/* Shared array with one element count per thread. */
677 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
678 if (helpdata == NULL) {
/* NOTE(review): this and the following allocation messages name the gathervr callback although this is the scattervr callback. */
679 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
680 (
size_t) sapi->num_threads *
sizeof(
int));
681 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
686 tcounts=__sion_ompi_share_ptr((
void *) helpdata);
689 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/* Every thread records its own element count. */
691 tcounts[sapi->thread_num]=nelem;
/* Shared array with one displacement per thread. */
696 helpdata = (
int *) malloc(sapi->num_threads *
sizeof(
int));
697 if (helpdata == NULL) {
698 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
699 (
size_t) sapi->num_threads *
sizeof(
int));
700 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
705 tdispls=__sion_ompi_share_ptr((
void *) helpdata);
708 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/*
 * Loop over t; its body is not visible (presumably it fills tdispls from
 * tcounts). NOTE(review): the bound is size, while tcounts and tdispls were
 * allocated with num_threads entries; confirm which variable size refers to.
 */
713 for(t=0;t<size;t++) {
/*
 * NOTE(review): `size=1` is an assignment, so this sets size to 1 and reads
 * tdispls[1]. Presumably a different index expression was intended; later
 * uses of size in this function would see the value 1 if they refer to the
 * same variable. Confirm against the complete file.
 */
717 mcount=tdispls[size=1];
/* Displacement of this thread's part inside the staging buffer. */
726 toffset=tdispls[sapi->thread_num];
/* Staging buffer for mcount elements, shared between the threads. */
732 helpdata = (
int *) malloc(mcount * _sion_ompi_size_of_dtype(dtype));
733 if (helpdata == NULL) {
734 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (helpdata), aborting ...\n",
735 (
unsigned long) mcount * _sion_ompi_size_of_dtype(dtype));
736 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
741 help=__sion_ompi_share_ptr((
void *) helpdata);
/* Per-process send counts for MPI_Scatterv. */
749 mcounts = (
int *) malloc(size *
sizeof(
int));
750 if (mcounts == NULL) {
751 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mcounts), aborting ...\n",(
size_t) size *
sizeof(
int));
752 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
/* NOTE(review): compares against SION_SUCCESS, while the other checks in this function use SION_STD_SUCCESS. */
755 if(_sion_opmi_grc==SION_SUCCESS) {
/* Per-process send displacements for MPI_Scatterv. */
756 mdispls = (
int *) malloc(size *
sizeof(
int));
757 if (mdispls == NULL) {
758 fprintf(stderr,
"_mpi_gathervr_cb: cannot allocate temporary memory of size %zu (mdispls), aborting ...\n",(
size_t) size *
sizeof(
int));
759 _sion_opmi_grc=SION_STD_NOT_SUCCESS;
764 if(_sion_opmi_grc==SION_SUCCESS) {
/* Sum the counts of all threads of process m into mcounts[m]; the initialization of mcounts[m] is not visible. */
765 for(m=0;m<size;m++) {
767 for(t=0;t<sapi->num_threads;t++) {
768 mcounts[m]+=counts[m*sapi->num_threads+t];
/* Second pass over the processes; apart from the debug output its body is not visible (presumably it fills mdispls). */
773 for(m=0;m<size;m++) {
/* NOTE(review): the debug text says "after MPI_Gather" inside the scatter callback. */
776 DPRINTFP((256, DFUNCTION, _SION_DEFAULT_RANK,
" after MPI_Gather %2d -> dpls=%2d count=%d\n", m,mdispls[m],mcounts[m]));
784 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
789 commgroup = sapi->comm;
/*
 * One MPI_Scatterv per element type: sion_int32, sion_int64, char and again
 * sion_int64; the selection on dtype is not visible (presumably a switch with
 * the last call as default). The parameter named outdata is the send buffer
 * and the staging buffer help receives mcount elements.
 * NOTE(review): the char variant casts help to sion_int32 * although the
 * datatype is MPI_CHAR.
 */
792 _sion_opmi_grc = MPI_Scatterv((sion_int32 *) outdata, mcounts, mdispls, SION_MPI_INT32, (sion_int32 *) help, mcount, SION_MPI_INT32, mroot, commgroup);
795 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
798 _sion_opmi_grc = MPI_Scatterv((
char *) outdata, mcounts, mdispls, MPI_CHAR, (sion_int32 *) help, mcount, MPI_CHAR, mroot, commgroup);
801 _sion_opmi_grc = MPI_Scatterv((sion_int64 *) outdata, mcounts, mdispls, SION_MPI_INT64, (sion_int64 *) help, mcount, SION_MPI_INT64, mroot, commgroup);
808 if(_sion_opmi_grc!=SION_STD_SUCCESS)
return(_sion_opmi_grc);
/*
 * Tail of a call whose head is not visible (presumably a memcpy into a
 * per-thread buffer): it addresses offset toffset of the staging buffer and
 * passes the byte length of nelem elements.
 */
815 (
char *)help+toffset*_sion_ompi_size_of_dtype(dtype),
816 nelem*_sion_ompi_size_of_dtype(dtype));
/*
 * Release the helper arrays. tcounts and tdispls are pointers obtained from
 * __sion_ompi_share_ptr; presumably only one thread executes these frees
 * (guard not visible). No free of the staging buffer is visible.
 */
822 if(tcounts) free(tcounts);
823 if(tdispls) free(tdispls);
827 if(mcounts) free(mcounts);
828 if(mdispls) free(mdispls);
/*
 * Publishes a pointer through the file-scope variable __ompi_global_pointer.
 *
 * in_ptr  pointer to publish.
 *
 * The visible lines store in_ptr in __ompi_global_pointer and read that
 * variable back into out_ptr; the callers in this file use the result as the
 * buffer common to all threads.
 * NOTE(review): the declaration of out_ptr, the return statement and whatever
 * restricts the store to one thread and orders it before the read are on
 * lines not visible in this listing — presumably OpenMP master/barrier
 * constructs; confirm against the complete file.
 */
848 #define DFUNCTION "__sion_ompi_share_ptr"
849 void * __sion_ompi_share_ptr(
void * in_ptr) {
853 __ompi_global_pointer = in_ptr;
860 out_ptr=__ompi_global_pointer;
/*
 * Returns the size in bytes of one element of the given type code.
 *
 * dtype  _SION_INT32, _SION_INT64 or _SION_CHAR; every other value is treated
 *        like _SION_INT64.
 *
 * The sizeof result is returned as int. The break after each return is never
 * reached.
 * NOTE(review): the switch head and the closing braces are on lines not
 * visible in this listing.
 */
867 int _sion_ompi_size_of_dtype(
int dtype) {
869 case _SION_INT32:
return(
sizeof(sion_int32));
break;
870 case _SION_INT64:
return(
sizeof(sion_int64));
break;
871 case _SION_CHAR:
return(
sizeof(
char));
break;
/* Unknown type codes fall back to the 64-bit element size. */
872 default:
return(
sizeof(sion_int64));