14 #define _XOPEN_SOURCE 700
26 #define MPI_Comm_rank PMPI_Comm_rank
27 #define MPI_Comm_size PMPI_Comm_size
28 #define MPI_Gather PMPI_Gather
29 #define MPI_Scatter PMPI_Scatter
30 #define MPI_Bcast PMPI_Bcast
31 #define MPI_Barrier PMPI_Barrier
32 #define MPI_Comm_split PMPI_Comm_split
33 #define MPI_Send PMPI_Send
34 #define MPI_Recv PMPI_Recv
39 #include <sys/types.h>
46 #include "sion_error_handler.h"
56 #define DFUNCTION "_mpi_gather_execute_cb"
57 int _sion_mpi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
58 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
59 int process_cb(
const void *,sion_int64 *,
int ) ) {
60 int rc=SION_STD_SUCCESS;
61 int size, rank, t, startsignal=1;
64 sion_int64 bytestorecv, bytestosend, datasize;
66 MPI_Comm commp = sapi->comm;
70 MPI_Comm_rank(commp, &rank);
71 MPI_Comm_size(commp, &size);
73 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
75 if(rank == collector) {
79 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
81 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
82 (
unsigned long) fsblksize *
sizeof(
char)));
86 for(t=range_start;t<=range_end;t++) {
89 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
90 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
91 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
92 t, (
long long) spec[0], (
long long) spec[1]));
96 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
97 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
101 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
102 (
long long) spec[1], (
long long) spec[0]));
107 while(bytestorecv>0) {
108 if(bytestorecv>fsblksize) datasize=fsblksize;
109 else datasize=bytestorecv;
112 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", t));
113 MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
114 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
115 t, (
long long) datasize, (
long long) bytestorecv));
120 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
121 (
long long) spec[1], (
long long) spec[0]));
123 rc=process_cb(buffer,spec, sid);
125 if(rc != SION_STD_SUCCESS) {
126 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
130 bytestorecv-=datasize;spec[0]+=datasize;
137 if (buffer) free(buffer);
140 if ( (rank>=range_start) && (rank<=range_end) ) {
144 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
145 collector,(
long long) spec[0], (
long long) spec[1]));
146 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
149 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
150 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
151 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
156 while(bytestosend>0) {
157 if(bytestosend>fsblksize) datasize=fsblksize;
158 else datasize=bytestosend;
159 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", collector, (
long long) datasize));
160 MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
161 bytestosend-=datasize;p+=datasize;
164 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"task take not part in collective operation\n"));
168 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
175 #define DFUNCTION "_mpi_process_scatter_cb"
176 int _sion_mpi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
177 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
178 int process_cb(
void *,sion_int64 *,
int ) ) {
179 int rc=SION_STD_SUCCESS;
180 int size, rank, t, startsignal=1, count;
183 sion_int64 bytestorecv, bytestosend, datasize;
185 MPI_Comm commp = sapi->comm;
188 MPI_Comm_rank(commp, &rank);
189 MPI_Comm_size(commp, &size);
191 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
193 if(rank == collector) {
197 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
198 if (buffer == NULL) {
199 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
200 (
unsigned long) fsblksize *
sizeof(
char)));
204 for(t=range_start;t<=range_end;t++) {
207 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
208 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
209 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
210 t, (
long long) spec[0], (
long long) spec[1]));
214 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
215 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
221 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
222 (
long long) spec[1], (
long long) spec[0]));
227 while(bytestosend>0) {
229 if(bytestosend>fsblksize) datasize=fsblksize;
230 else datasize=bytestosend;
235 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
236 (
long long) spec[1], (
long long) spec[0]));
238 rc=process_cb(buffer,spec, sid);
240 if(rc != SION_STD_SUCCESS) {
241 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
245 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", t));
246 MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
247 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
248 t, (
long long) datasize, (
long long) bytestosend));
251 bytestosend-=datasize;spec[0]+=datasize;
258 if (buffer) free(buffer);
261 if ( (rank>=range_start) && (rank<=range_end) ) {
265 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
266 collector,(
long long) spec[0], (
long long) spec[1]));
267 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
272 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
273 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
274 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
279 while(bytestorecv>0) {
280 if(bytestorecv>fsblksize) datasize=fsblksize;
281 else datasize=bytestorecv;
282 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", collector, (
long long) datasize));
283 MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
284 MPI_Get_count(&status,MPI_CHAR,&count);
286 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", collector, (
long long) datasize, count));
287 bytestorecv-=datasize;p+=datasize;
292 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"task take not part in collective operation\n"));
297 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
304 #define DFUNCTION "_sion_ompi_get_capability_cb"
305 int _sion_mpi_get_capability_cb(
void *commdata )
311 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_FULL