/*
 * File-scope state and forward declarations for the OpenMP+MPI ("ompi")
 * collective callbacks in this file.
 *
 * The MPI_* -> PMPI_* macros redirect the listed MPI calls to their PMPI_
 * entry points (presumably so these internal calls are not intercepted by
 * MPI profiling wrappers -- confirm).
 *
 * __ompicol_global_pointer: single static slot that
 *   __sion_ompicol_share_ptr() stores into and reads back from, used to
 *   hand one pointer value to the other threads.
 * _sion_opmicol_grc: status variable shared by the callbacks below.
 *   NOTE(review): it is initialised to SION_SUCCESS, while the callbacks
 *   reset and compare it against SION_STD_SUCCESS; the "opmicol" spelling
 *   also differs from the "ompicol" used by the other identifiers.
 */
24 #define MPI_Comm_rank PMPI_Comm_rank 25 #define MPI_Comm_size PMPI_Comm_size 26 #define MPI_Gather PMPI_Gather 27 #define MPI_Scatter PMPI_Scatter 28 #define MPI_Bcast PMPI_Bcast 29 #define MPI_Barrier PMPI_Barrier 30 #define MPI_Comm_split PMPI_Comm_split 31 #define MPI_Send PMPI_Send 32 #define MPI_Recv PMPI_Recv 37 #include <sys/types.h> 54 static void *__ompicol_global_pointer;
55 static int _sion_opmicol_grc=SION_SUCCESS;
/* Size in bytes of one element of a SION data-type code (_SION_INT32,
 * _SION_INT64, _SION_CHAR; other codes map to sizeof(sion_int64)). */
57 int _sion_ompicol_size_of_dtype(
int dtype);
/* Stores in_ptr in __ompicol_global_pointer and returns the value read back
 * from it; apparently used so that all threads obtain the same pointer. */
58 void * __sion_ompicol_share_ptr(
void * in_ptr);
/*
 * _sion_ompi_gather_process_cb -- gather-and-process callback of the
 * OpenMP+MPI ("ompi") collective layer.
 *
 * Each OpenMP thread stores its spec and indata pointers into two arrays
 * that are made visible to all threads through __sion_ompicol_share_ptr().
 * The task whose rank equals `collector` then walks ranks
 * range_start..range_end (inclusive) and calls process_cb for each of them.
 * It does so either on locally available data or on blocks received over
 * MPI; the condition that chooses between the two is not visible in this
 * listing.  Every other task sends the spec and data of each of its threads
 * to the collector.
 *
 * Parameters:
 *   indata      data of the calling thread; on non-collector tasks
 *               specs[tt][1] bytes of thread tt's data are sent
 *   spec        spec[0] is used as a position/offset and spec[1] as a byte
 *               count.  On the collector this array is also overwritten by
 *               specs received from other tasks, and spec[0] is advanced.
 *   spec_len    number of sion_int64 entries exchanged per spec
 *   fsblksize   size of the collector's temporary buffer and maximum
 *               number of bytes per data message
 *   commdata    _ompi_api_commdata * (thread_num, num_threads and comm are
 *               read)
 *   collector   rank of the collecting task in the ompi numbering, mapped
 *               to an MPI rank with _sion_map_rank_ompi_to_mpi
 *   range_start, range_end
 *               inclusive rank range handled by the collector
 *   sid         passed through unchanged to process_cb
 *   process_cb  called as process_cb(data, spec, sid); its result is
 *               stored in _sion_opmicol_grc
 *
 * MPI tags: 1534 = spec, 1535 = start signal, 1536 = data block.
 *
 * Returns _sion_opmicol_grc early if allocating a shared pointer array
 * failed.  At the end, rc is set from _sion_opmicol_grc; the final return
 * statement is not visible in this listing.
 *
 * NOTE(review):
 *  - DFUNCTION is "_ompi_gather_execute_cb".  The allocation messages name
 *    "_sion_ompi_gathervr_cb" and "_mpi_gather_process_cb".  None of these
 *    is this function's name.
 *  - The allocation-failure messages report num_threads * sizeof(int *),
 *    not the size actually requested.
 *  - The "%x" debug formats receive pointer arguments; "%p" with a
 *    (void *) cast is the matching conversion.
 *  - datasize (sion_int64, at most fsblksize) is passed as the int count of
 *    MPI_Send/MPI_Recv; confirm fsblksize always fits in an int.
 *  - _sion_opmicol_grc is one file-scope variable written from threads
 *    here; confirm the surrounding (elided) OpenMP constructs make these
 *    writes race-free.
 */
60 #define DFUNCTION "_ompi_gather_execute_cb" 61 int _sion_ompi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
62 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
63 int process_cb(
const void *,sion_int64 *,
int ) ) {
65 int t, startsignal=1,mrank,mt,tt, mcollector;
71 sion_int64 **specs, *p_spec;
72 sion_int64 bytestorecv, bytestosend, datasize;
73 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
77 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status, then allocate an array with one spec pointer
 * per thread. */
83 _sion_opmicol_grc=SION_STD_SUCCESS;
85 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
86 if (helpdata == NULL) {
87 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
88 (
unsigned long) sapi->num_threads *
sizeof(
int *));
89 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* Exchange the array pointer so that all threads index the same array.
 * This assumes the elided lines restrict the malloc above to a single
 * thread; otherwise the other threads' allocations would be lost.
 * TODO confirm. */
94 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
97 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread publishes its own spec. */
100 specs[sapi->thread_num]= spec;
/* Same scheme for the per-thread input data pointers. */
109 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
const void *));
110 if (helpdata == NULL) {
111 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
112 (
unsigned long) sapi->num_threads *
sizeof(
int *));
113 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
118 indatas = (
void const **)__sion_ompicol_share_ptr((
void *) helpdata);
121 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread publishes its own input data pointer. */
124 indatas[sapi->thread_num] = indata;
132 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
133 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
/* Debug dump of both shared arrays.  The "MASTER" text suggests this runs
 * on a single thread; the guarding construct is not visible here. */
138 for(tt=0;tt<sapi->num_threads;tt++) {
139 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER SPECS[%d]=%x\n", tt,specs[tt]));
140 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
/* MPI phase.  Tags: 1534 = spec, 1535 = start signal, 1536 = data. */
147 commgroup = sapi->comm;
/* Collector task: process the data of every rank in
 * [range_start, range_end]. */
149 if(rank == collector) {
152 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* Temporary receive buffer of fsblksize bytes.
 * NOTE(review): whether processing is skipped after this allocation fails
 * is not visible in this listing. */
155 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
156 if (buffer == NULL) {
157 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
158 (
unsigned long) fsblksize *
sizeof(
char));
159 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* Walk the inclusive rank range.  Each ompi rank t is mapped to its MPI
 * rank (mt) and its thread number within that task (tt). */
163 for(t=range_start;t<=range_end;t++) {
165 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
170 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Presumably the local case: process a thread's data directly.  The setup
 * of p_data/p_spec is not visible here. */
175 _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
/* Remote case: receive the sender's spec into `spec`, then tell the sender
 * to start transmitting. */
180 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", mt));
181 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
182 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
183 mt, (
long long) spec[0], (
long long) spec[1]));
187 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", mt));
188 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
192 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
193 (
long long) spec[1], (
long long) spec[0]));
/* Receive and process the data in blocks of at most fsblksize bytes,
 * advancing the offset spec[0] after each block.  The initialisation of
 * bytestorecv is not visible; presumably it is spec[1]. */
198 while(bytestorecv>0) {
199 if(bytestorecv>fsblksize) datasize=fsblksize;
200 else datasize=bytestorecv;
203 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", mt));
204 MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
205 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
206 mt, (
long long) datasize, (
long long) bytestorecv));
/* NOTE(review): this message prints spec[1] (the size logged at transfer
 * start) as the block size; the current block's size is datasize. */
211 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
212 (
long long) spec[1], (
long long) spec[0]));
214 _sion_opmicol_grc=process_cb(buffer,spec, sid);
216 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
217 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
221 bytestorecv-=datasize;spec[0]+=datasize;
/* Release the temporary buffer (free(NULL) would also be harmless). */
230 if (buffer) free(buffer);
/* Sender task (the else branch itself is not visible in this listing):
 * ship every local thread's spec and data to the collector. */
235 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
238 for(tt=0;tt<sapi->num_threads;tt++) {
/* NOTE(review): prints `collector` (ompi rank) although the message is
 * sent to mcollector (MPI rank). */
241 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
242 collector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
243 rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
244 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender sent spec to %d rc=%d\n", mcollector,rc));
/* Wait until the collector is ready for this thread's data. */
247 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", mcollector));
248 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
249 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", mcollector));
/* Send specs[tt][1] bytes of thread tt's data in blocks of at most
 * fsblksize bytes. */
252 bytestosend=specs[tt][1];
253 p=(
char *) indatas[tt];
254 while(bytestosend>0) {
255 if(bytestosend>fsblksize) datasize=fsblksize;
256 else datasize=bytestosend;
257 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", mcollector, (
long long) datasize));
258 MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
259 bytestosend-=datasize;p+=datasize;
266 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/* Propagate the status set by the allocation checks and process_cb. */
272 rc=_sion_opmicol_grc;
/*
 * _sion_ompi_process_scatter_cb -- process-and-scatter callback of the
 * OpenMP+MPI ("ompi") collective layer.  Data flows in the opposite
 * direction to _sion_ompi_gather_process_cb in this file.
 *
 * Each OpenMP thread stores its spec and outdata pointers into two arrays
 * that are made visible to all threads through __sion_ompicol_share_ptr().
 * The task whose rank equals `collector` walks ranks
 * range_start..range_end (inclusive).  For each rank it calls process_cb
 * either directly on that thread's outdata, or on a temporary buffer whose
 * contents are then sent over MPI in blocks of at most fsblksize bytes.
 * The condition choosing between the two is not visible in this listing.
 * Every other task sends the spec of each of its threads to the collector
 * and receives that thread's data.
 *
 * Parameters:
 *   outdata     destination of the calling thread's data; on non-collector
 *               tasks specs[tt][1] bytes are received for thread tt (the
 *               assignment of the receive pointer p is not visible)
 *   spec        spec[0] is used as a position/offset and spec[1] as a byte
 *               count.  On the collector this array is also overwritten by
 *               specs received from other tasks, and spec[0] is advanced.
 *   spec_len    number of sion_int64 entries exchanged per spec
 *   fsblksize   size of the collector's temporary buffer and maximum
 *               number of bytes per data message
 *   commdata    _ompi_api_commdata * (thread_num, num_threads and comm are
 *               read)
 *   collector   rank of the distributing task in the ompi numbering
 *   range_start, range_end
 *               inclusive rank range handled by the collector
 *   sid         passed through unchanged to process_cb
 *   process_cb  called as process_cb(buffer, spec, sid); its result is
 *               stored in _sion_opmicol_grc
 *
 * MPI tags: 1534 = spec, 1535 = start signal, 1536 = data block.
 *
 * Returns _sion_opmicol_grc early if allocating a shared pointer array
 * failed.  At the end, rc is set from _sion_opmicol_grc; the final return
 * statement is not visible in this listing.
 *
 * NOTE(review):
 *  - The messages name "_sion_ompi_gathervr_cb" and "_ompi_gather_process_cb",
 *    not this function, and report num_threads * sizeof(int *) instead of
 *    the size actually requested.
 *  - One _sion_errorprint call passes SION_STD_NOT_SUCCESS as its rc, while
 *    the other calls pass SION_NOT_SUCCESS.
 *  - The debug prints mix t (ompi rank) and mt (MPI rank), and collector
 *    and mcollector.
 *  - The count from MPI_Get_count is only printed; it is never compared
 *    with datasize.
 *  - datasize (sion_int64) is passed as the int count of MPI_Send/MPI_Recv.
 */
281 #define DFUNCTION "_ompi_process_scatter_cb" 282 int _sion_ompi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
283 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
284 int process_cb(
void *,sion_int64 *,
int ) ) {
286 int t, startsignal=1, count, mrank, mt, tt, mcollector;
292 sion_int64 bytestorecv, bytestosend, datasize;
293 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
297 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status, then allocate an array with one spec pointer
 * per thread. */
302 _sion_opmicol_grc=SION_STD_SUCCESS;
304 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
305 if (helpdata == NULL) {
306 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
307 (
unsigned long) sapi->num_threads *
sizeof(
int *));
308 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* Exchange the array pointer so that all threads index the same array.
 * This assumes the elided lines restrict the malloc above to one thread.
 * TODO confirm. */
313 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
316 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread publishes its own spec. */
319 specs[sapi->thread_num]= spec;
/* Same scheme for the per-thread output buffer pointers. */
328 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
void *));
329 if (helpdata == NULL) {
330 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
331 (
unsigned long) sapi->num_threads *
sizeof(
int *));
332 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
337 outdatas = (
void **)__sion_ompicol_share_ptr((
void *) helpdata);
340 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
/* Each thread publishes its own output buffer pointer. */
343 outdatas[sapi->thread_num] = outdata;
/* MPI phase.  Tags: 1534 = spec, 1535 = start signal, 1536 = data. */
354 commgroup = sapi->comm;
/* Collector task: produce the data for every rank in
 * [range_start, range_end]. */
357 if(rank == collector) {
/* Temporary send buffer of fsblksize bytes.
 * NOTE(review): whether processing is skipped after this allocation fails
 * is not visible in this listing. */
361 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
362 if (buffer == NULL) {
363 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
364 (
unsigned long) fsblksize *
sizeof(
char));
365 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* MPI rank of this task (no use of mrank is visible in this listing). */
368 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* Walk the inclusive rank range.  Each ompi rank t is mapped to its MPI
 * rank (mt) and its thread number within that task (tt). */
371 for(t=range_start;t<=range_end;t++) {
373 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
378 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Presumably the local case: let process_cb fill thread tt's buffer
 * directly. */
381 _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
/* Remote case: receive the receiver's spec into `spec`, then tell it that
 * data will follow. */
386 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
387 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
388 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
389 t, (
long long) spec[0], (
long long) spec[1]));
393 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
394 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
398 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to proces data of size %lld at offset %lld\n",
399 (
long long) spec[1], (
long long) spec[0]));
/* Produce the data block by block with process_cb and send each block,
 * advancing the offset spec[0].  The initialisation of bytestosend is not
 * visible; presumably it is spec[1]. */
404 while(bytestosend>0) {
406 if(bytestosend>fsblksize) datasize=fsblksize;
407 else datasize=bytestosend;
/* NOTE(review): prints spec[1] as the block size; the current block's
 * size is datasize. */
412 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
413 (
long long) spec[1], (
long long) spec[0]));
415 _sion_opmicol_grc=process_cb(buffer,spec, sid);
/* NOTE(review): here process_cb produces the data that is sent, so the
 * "writing" wording (copied from the gather path) may be misleading. */
417 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
418 _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
422 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", mt));
423 MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
/* NOTE(review): the "bytestorecv=" label actually prints bytestosend. */
424 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
425 mt, (
long long) datasize, (
long long) bytestosend));
428 bytestosend-=datasize;spec[0]+=datasize;
/* Release the temporary buffer. */
435 if (buffer) free(buffer);
/* Receiver task (the else branch itself is not visible in this listing):
 * for every local thread, send its spec and receive its data. */
440 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
443 for(tt=0;tt<sapi->num_threads;tt++) {
446 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
447 mcollector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
448 MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
/* Wait until the collector starts sending this thread's data. */
453 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
454 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
455 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/* Receive specs[tt][1] bytes in blocks of at most fsblksize bytes.  The
 * receive pointer p is set on an elided line, presumably to outdatas[tt]. */
458 bytestorecv=specs[tt][1];
460 while(bytestorecv>0) {
461 if(bytestorecv>fsblksize) datasize=fsblksize;
462 else datasize=bytestorecv;
463 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", mcollector, (
long long) datasize));
464 MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
/* The received byte count is only logged; it is not checked against
 * datasize. */
465 MPI_Get_count(&status,MPI_CHAR,&count);
467 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", mcollector, (
long long) datasize, count));
468 bytestorecv-=datasize;p+=datasize;
/* Propagate the status set by the allocation checks and process_cb. */
482 rc=_sion_opmicol_grc;
487 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
/*
 * __sion_ompicol_share_ptr -- exchange a pointer between threads through
 * the file-scope slot __ompicol_global_pointer.
 *
 * The visible code stores in_ptr into the slot and later reads the slot
 * back into out_ptr.  The lines in between and the return statement are
 * not visible in this listing.
 *
 * NOTE(review): for all threads to obtain the same value, the store must
 * be done by a single thread and separated from the loads by a
 * synchronization point (e.g. an OpenMP barrier).  Confirm that the elided
 * lines provide both.  DFUNCTION ("__sion_ompi_share_ptr") does not match
 * the function name.  Identifiers beginning with "__" are reserved for the
 * implementation in C.
 */
495 #define DFUNCTION "__sion_ompi_share_ptr" 496 void * __sion_ompicol_share_ptr(
void * in_ptr) {
/* Publish the caller's pointer in the shared slot. */
500 __ompicol_global_pointer = in_ptr;
/* Read the shared value back (expected to be the published pointer). */
507 out_ptr=__ompicol_global_pointer;
/*
 * _sion_ompi_get_capability_cb -- report the collective capability of the
 * calling thread.
 *
 * The thread with sapi->thread_num == 0 logs "FULL capability"; otherwise
 * "ONLY SENDER capability" is logged (the else branch itself is not
 * visible).  The return statements are not visible in this listing.
 * Presumably they return SION_CAPABILITY_FULL and
 * SION_CAPABILITY_ONLY_SENDER respectively -- confirm.
 */
515 #define DFUNCTION "_sion_ompi_get_capability_cb" 516 int _sion_ompi_get_capability_cb(
void *commdata )
519 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
/* Only thread 0 of a task gets the full capability. */
521 if(sapi->thread_num==0) {
523 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
526 DPRINTFP((256, DFUNCTION, sapi->rank,
"ONLY SENDER capability\n"));
/*
 * _sion_ompicol_size_of_dtype -- size in bytes of one element of the given
 * SION data-type code.
 *
 *   _SION_INT32   -> sizeof(sion_int32)
 *   _SION_INT64   -> sizeof(sion_int64)
 *   _SION_CHAR    -> sizeof(char)
 *   anything else -> sizeof(sion_int64)
 *
 * NOTE(review): unknown codes silently fall back to the 64-bit size
 * instead of being reported.  The `break;` after each `return` is
 * unreachable.  The `switch` line and the closing braces are not visible
 * in this listing.
 */
533 int _sion_ompicol_size_of_dtype(
int dtype) {
535 case _SION_INT32:
return(
sizeof(sion_int32));
break;
536 case _SION_INT64:
return(
sizeof(sion_int64));
break;
537 case _SION_CHAR:
return(
sizeof(
char));
break;
538 default:
return(
sizeof(sion_int64));
/*
 * NOTE(review): these lines look like cross-reference residue from a
 * generated listing rather than real definitions.  The three capability
 * macros have no replacement text, and the _sion_errorprint prototype has
 * no terminating ';'.  Confirm against the real headers before relying on
 * them.
 */
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER
int _sion_errorprint(int rc, int level, const char *format,...)
/* Internal SION error. */