/*
 * NOTE: the source line numbers embedded in this listing are not contiguous,
 * so some statements (declarations, assignments, else-branches, closing
 * braces, the final return) are not visible here. Comments below describe
 * only what the visible lines show; anything else is marked as an assumption.
 *
 * The leading #define lines redirect the MPI_* names used in this file to
 * their PMPI_* counterparts.
 *
 * _sion_mpi_gather_process_cb
 *   Gather-style transfer: the rank equal to `collector` receives data from
 *   every rank t in [range_start, range_end] (inclusive) and hands it to
 *   process_cb block by block; the other code path ("sender" in the log
 *   messages) sends its spec and data to the collector.
 *
 *   indata      - caller's input data; its use is not visible in this
 *                 listing (presumably the source of `p` - TODO confirm).
 *   spec        - sion_int64 array exchanged between sender and collector;
 *                 spec[0] is logged as the offset and spec[1] as the size.
 *                 The collector advances spec[0] after each processed block.
 *   spec_len    - number of SION_MPI_INT64 elements sent/received for spec.
 *   fsblksize   - size of the collector's temporary buffer and the upper
 *                 bound for a single data message.
 *   commdata    - opaque communication data (presumably the origin of
 *                 `sapi`; the assignment is not visible - TODO confirm).
 *   collector   - rank in the communicator that collects the data.
 *   range_start,
 *   range_end   - first and last rank (inclusive) served by the collector.
 *   sid         - passed through unchanged to process_cb.
 *   process_cb  - callback invoked as process_cb(buffer, spec, sid) for
 *                 every received block.
 *
 *   Returns the value of _sion_errorprint() on the visible error paths;
 *   the normal return is not visible (presumably rc - TODO confirm).
 *
 *   Message tags used: 1534 = spec, 1535 = start signal, 1536 = data block.
 *
 *   NOTE(review): DFUNCTION is "_mpi_gather_execute_cb" while the function
 *   is named _sion_mpi_gather_process_cb - confirm this is intended.
 */
24 #define MPI_Comm_rank PMPI_Comm_rank 25 #define MPI_Comm_size PMPI_Comm_size 26 #define MPI_Gather PMPI_Gather 27 #define MPI_Scatter PMPI_Scatter 28 #define MPI_Bcast PMPI_Bcast 29 #define MPI_Barrier PMPI_Barrier 30 #define MPI_Comm_split PMPI_Comm_split 31 #define MPI_Send PMPI_Send 32 #define MPI_Recv PMPI_Recv 37 #include <sys/types.h> 53 #define DFUNCTION "_mpi_gather_execute_cb" 54 int _sion_mpi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
55 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
56 int process_cb(
const void *,sion_int64 *,
int ) ) {
57 int rc=SION_STD_SUCCESS;
58 int size, rank, t, startsignal=1;
61 sion_int64 bytestorecv, bytestosend, datasize;
/* Communicator comes from sapi->comm (declaration of sapi not visible here). */
63 MPI_Comm commp = sapi->comm;
/* Determine this process's rank and the communicator size. */
67 MPI_Comm_rank(commp, &rank);
68 MPI_Comm_size(commp, &size);
70 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Collector path: receive from each rank in the range and process the data. */
72 if(rank == collector) {
/* Temporary receive buffer of fsblksize bytes, reused for every block. */
76 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/*
 * Allocation-failure error return. The guarding NULL check is not visible
 * in this listing (presumably `if (buffer == NULL)` - TODO confirm).
 * NOTE(review): this path uses SION_NOT_SUCCESS whereas the other error
 * path in this function uses SION_STD_NOT_SUCCESS - confirm which is meant.
 */
78 return(
_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
79 (
unsigned long) fsblksize *
sizeof(
char)));
/* Serve the ranks one after another, range_end included. */
83 for(t=range_start;t<=range_end;t++) {
/* Step 1: receive the spec array from rank t (tag 1534). */
86 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
87 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
88 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
89 t, (
long long) spec[0], (
long long) spec[1]));
/* Step 2: tell rank t that it may start sending data (tag 1535). */
93 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
94 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
98 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to proces data of size %lld at offset %lld\n",
99 (
long long) spec[1], (
long long) spec[0]));
/*
 * Step 3: receive the payload in blocks of at most fsblksize bytes
 * (tag 1536). The initialisation of bytestorecv is not visible in this
 * listing (presumably from spec[1] - TODO confirm).
 */
104 while(bytestorecv>0) {
105 if(bytestorecv>fsblksize) datasize=fsblksize;
106 else datasize=bytestorecv;
109 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", t));
110 MPI_Recv(buffer, datasize, MPI_CHAR, t, 1536, commp, &status);
111 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
112 t, (
long long) datasize, (
long long) bytestorecv));
/* NOTE(review): this message prints spec[1], not datasize, as the block size. */
117 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
118 (
long long) spec[1], (
long long) spec[0]));
/* Hand the received block and the current spec to the caller's callback. */
120 rc=process_cb(buffer,spec, sid);
/*
 * NOTE(review): no free(buffer) is visible before this early return, and
 * no message to the remaining ranks is visible either - confirm whether
 * the buffer leaks and whether waiting ranks can block here.
 */
122 if(rc != SION_STD_SUCCESS) {
123 return(
_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
/* Account for the processed block and move the offset in spec[0] forward. */
127 bytestorecv-=datasize;spec[0]+=datasize;
/* Release the temporary buffer. */
134 if (buffer) free(buffer);
/*
 * Sender path (the branch keyword separating it from the collector path is
 * not visible in this listing): send spec, wait for the start signal, then
 * send the data in blocks.
 */
140 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
141 collector,(
long long) spec[0], (
long long) spec[1]));
142 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
145 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
146 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
147 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/*
 * Send the payload starting at p in blocks of at most fsblksize bytes.
 * The initialisation of bytestosend and p is not visible in this listing
 * (presumably spec[1] and indata - TODO confirm).
 */
152 while(bytestosend>0) {
153 if(bytestosend>fsblksize) datasize=fsblksize;
154 else datasize=bytestosend;
155 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", collector, (
long long) datasize));
156 MPI_Send(p, datasize, MPI_CHAR, collector, 1536, commp);
157 bytestosend-=datasize;p+=datasize;
/* Exit trace with the collector rank and the current rc. */
162 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * NOTE: the source line numbers embedded in this listing are not contiguous,
 * so some statements (declarations, assignments, else-branches, closing
 * braces, the final return) are not visible here. Comments below describe
 * only what the visible lines show; anything else is marked as an assumption.
 *
 * _sion_mpi_process_scatter_cb
 *   Scatter-style transfer: the rank equal to `collector` calls process_cb
 *   for each block and then sends the buffer to every rank t in
 *   [range_start, range_end] (inclusive); the other code path ("sender" in
 *   the log messages) sends its spec to the collector and then receives the
 *   data blocks.
 *
 *   outdata     - caller's output area; its use is not visible in this
 *                 listing (presumably the target of `p` - TODO confirm).
 *   spec        - sion_int64 array exchanged with the collector; spec[0] is
 *                 logged as the offset and spec[1] as the size. The
 *                 collector advances spec[0] after each block.
 *   spec_len    - number of SION_MPI_INT64 elements sent/received for spec.
 *   fsblksize   - size of the collector's temporary buffer and the upper
 *                 bound for a single data message.
 *   commdata    - opaque communication data (presumably the origin of
 *                 `sapi`; the assignment is not visible - TODO confirm).
 *   collector   - rank in the communicator that distributes the data.
 *   range_start,
 *   range_end   - first and last rank (inclusive) served by the collector.
 *   sid         - passed through unchanged to process_cb.
 *   process_cb  - callback invoked as process_cb(buffer, spec, sid) before
 *                 each block is sent (presumably it fills buffer - TODO
 *                 confirm).
 *
 *   Returns the value of _sion_errorprint() on the visible error paths;
 *   the normal return is not visible (presumably rc - TODO confirm).
 *
 *   Message tags used: 1534 = spec, 1535 = start signal, 1536 = data block.
 */
169 #define DFUNCTION "_mpi_process_scatter_cb" 170 int _sion_mpi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
171 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
172 int process_cb(
void *,sion_int64 *,
int ) ) {
173 int rc=SION_STD_SUCCESS;
174 int size, rank, t, startsignal=1, count;
177 sion_int64 bytestorecv, bytestosend, datasize;
/* Communicator comes from sapi->comm (declaration of sapi not visible here). */
179 MPI_Comm commp = sapi->comm;
/* Determine this process's rank and the communicator size. */
182 MPI_Comm_rank(commp, &rank);
183 MPI_Comm_size(commp, &size);
185 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Collector path: produce the data via process_cb and send it to each rank. */
187 if(rank == collector) {
/* Temporary send buffer of fsblksize bytes, reused for every block. */
191 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/*
 * Abort when the buffer could not be allocated.
 * NOTE(review): the message names _mpi_gather_process_cb although this is
 * the scatter routine - confirm whether the text should be updated.
 */
192 if (buffer == NULL) {
193 return(
_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
194 (
unsigned long) fsblksize *
sizeof(
char)));
/* Serve the ranks one after another, range_end included. */
198 for(t=range_start;t<=range_end;t++) {
/* Step 1: receive the spec array from rank t (tag 1534). */
201 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
202 MPI_Recv(spec, spec_len, SION_MPI_INT64, t, 1534, commp, &status);
203 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
204 t, (
long long) spec[0], (
long long) spec[1]));
/* Step 2: tell rank t that the transfer starts (tag 1535). */
208 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
209 MPI_Send(&startsignal, 1, MPI_INT, t, 1535, commp);
215 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to proces data of size %lld at offset %lld\n",
216 (
long long) spec[1], (
long long) spec[0]));
/*
 * Step 3: process and send the payload in blocks of at most fsblksize
 * bytes (tag 1536). The initialisation of bytestosend is not visible in
 * this listing (presumably from spec[1] - TODO confirm).
 */
221 while(bytestosend>0) {
223 if(bytestosend>fsblksize) datasize=fsblksize;
224 else datasize=bytestosend;
229 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
230 (
long long) spec[1], (
long long) spec[0]));
/* Invoke the caller's callback with the buffer and the current spec. */
232 rc=process_cb(buffer,spec, sid);
/*
 * NOTE(review): no free(buffer) is visible before this early return, and
 * the message says "_mpi_gather_process_cb ... writing data" in the
 * scatter routine - confirm both.
 */
234 if(rc != SION_STD_SUCCESS) {
235 return(
_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: problems writing data ...\n"));
/* Send the block to rank t. */
239 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", t));
240 MPI_Send(buffer, datasize, MPI_CHAR, t, 1536, commp);
/* NOTE(review): the label says bytestorecv but the value printed is bytestosend. */
241 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
242 t, (
long long) datasize, (
long long) bytestosend));
/* Account for the sent block and move the offset in spec[0] forward. */
245 bytestosend-=datasize;spec[0]+=datasize;
/* Release the temporary buffer. */
252 if (buffer) free(buffer);
/*
 * Non-collector path (the branch keyword separating it from the collector
 * path is not visible in this listing): send spec, wait for the start
 * signal, then receive the data in blocks.
 */
258 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
259 collector,(
long long) spec[0], (
long long) spec[1]));
260 MPI_Send(spec, spec_len, SION_MPI_INT64, collector, 1534, commp);
265 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
266 MPI_Recv(&startsignal, 1, MPI_INT, collector, 1535, commp, &status);
267 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/*
 * Receive the payload into p in blocks of at most fsblksize bytes.
 * The initialisation of bytestorecv and p is not visible in this listing
 * (presumably spec[1] and outdata - TODO confirm).
 */
272 while(bytestorecv>0) {
273 if(bytestorecv>fsblksize) datasize=fsblksize;
274 else datasize=bytestorecv;
275 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", collector, (
long long) datasize));
276 MPI_Recv(p, datasize, MPI_CHAR, collector, 1536, commp, &status);
/* The received element count is only used in the log message below. */
277 MPI_Get_count(&status,MPI_CHAR,&count);
279 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", collector, (
long long) datasize, count));
280 bytestorecv-=datasize;p+=datasize;
/* Exit trace with the collector rank and the current rc. */
286 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * _sion_mpi_get_capability_cb
 *   Only the signature and one debug statement are visible in this listing;
 *   the body's braces and return statement are not shown.
 *
 *   commdata - opaque communication data (presumably the origin of `sapi`;
 *              the assignment is not visible - TODO confirm).
 *
 *   NOTE(review): DFUNCTION is "_sion_ompi_get_capability_cb" while the
 *   function is named _sion_mpi_get_capability_cb - confirm the string.
 */
293 #define DFUNCTION "_sion_ompi_get_capability_cb" 294 int _sion_mpi_get_capability_cb(
void *commdata )
/* Trace "FULL capability" for this rank (presumably followed by returning the matching capability constant - TODO confirm). */
300 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.