/* Redirect the MPI calls used in this file to their PMPI_-prefixed names,
 * pull in the required headers, and declare the file-scope pointer that
 * __sion_ompicol_share_ptr() stores into and reads back. */
24 #define MPI_Comm_rank PMPI_Comm_rank 25 #define MPI_Comm_size PMPI_Comm_size 26 #define MPI_Gather PMPI_Gather 27 #define MPI_Scatter PMPI_Scatter 28 #define MPI_Bcast PMPI_Bcast 29 #define MPI_Barrier PMPI_Barrier 30 #define MPI_Comm_split PMPI_Comm_split 31 #define MPI_Send PMPI_Send 32 #define MPI_Recv PMPI_Recv 37 #include <sys/types.h> 44 #include "sion_error_handler.h" 55 static void *__ompicol_global_pointer;
/* File-scope status code used by both collective callbacks below: they reset
 * it to SION_STD_SUCCESS, set it to SION_STD_NOT_SUCCESS when an allocation
 * fails, store the result of process_cb() in it, and copy it into rc.
 * NOTE(review): initialised with SION_SUCCESS but compared against
 * SION_STD_SUCCESS everywhere else - confirm both constants have the same
 * value.
 * NOTE(review): the name spells "opmicol" while the rest of the file uses
 * "ompicol"; looks like a typo that is used consistently. */
56 static int _sion_opmicol_grc=SION_SUCCESS;
/* Forward declarations of helpers that are defined further down in this file. */
58 int _sion_ompicol_size_of_dtype(
int dtype);
59 void * __sion_ompicol_share_ptr(
void * in_ptr);
/*
 * _sion_ompi_gather_process_cb
 *
 * Gather-direction collective callback. Every thread publishes its spec and
 * indata pointers into two arrays obtained through __sion_ompicol_share_ptr().
 * When rank equals collector, the routine walks range_start..range_end
 * (inclusive), calls process_cb() and receives spec and data blocks with
 * MPI_Recv in chunks of at most fsblksize bytes. The sending part loops over
 * all threads and ships each thread's spec and data to the collector with
 * MPI_Send.
 *
 * Parameters (as used in the visible code):
 *   indata      - this thread's data pointer; stored in indatas[thread_num]
 *                 and later used as the source of the chunked MPI_Send.
 *   spec        - this thread's spec array; stored in specs[thread_num]. On
 *                 the collector it is also the receive buffer for remote
 *                 specs. spec[0] is advanced by each chunk size and spec[1]
 *                 is used as the byte count (see bytestosend below).
 *   spec_len    - element count used for the spec transfers.
 *   fsblksize   - size of the temporary buffer and upper bound per chunk.
 *   commdata    - cast to _ompi_api_commdata *.
 *   collector   - rank that collects; compared against rank.
 *   range_start,
 *   range_end   - inclusive bounds of the collector loop.
 *   sid         - passed through unchanged to process_cb().
 *   process_cb  - callback invoked as process_cb(data, spec, sid); its result
 *                 is stored in _sion_opmicol_grc.
 *
 * Returns _sion_opmicol_grc early when one of the helper allocations failed.
 * The final value of rc is also taken from _sion_opmicol_grc; the closing
 * return statement is not visible in this excerpt.
 *
 * NOTE(review): a number of original lines (local declarations such as rank,
 * helpdata, buffer, p, p_data, status, commgroup; braces; branch conditions;
 * presumably OpenMP directives) are missing from this view, so the exact
 * nesting of the statements below cannot be confirmed from here.
 */
61 #define DFUNCTION "_ompi_gather_execute_cb" 62 int _sion_ompi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
63 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
64 int process_cb(
const void *,sion_int64 *,
int ) ) {
66 int t, startsignal=1,mrank,mt,tt, mcollector;
72 sion_int64 **specs, *p_spec;
73 sion_int64 bytestorecv, bytestosend, datasize;
74 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
78 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status code before the allocations below. */
84 _sion_opmicol_grc=SION_STD_SUCCESS;
/* Array with one sion_int64 * slot per thread (num_threads entries). */
86 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
/* NOTE(review): the message names "_sion_ompi_gathervr_cb" and computes the
 * reported size with sizeof(int *), while the allocation above uses
 * sizeof(sion_int64 *). */
87 if (helpdata == NULL) {
88 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
89 (
size_t) sapi->num_threads *
sizeof(
int *));
90 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* The value returned by __sion_ompicol_share_ptr() is used as the specs array. */
95 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
/* Bail out if the allocation was flagged as failed. */
98 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread records its own spec pointer in its slot. */
101 specs[sapi->thread_num]= spec;
/* Second helper array: one data pointer per thread. */
110 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
const void *));
/* NOTE(review): same message/size mismatch as for the first allocation. */
111 if (helpdata == NULL) {
112 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (tcounts), aborting ...\n",
113 (
size_t) sapi->num_threads *
sizeof(
int *));
114 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
119 indatas = (
void const **)__sion_ompicol_share_ptr((
void *) helpdata);
122 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread records its own input-data pointer in its slot. */
125 indatas[sapi->thread_num] = indata;
/* NOTE(review): the traces below pass pointers to %x conversions. */
133 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
134 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
/* Trace every slot of both shared arrays. */
139 for(tt=0;tt<sapi->num_threads;tt++) {
140 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER SPECS[%d]=%x\n", tt,specs[tt]));
141 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
148 commgroup = sapi->comm;
/* Collector side. */
150 if(rank == collector) {
/* NOTE(review): mrank is assigned here but not used in the visible lines. */
153 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* Temporary receive buffer of fsblksize bytes. */
156 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/* NOTE(review): the message names "_mpi_gather_process_cb". */
157 if (buffer == NULL) {
158 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
159 (
unsigned long) fsblksize *
sizeof(
char));
160 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* Loop over the ranks assigned to this collector (inclusive range). */
164 for(t=range_start;t<=range_end;t++) {
/* mt and tt are derived from t via the two mapping helpers (defined elsewhere). */
166 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
171 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Direct call without MPI transfer; the assignments of p_data/p_spec and the
 * condition selecting this path are not visible - presumably data of a thread
 * in the collector's own task; TODO confirm. */
176 _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
/* Handshake with a remote sender: tag 1534 carries the spec, tag 1535 the
 * start signal, tag 1536 the data blocks (matching the sends further down). */
181 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", mt));
182 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
183 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
184 mt, (
long long) spec[0], (
long long) spec[1]));
188 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", mt));
189 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
193 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
194 (
long long) spec[1], (
long long) spec[0]));
/* Receive and process the data in chunks of at most fsblksize bytes.
 * The initialisation of bytestorecv is not visible - presumably spec[1]. */
199 while(bytestorecv>0) {
200 if(bytestorecv>fsblksize) datasize=fsblksize;
201 else datasize=bytestorecv;
204 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", mt));
/* NOTE(review): datasize is a sion_int64 passed as the MPI element count;
 * confirm fsblksize always fits the count type. */
205 MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
206 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
207 mt, (
long long) datasize, (
long long) bytestorecv));
212 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
213 (
long long) spec[1], (
long long) spec[0]));
/* Hand the received chunk to the callback. */
215 _sion_opmicol_grc=process_cb(buffer,spec, sid);
217 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
218 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
/* Account for the processed chunk and advance spec[0] by its size. */
222 bytestorecv-=datasize;spec[0]+=datasize;
231 if (buffer) free(buffer);
/* Sending side: translate the collector rank with the mapping helper and
 * send every thread's spec and data to it. */
236 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
239 for(tt=0;tt<sapi->num_threads;tt++) {
242 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
243 collector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
244 rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
245 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender sent spec to %d rc=%d\n", mcollector,rc));
/* Wait for the collector's start signal before sending data. */
248 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", mcollector));
249 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
250 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", mcollector));
/* Send specs[tt][1] bytes starting at indatas[tt], at most fsblksize per message. */
253 bytestosend=specs[tt][1];
254 p=(
char *) indatas[tt];
255 while(bytestosend>0) {
256 if(bytestosend>fsblksize) datasize=fsblksize;
257 else datasize=bytestosend;
258 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", mcollector, (
long long) datasize));
259 MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
260 bytestosend-=datasize;p+=datasize;
/* NOTE(review): in the visible order this trace prints rc before rc is set
 * from _sion_opmicol_grc on the following line. */
267 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
273 rc=_sion_opmicol_grc;
/*
 * _sion_ompi_process_scatter_cb
 *
 * Scatter-direction counterpart of the gather callback. Every thread
 * publishes its spec and outdata pointers into two arrays obtained through
 * __sion_ompicol_share_ptr(). When rank equals collector, the routine walks
 * range_start..range_end (inclusive), calls process_cb() and sends the
 * resulting buffer to the target with MPI_Send in chunks of at most
 * fsblksize bytes. The receiving part loops over all threads, sends each
 * thread's spec to the collector and receives the data with MPI_Recv.
 *
 * Parameters (as used in the visible code):
 *   outdata     - this thread's destination pointer; stored in
 *                 outdatas[thread_num].
 *   spec        - this thread's spec array; stored in specs[thread_num]. On
 *                 the collector it is also the receive buffer for remote
 *                 specs. spec[0] is advanced by each chunk size and
 *                 specs[tt][1] is used as the byte count on the receiver.
 *   spec_len    - element count used for the spec transfers.
 *   fsblksize   - size of the temporary buffer and upper bound per chunk.
 *   commdata    - cast to _ompi_api_commdata *.
 *   collector   - rank that distributes; compared against rank.
 *   range_start,
 *   range_end   - inclusive bounds of the collector loop.
 *   sid         - passed through unchanged to process_cb().
 *   process_cb  - callback invoked as process_cb(data, spec, sid); its result
 *                 is stored in _sion_opmicol_grc.
 *
 * Returns _sion_opmicol_grc early when one of the helper allocations failed.
 * The final value of rc is also taken from _sion_opmicol_grc; the closing
 * return statement is not visible in this excerpt.
 *
 * NOTE(review): a number of original lines (local declarations, braces,
 * branch conditions, presumably OpenMP directives) are missing from this
 * view, so the exact nesting of the statements below cannot be confirmed.
 */
282 #define DFUNCTION "_ompi_process_scatter_cb" 283 int _sion_ompi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
284 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
285 int process_cb(
void *,sion_int64 *,
int ) ) {
287 int t, startsignal=1, count, mrank, mt, tt, mcollector;
293 sion_int64 bytestorecv, bytestosend, datasize;
294 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
298 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status code before the allocations below. */
303 _sion_opmicol_grc=SION_STD_SUCCESS;
/* Array with one sion_int64 * slot per thread (num_threads entries). */
305 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
/* NOTE(review): the message names "_sion_ompi_gathervr_cb" although this is
 * the scatter routine, and the reported size uses sizeof(int *). */
306 if (helpdata == NULL) {
307 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
308 (
size_t) sapi->num_threads *
sizeof(
int *));
309 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* The value returned by __sion_ompicol_share_ptr() is used as the specs array. */
314 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
/* Bail out if the allocation was flagged as failed. */
317 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread records its own spec pointer in its slot. */
320 specs[sapi->thread_num]= spec;
/* Second helper array: one destination pointer per thread. */
329 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
void *));
/* NOTE(review): same message/size mismatch as for the first allocation. */
330 if (helpdata == NULL) {
331 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
332 (
size_t) sapi->num_threads *
sizeof(
int *));
333 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
338 outdatas = (
void **)__sion_ompicol_share_ptr((
void *) helpdata);
341 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread records its own output pointer in its slot. */
344 outdatas[sapi->thread_num] = outdata;
355 commgroup = sapi->comm;
/* Collector side. */
358 if(rank == collector) {
/* Temporary send buffer of fsblksize bytes. */
362 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
/* NOTE(review): the message names "_ompi_gather_process_cb". */
363 if (buffer == NULL) {
364 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
365 (
unsigned long) fsblksize *
sizeof(
char));
366 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* NOTE(review): mrank is assigned here but not used in the visible lines. */
369 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* Loop over the ranks served by this collector (inclusive range). */
372 for(t=range_start;t<=range_end;t++) {
/* mt and tt are derived from t via the two mapping helpers (defined elsewhere). */
374 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
379 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Direct call on the shared per-thread pointers, without MPI transfer; the
 * condition selecting this path is not visible - presumably a thread in the
 * collector's own task; TODO confirm. */
382 _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
/* Handshake with a remote receiver: tag 1534 carries the spec, tag 1535 the
 * start signal, tag 1536 the data blocks. */
387 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
388 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
389 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
390 t, (
long long) spec[0], (
long long) spec[1]));
394 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
395 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
399 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to proces data of size %lld at offset %lld\n",
400 (
long long) spec[1], (
long long) spec[0]));
/* Produce and send the data in chunks of at most fsblksize bytes.
 * The initialisation of bytestosend is not visible - presumably spec[1]. */
405 while(bytestosend>0) {
407 if(bytestosend>fsblksize) datasize=fsblksize;
408 else datasize=bytestosend;
413 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
414 (
long long) spec[1], (
long long) spec[0]));
/* The callback is invoked on buffer before the buffer is sent. */
416 _sion_opmicol_grc=process_cb(buffer,spec, sid);
/* NOTE(review): message text ("_ompi_gather_process_cb", "writing") appears
 * to be copied from the gather routine. */
418 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
419 _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
423 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", mt));
/* NOTE(review): datasize is a sion_int64 passed as the MPI element count;
 * confirm fsblksize always fits the count type. */
424 MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
/* NOTE(review): the label says bytestorecv but the value printed is bytestosend. */
425 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
426 mt, (
long long) datasize, (
long long) bytestosend));
/* Account for the sent chunk and advance spec[0] by its size. */
429 bytestosend-=datasize;spec[0]+=datasize;
436 if (buffer) free(buffer);
/* Receiving side: translate the collector rank with the mapping helper,
 * announce every thread's spec and receive its data. */
441 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
444 for(tt=0;tt<sapi->num_threads;tt++) {
447 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
448 mcollector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
449 MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
/* Wait for the collector's start signal before receiving data. */
454 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
455 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
456 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/* Receive specs[tt][1] bytes, at most fsblksize per message. The
 * initialisation of p is not visible - presumably outdatas[tt]; TODO confirm. */
459 bytestorecv=specs[tt][1];
461 while(bytestorecv>0) {
462 if(bytestorecv>fsblksize) datasize=fsblksize;
463 else datasize=bytestorecv;
464 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", mcollector, (
long long) datasize));
465 MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
/* count is only used in the trace below; the loop advances by datasize. */
466 MPI_Get_count(&status,MPI_CHAR,&count);
468 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", mcollector, (
long long) datasize, count));
469 bytestorecv-=datasize;p+=datasize;
/* Take the result from the shared status code, then trace it. */
483 rc=_sion_opmicol_grc;
488 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * __sion_ompicol_share_ptr
 *
 * Stores in_ptr in the file-scope variable __ompicol_global_pointer and then
 * reads that variable back into out_ptr. Callers in this file use the
 * returned value as the array shared by all threads.
 *
 * NOTE(review): the declaration of out_ptr, the return statement and whatever
 * sits between the store and the load (presumably a restriction of the store
 * to one thread plus barriers) are not visible in this excerpt - confirm
 * against the full file before changing anything here.
 */
496 #define DFUNCTION "__sion_ompi_share_ptr" 497 void * __sion_ompicol_share_ptr(
void * in_ptr) {
/* Publish the pointer through the file-scope variable. */
501 __ompicol_global_pointer = in_ptr;
/* Read the published pointer back. */
508 out_ptr=__ompicol_global_pointer;
/*
 * _sion_ompi_get_capability_cb
 *
 * Casts commdata to _ompi_api_commdata * and branches on whether
 * sapi->thread_num is 0, tracing "FULL capability" in that branch. The
 * second trace, "ONLY SENDER capability", presumably belongs to the else
 * branch.
 *
 * NOTE(review): the opening brace, the else, and the value that is returned
 * are not visible in this excerpt; presumably one of the SION_CAPABILITY_*
 * constants is returned - TODO confirm.
 */
516 #define DFUNCTION "_sion_ompi_get_capability_cb" 517 int _sion_ompi_get_capability_cb(
void *commdata )
520 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* Thread number 0 is treated differently from all other threads. */
522 if(sapi->thread_num==0) {
524 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
527 DPRINTFP((256, DFUNCTION, sapi->rank,
"ONLY SENDER capability\n"));
/*
 * _sion_ompicol_size_of_dtype
 *
 * Returns the size in bytes of the C type that corresponds to a dtype code:
 *   _SION_INT32 -> sizeof(sion_int32)
 *   _SION_INT64 -> sizeof(sion_int64)
 *   _SION_CHAR  -> sizeof(char)
 *   any other   -> sizeof(sion_int64)
 *
 * NOTE(review): the switch header and the closing braces are not visible in
 * this excerpt. The break after each return can never be reached.
 */
534 int _sion_ompicol_size_of_dtype(
int dtype) {
536 case _SION_INT32:
return(
sizeof(sion_int32));
break;
537 case _SION_INT64:
return(
sizeof(sion_int64));
break;
538 case _SION_CHAR:
return(
sizeof(
char));
break;
/* Unknown codes fall back to the 64-bit integer size. */
539 default:
return(
sizeof(sion_int64));
/* Capability macro names, shown here without replacement text.
 * NOTE(review): these carry no original line number and no value in this
 * excerpt; presumably they are defined with values in a project header and
 * relate to the FULL / ONLY SENDER cases traced in
 * _sion_ompi_get_capability_cb - TODO confirm. */
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER