24 #define MPI_Comm_rank      PMPI_Comm_rank    25 #define MPI_Comm_size      PMPI_Comm_size         26 #define MPI_Gather     PMPI_Gather        27 #define MPI_Scatter    PMPI_Scatter       28 #define MPI_Bcast      PMPI_Bcast         29 #define MPI_Barrier    PMPI_Barrier       30 #define MPI_Comm_split     PMPI_Comm_split        31 #define MPI_Send           PMPI_Send    32 #define MPI_Recv           PMPI_Recv    37 #include <sys/types.h>    44 #include "sion_error_handler.h"    55 static void *__ompicol_global_pointer;
    56 static int _sion_opmicol_grc=SION_SUCCESS;
    58 int _sion_ompicol_size_of_dtype(
int dtype);
    59 void * __sion_ompicol_share_ptr(
void * in_ptr);
    61 #define DFUNCTION "_ompi_gather_execute_cb"    62 int _sion_ompi_gather_process_cb(
const void *indata, sion_int64 *spec, 
int spec_len, sion_int64 fsblksize,
    63                  void *commdata,  
int collector, 
int range_start, 
int range_end, 
int sid, 
    64                  int process_cb(
const void *,sion_int64 *, 
int ) ) {
    66   int       t, startsignal=1,mrank,mt,tt, mcollector;
    72   sion_int64 **specs, *p_spec;
    73   sion_int64 bytestorecv, bytestosend, datasize;
    74   _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
    78   DPRINTFP((256, DFUNCTION, rank, 
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
    84     _sion_opmicol_grc=SION_STD_SUCCESS;
    86     helpdata = (
void *) malloc(sapi->num_threads * 
sizeof(sion_int64 *));
    87     if (helpdata == NULL) {
    88       fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
    89           (
size_t) sapi->num_threads * 
sizeof(
int *));
    90       _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
    95   specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
    98   if(_sion_opmicol_grc!=SION_STD_SUCCESS) 
return(_sion_opmicol_grc);
   101   specs[sapi->thread_num]= spec;
   110     helpdata = (
void *) malloc(sapi->num_threads * 
sizeof(
const void *));
   111     if (helpdata == NULL) {
   112       fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (tcounts), aborting ...\n",
   113           (
size_t) sapi->num_threads * 
sizeof(
int *));
   114       _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
   119   indatas = (
void const **)__sion_ompicol_share_ptr((
void *) helpdata);
   122   if(_sion_opmicol_grc!=SION_STD_SUCCESS) 
return(_sion_opmicol_grc);
   125   indatas[sapi->thread_num] = indata;
   133   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
   134   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
   139     for(tt=0;tt<sapi->num_threads;tt++) {
   140       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"MASTER SPECS[%d]=%x\n", tt,specs[tt]));
   141       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
   148     commgroup = sapi->comm;    
   150     if(rank == collector) {
   153       mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
   156       buffer = (
char *) malloc(fsblksize * 
sizeof(
char));
   157       if (buffer == NULL) {
   158     _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
   159                 (
unsigned long) fsblksize * 
sizeof(
char));
   160     _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
   164       for(t=range_start;t<=range_end;t++) {
   166     mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
   171       tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
   176       _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
   181       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for spec from %d\n", mt));
   182       MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
   183       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got spec from %d (%lld,%lld)\n", 
   184             mt, (
long long) spec[0], (
long long) spec[1]));
   188         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send signal to %d\n", mt));
   189         MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
   193       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to process data of size %lld at offset %lld\n", 
   194             (
long long) spec[1], (
long long) spec[0]));
   199       while(bytestorecv>0) {
   200         if(bytestorecv>fsblksize) datasize=fsblksize;
   201         else                      datasize=bytestorecv;
   204         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for data block from %d\n", mt));
   205         MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
   206         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got data block from %d datasize=%lld bytestorecv=%lld\n", 
   207               mt, (
long long) datasize, (
long long) bytestorecv));
   212         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector process data block of size %lld at pos %lld\n", 
   213               (
long long) spec[1], (
long long) spec[0]));
   215         _sion_opmicol_grc=process_cb(buffer,spec, sid);
   217         if(_sion_opmicol_grc != SION_STD_SUCCESS) {
   218           _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
   222         bytestorecv-=datasize;spec[0]+=datasize;
   231       if (buffer) free(buffer);
   236       mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
   239       for(tt=0;tt<sapi->num_threads;tt++) {
   242     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send spec to %d (%lld,%lld)\n", 
   243           collector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
   244     rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
   245     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender sent spec to %d rc=%d\n", mcollector,rc));
   248     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender wait for signal from %d\n", mcollector));
   249     MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
   250     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender got signal from %d\n", mcollector));
   253     bytestosend=specs[tt][1];
   254     p=(
char *) indatas[tt];
   255     while(bytestosend>0) {
   256       if(bytestosend>fsblksize) datasize=fsblksize;
   257       else                      datasize=bytestosend;
   258       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send data block to %d of size %lld\n", mcollector, (
long long) datasize));
   259       MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
   260       bytestosend-=datasize;p+=datasize;
   267   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave collector=%d rc=%d\n", collector, rc ));
   273   rc=_sion_opmicol_grc;
   282 #define DFUNCTION "_ompi_process_scatter_cb"   283 int _sion_ompi_process_scatter_cb(
void *outdata, sion_int64 *spec, 
int spec_len, sion_int64 fsblksize,
   284                   void *commdata,  
int collector, 
int range_start, 
int range_end, 
int sid, 
   285                   int process_cb(
void *,sion_int64 *, 
int ) ) {
   287   int       t, startsignal=1, count, mrank, mt, tt, mcollector;
   293   sion_int64 bytestorecv, bytestosend, datasize;
   294   _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
   298   DPRINTFP((256, DFUNCTION, rank, 
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
   303     _sion_opmicol_grc=SION_STD_SUCCESS;
   305     helpdata = (
void *) malloc(sapi->num_threads * 
sizeof(sion_int64 *));
   306     if (helpdata == NULL) {
   307       fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
   308           (
size_t) sapi->num_threads * 
sizeof(
int *));
   309       _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
   314   specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
   317   if(_sion_opmicol_grc!=SION_STD_SUCCESS) 
return(_sion_opmicol_grc);
   320   specs[sapi->thread_num]= spec;
   329     helpdata = (
void *) malloc(sapi->num_threads * 
sizeof(
void *));
   330     if (helpdata == NULL) {
   331       fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
   332           (
size_t) sapi->num_threads * 
sizeof(
int *));
   333       _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
   338   outdatas = (
void **)__sion_ompicol_share_ptr((
void *) helpdata);
   341   if(_sion_opmicol_grc!=SION_STD_SUCCESS) 
return(_sion_opmicol_grc);
   344   outdatas[sapi->thread_num] = outdata;
   355     commgroup = sapi->comm;    
   358     if(rank == collector) {
   362       buffer = (
char *) malloc(fsblksize * 
sizeof(
char));
   363       if (buffer == NULL) {
   364     _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
   365                 (
unsigned long) fsblksize * 
sizeof(
char));
   366     _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
   369       mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
   372       for(t=range_start;t<=range_end;t++) {
   374     mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
   379       tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
   382       _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
   387       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for spec from %d\n", t));
   388       MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
   389       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got spec from %d (%lld,%lld)\n", 
   390             t, (
long long) spec[0], (
long long) spec[1]));
   394         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send signal to %d\n", t));
   395         MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
   399       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to proces data of size %lld at offset %lld\n", 
   400             (
long long) spec[1], (
long long) spec[0]));
   405       while(bytestosend>0) {
   407         if(bytestosend>fsblksize) datasize=fsblksize;
   408         else                      datasize=bytestosend;
   413         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector process data block of size %lld at pos %lld\n", 
   414               (
long long) spec[1], (
long long) spec[0]));
   416         _sion_opmicol_grc=process_cb(buffer,spec, sid);
   418         if(_sion_opmicol_grc != SION_STD_SUCCESS) {
   419           _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
   423         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send for data block to %d\n", mt));
   424         MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
   425         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n", 
   426               mt, (
long long) datasize, (
long long) bytestosend));
   429         bytestosend-=datasize;spec[0]+=datasize;
   436       if (buffer) free(buffer);
   441       mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
   444       for(tt=0;tt<sapi->num_threads;tt++) {
   447     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send spec to %d (%lld,%lld)\n", 
   448           mcollector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
   449     MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
   454       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender wait for signal from %d\n", collector));
   455       MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
   456       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender got signal from %d\n", collector));
   459       bytestorecv=specs[tt][1];
   461       while(bytestorecv>0) {
   462         if(bytestorecv>fsblksize) datasize=fsblksize;
   463         else                      datasize=bytestorecv;
   464         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender recv data block from %d of size %lld\n", mcollector, (
long long) datasize));
   465         MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
   466         MPI_Get_count(&status,MPI_CHAR,&count);
   468         DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender recv data block from %d of size %lld (%d)\n", mcollector, (
long long) datasize, count));
   469         bytestorecv-=datasize;p+=datasize;
   483   rc=_sion_opmicol_grc;
   488   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave collector=%d rc=%d\n", collector, rc ));
   496 #define DFUNCTION "__sion_ompi_share_ptr"   497   void * __sion_ompicol_share_ptr(
void * in_ptr) {
   501     __ompicol_global_pointer = in_ptr;
   508     out_ptr=__ompicol_global_pointer;
   516 #define DFUNCTION "_sion_ompi_get_capability_cb"   517 int _sion_ompi_get_capability_cb(
void *commdata )
   520   _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
   522   if(sapi->thread_num==0) {
   524     DPRINTFP((256, DFUNCTION, sapi->rank, 
"FULL capability\n"));
   527     DPRINTFP((256, DFUNCTION, sapi->rank, 
"ONLY SENDER capability\n"));
   534   int _sion_ompicol_size_of_dtype(
int dtype) {
   536     case _SION_INT32: 
return(
sizeof(sion_int32));             
break;
   537     case _SION_INT64: 
return(
sizeof(sion_int64));             
break;
   538     case _SION_CHAR:  
return(
sizeof(
char));           
break;
   539     default:          
return(
sizeof(sion_int64));
 
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER