24 #define MPI_Comm_rank      PMPI_Comm_rank    25 #define MPI_Comm_size      PMPI_Comm_size         26 #define MPI_Gather     PMPI_Gather        27 #define MPI_Scatter    PMPI_Scatter       28 #define MPI_Bcast      PMPI_Bcast         29 #define MPI_Barrier    PMPI_Barrier       30 #define MPI_Comm_split     PMPI_Comm_split        31 #define MPI_Send           PMPI_Send    32 #define MPI_Recv           PMPI_Recv    37 #include <sys/types.h>    44 #include "sion_error_handler.h"    54 #define DFUNCTION "_mpi_gather_execute_cb"    55 int _sion_mpi_gather_process_cb(
const void *indata, sion_int64 *spec, 
int spec_len, sion_int64 fsblksize,
    56                 void *commdata,  
int collector, 
int range_start, 
int range_end, 
int sid, 
    57                 int process_cb(
const void *,sion_int64 *, 
int ) ) {
    /*
     * Gather data from the tasks range_start..range_end on the task
     * `collector` and hand it to process_cb() block by block.
     *
     * Message protocol visible in this function (communicator sapi->comm):
     *   tag 1534  sender -> collector  spec_len elements of SION_MPI_INT64 (spec)
     *   tag 1535  collector -> sender  one MPI_INT start signal
     *   tag 1536  sender -> collector  data blocks of at most fsblksize chars
     *
     * The debug messages print spec[0] as an offset/position and spec[1] as
     * a size; spec[0] is advanced by the block size after every block.
     *
     * Error paths return the value of _sion_errorprint(); the regular exit
     * presumably returns rc (the return statement is not visible here).
     *
     * NOTE(review): this listing carries embedded line numbers that skip
     * values, so some original lines are not visible: the declarations of
     * sapi, buffer, status and p, the initialisation of bytestorecv,
     * bytestosend and p (presumably from spec[1] and indata -- confirm),
     * and the closing braces / else lines.
     */
    58   int       rc=SION_STD_SUCCESS;
    59   int       size, rank, t, startsignal=1;
    62   sion_int64 bytestorecv, bytestosend, datasize;
    64   MPI_Comm   commp = sapi->comm;
    68   MPI_Comm_rank(commp, &rank);
    69   MPI_Comm_size(commp, &size);
    71   DPRINTFP((256, DFUNCTION, rank, 
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
    /* ---- collector side ---- */
    73   if(rank == collector) {
    /* temporary receive buffer, one file system block (fsblksize bytes) */
    77     buffer = (
char *) malloc(fsblksize * 
sizeof(
char));
    /* NOTE(review): the guarding NULL check is not visible in this listing.
       This path passes SION_NOT_SUCCESS, whereas the other error paths in
       this file pass SION_STD_NOT_SUCCESS -- confirm which is intended. */
    79       return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
    80                   (
unsigned long) fsblksize * 
sizeof(
char)));
    /* serve the senders one after another, in rank order */
    84     for(t=range_start;t<=range_end;t++) {
    /* step 1: receive the sender's spec; this overwrites the collector's spec */
    87       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for spec from %d\n", t));
    88       MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
    89       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got spec from %d (%lld,%lld)\n", 
    90         t, (
long long) spec[0], (
long long) spec[1]));
    /* step 2: tell the sender that it may start sending data */
    94     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send signal to %d\n", t));
    95     MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
    99       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to process data of size %lld at offset %lld\n", 
   100         (
long long) spec[1], (
long long) spec[0]));
    /* step 3: receive the data in blocks of at most fsblksize bytes */
   105       while(bytestorecv>0) {
   106     if(bytestorecv>fsblksize) datasize=fsblksize;
   107     else                      datasize=bytestorecv;
   110     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for data block from %d\n", t));
    /* NOTE(review): datasize is a sion_int64 passed as MPI's int count;
       this assumes fsblksize fits into an int -- confirm. */
   111     MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
   112     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got data block from %d datasize=%lld bytestorecv=%lld\n", 
   113           t, (
long long) datasize, (
long long) bytestorecv));
   118     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector process data block of size %lld at pos %lld\n", 
   119           (
long long) spec[1], (
long long) spec[0]));
    /* step 4: hand the received block to the caller-supplied callback */
   121     rc=process_cb(buffer,spec, sid);
    /* NOTE(review): this early return neither frees buffer nor informs the
       senders that are still waiting for their start signal. */
   123     if(rc != SION_STD_SUCCESS) {
   124       return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
    /* advance: fewer bytes outstanding, position moves on by one block */
   128     bytestorecv-=datasize;spec[0]+=datasize;
   135     if (buffer) free(buffer);
    /* ---- sender side (presumably the else branch of the collector test;
            the else line itself is not visible in this listing) ---- */
   138     if ( (rank>=range_start) && (rank<=range_end) ) {
    /* step 1: announce the own spec to the collector */
   142       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send spec to %d (%lld,%lld)\n", 
   143         collector,(
long long) spec[0], (
long long) spec[1]));
   144       MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
    /* step 2: block until the collector is ready for this task */
   147       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender wait for signal from %d\n", collector));
   148       MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
   149       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender got signal from %d\n", collector));
    /* step 3: send the data starting at p in blocks of at most fsblksize bytes */
   154       while(bytestosend>0) {
   155     if(bytestosend>fsblksize) datasize=fsblksize;
   156     else                      datasize=bytestosend;
   157     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send data block to %d of size %lld\n", collector, (
long long) datasize));
   158     MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
   159     bytestosend-=datasize;p+=datasize;
    /* tasks outside range_start..range_end only log that they do not take part */
   162       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"task take not part in collective operation\n"));
   166   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave collector=%d rc=%d\n", collector, rc ));
   173 #define DFUNCTION "_mpi_process_scatter_cb"   174 int _sion_mpi_process_scatter_cb(
void *outdata, sion_int64 *spec, 
int spec_len, sion_int64 fsblksize,
   175                  void *commdata,  
int collector, 
int range_start, 
int range_end, 
int sid, 
   176                  int process_cb(
void *,sion_int64 *, 
int ) ) {
    /*
     * Counterpart of the gather routine: the task `collector` calls
     * process_cb() block by block and sends each block to the tasks
     * range_start..range_end.
     *
     * Message protocol visible in this function (communicator sapi->comm):
     *   tag 1534  receiver -> collector  spec_len elements of SION_MPI_INT64 (spec)
     *   tag 1535  collector -> receiver  one MPI_INT start signal
     *   tag 1536  collector -> receiver  data blocks of at most fsblksize chars
     *
     * The debug messages print spec[0] as an offset/position and spec[1] as
     * a size; spec[0] is advanced by the block size after every block.
     *
     * Error paths return the value of _sion_errorprint(); the regular exit
     * presumably returns rc (the return statement is not visible here).
     *
     * NOTE(review): this listing carries embedded line numbers that skip
     * values, so some original lines are not visible: the declarations of
     * sapi, buffer, status and p, the initialisation of bytestosend,
     * bytestorecv and p (presumably from spec[1] and outdata -- confirm),
     * and the closing braces / else lines.
     */
   177   int       rc=SION_STD_SUCCESS;
   178   int       size, rank, t, startsignal=1, count;
   181   sion_int64 bytestorecv, bytestosend, datasize;
   183   MPI_Comm   commp = sapi->comm;
   186   MPI_Comm_rank(commp, &rank);
   187   MPI_Comm_size(commp, &size);
   189   DPRINTFP((256, DFUNCTION, rank, 
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
    /* ---- collector side ---- */
   191   if(rank == collector) {
    /* temporary send buffer, one file system block (fsblksize bytes) */
   195     buffer = (
char *) malloc(fsblksize * 
sizeof(
char));
    /* NOTE(review): the message text names _mpi_gather_process_cb although
       this is the scatter routine (looks like a copy-and-paste leftover). */
   196     if (buffer == NULL) {
   197       return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
   198                   (
unsigned long) fsblksize * 
sizeof(
char)));
    /* serve the receiving tasks one after another, in rank order */
   202     for(t=range_start;t<=range_end;t++) {
    /* step 1: receive the task's spec; this overwrites the collector's spec */
   205       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wait for spec from %d\n", t));
   206       MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
   207       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector got spec from %d (%lld,%lld)\n", 
   208         t, (
long long) spec[0], (
long long) spec[1]));
    /* step 2: tell the task that data blocks will follow */
   212     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send signal to %d\n", t));
   213     MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
   219       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to process data of size %lld at offset %lld\n", 
   220         (
long long) spec[1], (
long long) spec[0]));
    /* step 3: produce and send the data in blocks of at most fsblksize bytes */
   225       while(bytestosend>0) {
   227     if(bytestosend>fsblksize) datasize=fsblksize;
   228     else                      datasize=bytestosend;
   233     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector process data block of size %lld at pos %lld\n", 
   234           (
long long) spec[1], (
long long) spec[0]));
    /* the callback runs before the send; presumably it fills buffer -- confirm */
   236     rc=process_cb(buffer,spec, sid);
    /* NOTE(review): this early return neither frees buffer nor informs the
       tasks that are still waiting; the message text also names the gather
       routine and speaks of writing. */
   238     if(rc != SION_STD_SUCCESS) {
   239       return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
   243     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector send for data block to %d\n", t));
    /* NOTE(review): datasize is a sion_int64 passed as MPI's int count;
       this assumes fsblksize fits into an int -- confirm. */
   244     MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
    /* NOTE(review): the label says bytestorecv but the value printed is bytestosend */
   245     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n", 
   246           t, (
long long) datasize, (
long long) bytestosend));
    /* advance: fewer bytes outstanding, position moves on by one block */
   249     bytestosend-=datasize;spec[0]+=datasize;
   256     if (buffer) free(buffer);
    /* ---- receiving side (presumably the else branch of the collector test;
            the else line itself is not visible in this listing) ---- */
   259     if ( (rank>=range_start) && (rank<=range_end) ) {
    /* step 1: announce the own spec to the collector */
   263       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender send spec to %d (%lld,%lld)\n", 
   264         collector,(
long long) spec[0], (
long long) spec[1]));
   265       MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
    /* step 2: block until the collector starts serving this task */
   270     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender wait for signal from %d\n", collector));
   271     MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
   272     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender got signal from %d\n", collector));
    /* step 3: receive the data into p in blocks of at most fsblksize bytes */
   277     while(bytestorecv>0) {
   278       if(bytestorecv>fsblksize) datasize=fsblksize;
   279       else                      datasize=bytestorecv;
   280       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender recv data block from %d of size %lld\n", collector, (
long long) datasize));
   281       MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
    /* count is only used in the debug message below; the loop advances by
       datasize, not by the number of elements actually received */
   282       MPI_Get_count(&status,MPI_CHAR,&count);
   284       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"sender recv data block from %d of size %lld (%d)\n", collector, (
long long) datasize, count));
   285       bytestorecv-=datasize;p+=datasize;
    /* tasks outside range_start..range_end only log that they do not take part */
   290       DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"task take not part in collective operation\n"));
   295   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave collector=%d rc=%d\n", collector, rc ));
   /*
    * Capability callback of the MPI API. The only statement visible in this
    * listing is a debug message reporting "FULL capability" for sapi->rank;
    * the opening brace, the declaration of sapi and the return statement are
    * not visible here (presumably the function returns a full-capability
    * constant -- confirm against the complete file).
    *
    * NOTE(review): DFUNCTION is set to "_sion_ompi_get_capability_cb" while
    * the function is named _sion_mpi_get_capability_cb, so debug output
    * carries the "ompi" name -- confirm whether that is intended.
    */
   302 #define DFUNCTION "_sion_ompi_get_capability_cb"   303 int _sion_mpi_get_capability_cb(
void *commdata )
   309   DPRINTFP((256, DFUNCTION, sapi->rank, 
"FULL capability\n"));
 
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE