/*
 * NOTE(review): this listing appears to carry line numbers of the original
 * file inside the text (24, 25, ...), and several original lines are not shown
 * (local declarations, closing braces, else branches, return statements).
 * The comments below describe only the statements that are visible.
 *
 * The first line below maps MPI_Comm_rank, MPI_Comm_size, MPI_Gather,
 * MPI_Scatter, MPI_Bcast, MPI_Barrier, MPI_Comm_split, MPI_Send and MPI_Recv
 * to their PMPI_ counterparts, pulls in <sys/types.h> and
 * "sion_error_handler.h", sets DFUNCTION for the debug traces, and opens the
 * definition of _sion_mpi_gather_process_cb.
 * NOTE(review): DFUNCTION is "_mpi_gather_execute_cb", which does not match
 * the function name - confirm this is intended.
 *
 * _sion_mpi_gather_process_cb
 *   Moves data from the tasks range_start..range_end to the task `collector`
 *   in blocks of at most fsblksize bytes and hands every received block to
 *   process_cb on the collector.
 *
 *   indata      data on the sending side; presumably the start value of the
 *               send pointer p (that assignment is not visible - TODO confirm)
 *   spec        array of spec_len sion_int64 values sent from each task to the
 *               collector; the traces print spec[0] as offset/position and
 *               spec[1] as size, and spec[0] is advanced by each block size
 *   spec_len    number of SION_MPI_INT64 elements transferred for spec
 *   fsblksize   size of the temporary buffer and upper bound of one block
 *   commdata    opaque communicator data; presumably the source of sapi
 *               (declaration not visible - TODO confirm)
 *   collector   rank that receives and processes the data
 *   range_start first rank (inclusive) the collector receives from
 *   range_end   last rank (inclusive) the collector receives from
 *   sid         passed unchanged to process_cb
 *   process_cb  called as process_cb(buffer, spec, sid) for every block; any
 *               result other than SION_STD_SUCCESS aborts with an error
 *
 *   Error paths return the value of _sion_errorprint(). rc starts as
 *   SION_STD_SUCCESS; the final return statement is not visible here.
 */
24 #define MPI_Comm_rank PMPI_Comm_rank 25 #define MPI_Comm_size PMPI_Comm_size 26 #define MPI_Gather PMPI_Gather 27 #define MPI_Scatter PMPI_Scatter 28 #define MPI_Bcast PMPI_Bcast 29 #define MPI_Barrier PMPI_Barrier 30 #define MPI_Comm_split PMPI_Comm_split 31 #define MPI_Send PMPI_Send 32 #define MPI_Recv PMPI_Recv 37 #include <sys/types.h> 44 #include "sion_error_handler.h" 54 #define DFUNCTION "_mpi_gather_execute_cb" 55 int _sion_mpi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
56 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
57 int process_cb(
const void *,sion_int64 *,
int ) ) {
58 int rc=SION_STD_SUCCESS;
59 int size, rank, t, startsignal=1;
62 sion_int64 bytestorecv, bytestosend, datasize;
/* All communication below uses the communicator stored in sapi->comm. */
64 MPI_Comm commp = sapi->comm;
68 MPI_Comm_rank(commp, &rank);
/* NOTE(review): size is queried but not used in the visible statements. */
69 MPI_Comm_size(commp, &size);
71 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Collector side: receive from every task in the range and process the data. */
73 if(rank == collector) {
/* Temporary receive buffer holding one block of up to fsblksize bytes. */
77 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/*
 * Error return for a failed allocation; the guarding condition is not visible
 * in this listing.
 * NOTE(review): this path uses SION_NOT_SUCCESS while the other error path in
 * this function uses SION_STD_NOT_SUCCESS - confirm which one is intended.
 */
79 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
80 (
unsigned long) fsblksize *
sizeof(
char)));
/* Serve the tasks one after another, range_end included. */
84 for(t=range_start;t<=range_end;t++) {
87 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
/* Message tag 1534 carries the spec array of task t. */
88 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
89 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
90 t, (
long long) spec[0], (
long long) spec[1]));
94 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
/* Message tag 1535 tells task t that it may start sending its data. */
95 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
99 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
100 (
long long) spec[1], (
long long) spec[0]));
/*
 * Receive the data of task t block by block.
 * The initialisation of bytestorecv is not visible; presumably it is taken
 * from spec[1] - TODO confirm.
 */
105 while(bytestorecv>0) {
/* Block size is the smaller of the remaining bytes and fsblksize. */
106 if(bytestorecv>fsblksize) datasize=fsblksize;
107 else datasize=bytestorecv;
110 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", t));
/*
 * Message tag 1536 carries one data block.
 * NOTE(review): datasize is a sion_int64 but the MPI count argument is an int;
 * assumes fsblksize fits into int - TODO confirm.
 */
111 MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
112 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
113 t, (
long long) datasize, (
long long) bytestorecv));
118 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
119 (
long long) spec[1], (
long long) spec[0]));
/* Hand the received block to the caller-supplied callback. */
121 rc=process_cb(buffer,spec, sid);
/*
 * NOTE(review): this return leaves the function before the free(buffer)
 * further down, so the temporary buffer appears not to be released on this
 * path - confirm.
 */
123 if(rc != SION_STD_SUCCESS) {
124 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
/* Account for the processed block and advance the position kept in spec[0]. */
128 bytestorecv-=datasize;spec[0]+=datasize;
/* Release the temporary receive buffer. */
135 if (buffer) free(buffer);
/*
 * Sender side: ranks inside range_start..range_end.
 * Presumably this is the branch for rank != collector; the enclosing else is
 * not visible - TODO confirm.
 */
138 if ( (rank>=range_start) && (rank<=range_end) ) {
142 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
143 collector,(
long long) spec[0], (
long long) spec[1]));
/* Announce the own spec to the collector (tag 1534). */
144 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
147 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
/* Block until the collector signals that it is ready (tag 1535). */
148 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
149 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/*
 * Send the data in blocks of at most fsblksize bytes (tag 1536).
 * The initialisation of bytestosend and p is not visible; presumably spec[1]
 * and indata - TODO confirm.
 */
154 while(bytestosend>0) {
155 if(bytestosend>fsblksize) datasize=fsblksize;
156 else datasize=bytestosend;
157 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", collector, (
long long) datasize));
158 MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
/* Move on to the next block of the send data. */
159 bytestosend-=datasize;p+=datasize;
/* Trace for tasks that are neither the collector nor inside the range. */
162 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"task take not part in collective operation\n"));
166 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * NOTE(review): this listing appears to carry line numbers of the original
 * file inside the text (173, 174, ...), and several original lines are not
 * shown (local declarations, closing braces, else branches, return
 * statements). The comments below describe only the visible statements.
 *
 * _sion_mpi_process_scatter_cb
 *   Counterpart of a gather: the task `collector` produces data block by block
 *   through process_cb and sends each block to the tasks range_start..range_end,
 *   which receive it in blocks of at most fsblksize bytes.
 *
 *   outdata     destination on the receiving side; presumably the start value
 *               of the receive pointer p (assignment not visible - TODO confirm)
 *   spec        array of spec_len sion_int64 values sent from each task to the
 *               collector; the traces print spec[0] as offset/position and
 *               spec[1] as size, and spec[0] is advanced by each block size
 *   spec_len    number of SION_MPI_INT64 elements transferred for spec
 *   fsblksize   size of the temporary buffer and upper bound of one block
 *   commdata    opaque communicator data; presumably the source of sapi
 *               (declaration not visible - TODO confirm)
 *   collector   rank that runs process_cb and sends the data
 *   range_start first rank (inclusive) the collector sends to
 *   range_end   last rank (inclusive) the collector sends to
 *   sid         passed unchanged to process_cb
 *   process_cb  called as process_cb(buffer, spec, sid) before every block is
 *               sent; any result other than SION_STD_SUCCESS aborts with an
 *               error
 *
 *   Error paths return the value of _sion_errorprint(). rc starts as
 *   SION_STD_SUCCESS; the final return statement is not visible here.
 */
173 #define DFUNCTION "_mpi_process_scatter_cb" 174 int _sion_mpi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
175 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
176 int process_cb(
void *,sion_int64 *,
int ) ) {
177 int rc=SION_STD_SUCCESS;
178 int size, rank, t, startsignal=1, count;
181 sion_int64 bytestorecv, bytestosend, datasize;
/* All communication below uses the communicator stored in sapi->comm. */
183 MPI_Comm commp = sapi->comm;
186 MPI_Comm_rank(commp, &rank);
/* NOTE(review): size is queried but not used in the visible statements. */
187 MPI_Comm_size(commp, &size);
189 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Collector side: produce the data and send it to every task in the range. */
191 if(rank == collector) {
/* Temporary buffer holding one block of up to fsblksize bytes. */
195 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/*
 * NOTE(review): the message names _mpi_gather_process_cb although this is the
 * scatter function - confirm whether the text should be adjusted.
 */
196 if (buffer == NULL) {
197 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
198 (
unsigned long) fsblksize *
sizeof(
char)));
/* Serve the tasks one after another, range_end included. */
202 for(t=range_start;t<=range_end;t++) {
205 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
/* Message tag 1534 carries the spec array of task t. */
206 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
207 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
208 t, (
long long) spec[0], (
long long) spec[1]));
212 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
/* Message tag 1535 tells task t that the collector is about to send data. */
213 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
219 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
220 (
long long) spec[1], (
long long) spec[0]));
/*
 * Produce and send the data for task t block by block.
 * The initialisation of bytestosend is not visible; presumably it is taken
 * from spec[1] - TODO confirm.
 */
225 while(bytestosend>0) {
/* Block size is the smaller of the remaining bytes and fsblksize. */
227 if(bytestosend>fsblksize) datasize=fsblksize;
228 else datasize=bytestosend;
233 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
234 (
long long) spec[1], (
long long) spec[0]));
/* The callback is invoked before the send, so it presumably fills buffer. */
236 rc=process_cb(buffer,spec, sid);
/*
 * NOTE(review): this return leaves the function before the free(buffer)
 * further down, so the temporary buffer appears not to be released on this
 * path; the message also names _mpi_gather_process_cb and "writing" although
 * this is the scatter function - confirm.
 */
238 if(rc != SION_STD_SUCCESS) {
239 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
243 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", t));
/*
 * Message tag 1536 carries one data block.
 * NOTE(review): datasize is a sion_int64 but the MPI count argument is an int;
 * assumes fsblksize fits into int - TODO confirm.
 */
244 MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
/* NOTE(review): the trace label says bytestorecv but prints bytestosend. */
245 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
246 t, (
long long) datasize, (
long long) bytestosend));
/* Account for the sent block and advance the position kept in spec[0]. */
249 bytestosend-=datasize;spec[0]+=datasize;
/* Release the temporary buffer. */
256 if (buffer) free(buffer);
/*
 * Receiver side: ranks inside range_start..range_end (the traces still call
 * this role "sender").
 * Presumably this is the branch for rank != collector; the enclosing else is
 * not visible - TODO confirm.
 */
259 if ( (rank>=range_start) && (rank<=range_end) ) {
263 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
264 collector,(
long long) spec[0], (
long long) spec[1]));
/* Announce the own spec to the collector (tag 1534). */
265 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
270 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
/* Block until the collector signals that it is ready (tag 1535). */
271 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
272 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/*
 * Receive the data in blocks of at most fsblksize bytes (tag 1536).
 * The initialisation of bytestorecv and p is not visible; presumably spec[1]
 * and outdata - TODO confirm.
 */
277 while(bytestorecv>0) {
278 if(bytestorecv>fsblksize) datasize=fsblksize;
279 else datasize=bytestorecv;
280 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", collector, (
long long) datasize));
281 MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
/*
 * Number of MPI_CHAR elements actually received; in the visible statements it
 * is only written to the trace and not compared with datasize.
 */
282 MPI_Get_count(&status,MPI_CHAR,&count);
284 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", collector, (
long long) datasize, count));
/* Move on to the next block of the receive area. */
285 bytestorecv-=datasize;p+=datasize;
/* Trace for tasks that are neither the collector nor inside the range. */
290 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"task take not part in collective operation\n"));
295 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * _sion_mpi_get_capability_cb
 *   Only the signature and a single debug trace of this function are visible
 *   in this listing; the body, including the value it returns, is not shown.
 *   The visible trace reports "FULL capability" and uses sapi->rank as the
 *   rank argument of the trace.
 *
 *   commdata    opaque communicator data; presumably the source of sapi
 *               (declaration not visible - TODO confirm)
 *
 * NOTE(review): DFUNCTION is set to "_sion_ompi_get_capability_cb" while the
 * function is named _sion_mpi_get_capability_cb - confirm which is intended.
 */
302 #define DFUNCTION "_sion_ompi_get_capability_cb" 303 int _sion_mpi_get_capability_cb(
void *commdata )
309 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
/*
 * NOTE(review): the two macro definitions below carry no value and no
 * embedded line number; they look like cross-reference residue of the listing
 * rather than part of this file - confirm before relying on them.
 */
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE