14 #define _XOPEN_SOURCE 700
26 #define MPI_Comm_rank PMPI_Comm_rank
27 #define MPI_Comm_size PMPI_Comm_size
28 #define MPI_Gather PMPI_Gather
29 #define MPI_Scatter PMPI_Scatter
30 #define MPI_Bcast PMPI_Bcast
31 #define MPI_Barrier PMPI_Barrier
32 #define MPI_Comm_split PMPI_Comm_split
33 #define MPI_Send PMPI_Send
34 #define MPI_Recv PMPI_Recv
39 #include <sys/types.h>
46 #include "sion_error_handler.h"
/* Slot used by __sion_ompicol_share_ptr: that function stores its in_ptr
   argument here and later reads it back, presumably so that one thread can
   hand a pointer to the other threads - TODO confirm. */
57 static void *__ompicol_global_pointer;
/* Module-wide status shared by the gather/scatter callbacks: they reset it,
   store allocation/process_cb results in it, and check it after the shared
   allocations.
   NOTE(review): it is initialised with SION_SUCCESS here, but the callbacks
   compare against SION_STD_SUCCESS; confirm the two constants are equal.
   NOTE(review): "opmicol" looks like a typo of "ompicol"; renaming it would
   require updating every use. */
58 static int _sion_opmicol_grc=SION_SUCCESS;
/* Forward declarations of helpers defined further down in this file. */
60 int _sion_ompicol_size_of_dtype(
int dtype);
61 void * __sion_ompicol_share_ptr(
void * in_ptr);
63 #define DFUNCTION "_ompi_gather_execute_cb"
/*
 * _sion_ompi_gather_process_cb - collective gather-and-process callback for the
 * hybrid MPI + OpenMP ("ompi") collective layer.
 *
 * Every calling thread stores its spec and indata pointers into two arrays.
 * Those arrays are shared between threads through __sion_ompicol_share_ptr().
 * Then:
 *  - On the collector (rank == collector), for each OMPI task t in
 *    [range_start, range_end]:
 *      - either the data is processed directly with
 *        process_cb(p_data, p_spec, sid),
 *      - or the spec is received from MPI rank mt (tag 1534), a start signal
 *        is sent back (tag 1535), and the data is received (tag 1536) and
 *        passed to process_cb in chunks of at most fsblksize bytes.
 *  - Otherwise, for each thread tt:
 *      - its spec is sent to the collector's MPI rank,
 *      - the start signal is awaited,
 *      - indatas[tt] is sent in fsblksize chunks.
 * The condition that selects the local vs. remote path on the collector is not
 * visible in this extract (presumably mt == mrank) - TODO confirm.
 *
 * indata     - this task's data; sent (non-collector) or processed (collector)
 * spec       - array of spec_len sion_int64 values. The debug output treats
 *              spec[0] as offset and spec[1] as size. On the collector this
 *              array is also used as the MPI receive buffer and spec[0] is
 *              advanced per chunk, so its original contents are overwritten.
 * fsblksize  - chunk size in bytes for transfers and process_cb calls
 * commdata   - _ompi_api_commdata*; comm, num_threads and thread_num are read
 * collector  - OMPI rank of the collecting task
 * sid        - forwarded unchanged to process_cb
 * process_cb - consumer of each chunk; its result is stored in
 *              _sion_opmicol_grc
 *
 * Returns early with _sion_opmicol_grc if it is not SION_STD_SUCCESS after
 * either shared allocation. Otherwise rc is set from _sion_opmicol_grc at the
 * end (the return statement itself is not visible here).
 */
64 int _sion_ompi_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
65 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
66 int process_cb(
const void *,sion_int64 *,
int ) ) {
68 int t, startsignal=1,mrank,mt,tt, mcollector;
74 sion_int64 **specs, *p_spec;
75 sion_int64 bytestorecv, bytestosend, datasize;
76 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
80 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status before the shared allocations.
   NOTE(review): the file-scope initialiser uses SION_SUCCESS while this code
   uses SION_STD_SUCCESS; confirm they match. */
86 _sion_opmicol_grc=SION_STD_SUCCESS;
/* Array of num_threads spec pointers, published to all threads below. Which
   thread(s) execute this allocation is not visible here, because the OpenMP
   directives are missing from this extract - TODO confirm. */
88 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
89 if (helpdata == NULL) {
/* NOTE(review): this message names _sion_ompi_gathervr_cb and reports
   sizeof(int *), but the allocation above uses sizeof(sion_int64 *). */
90 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
91 (
size_t) sapi->num_threads *
sizeof(
int *));
92 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* Share helpdata. Each thread then records its own spec at index thread_num. */
97 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
100 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
103 specs[sapi->thread_num]= spec;
/* Second shared array: one const data pointer per thread. */
112 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
const void *));
113 if (helpdata == NULL) {
/* NOTE(review): this message says "(tcounts)" and uses sizeof(int *), while
   the allocation is sizeof(const void *) for the indatas array. */
114 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (tcounts), aborting ...\n",
115 (
size_t) sapi->num_threads *
sizeof(
int *));
116 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
121 indatas = (
void const **)__sion_ompicol_share_ptr((
void *) helpdata);
124 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
127 indatas[sapi->thread_num] = indata;
/* NOTE(review): these debug messages use %x for pointer arguments; %p is the
   matching conversion for void *. */
135 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
136 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
141 for(tt=0;tt<sapi->num_threads;tt++) {
142 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER SPECS[%d]=%x\n", tt,specs[tt]));
143 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
150 commgroup = sapi->comm;
/* Collector path. */
152 if(rank == collector) {
155 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* Staging buffer for one chunk of at most fsblksize bytes. */
158 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
159 if (buffer == NULL) {
160 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
161 (
unsigned long) fsblksize *
sizeof(
char));
162 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
/* mt is the MPI rank hosting task t; tt is the thread index of task t on that rank. */
166 for(t=range_start;t<=range_end;t++) {
168 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
173 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Local path: process the task's data directly. p_data and p_spec are set on
   lines not visible in this extract. */
178 _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
/* Remote path handshake: spec (tag 1534) -> start signal (tag 1535) ->
   data (tag 1536). The received spec overwrites this task's own spec array. */
183 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", mt));
184 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
185 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
186 mt, (
long long) spec[0], (
long long) spec[1]));
190 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", mt));
191 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
195 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to process data of size %lld at offset %lld\n",
196 (
long long) spec[1], (
long long) spec[0]));
/* Receive and process the data chunk by chunk. bytestorecv is initialised on
   a line not visible here (presumably from spec[1]) - TODO confirm. */
201 while(bytestorecv>0) {
202 if(bytestorecv>fsblksize) datasize=fsblksize;
203 else datasize=bytestorecv;
206 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for data block from %d\n", mt));
/* NOTE(review): datasize is sion_int64, but the MPI count argument is int.
   fsblksize values above INT_MAX would be truncated; confirm the bound. */
207 MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
208 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got data block from %d datasize=%lld bytestorecv=%lld\n",
209 mt, (
long long) datasize, (
long long) bytestorecv));
/* NOTE(review): this message prints spec[1] (the total size), not datasize. */
214 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
215 (
long long) spec[1], (
long long) spec[0]));
217 _sion_opmicol_grc=process_cb(buffer,spec, sid);
219 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
220 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
/* Advance: the offset in spec[0] moves forward by each chunk processed. */
224 bytestorecv-=datasize;spec[0]+=datasize;
233 if (buffer) free(buffer);
/* Sender path: translate the collector's OMPI rank to its MPI rank. */
238 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
/* Forward the spec and data of every thread of this rank to the collector.
   Whether a single thread runs this loop is not visible here - TODO confirm. */
241 for(tt=0;tt<sapi->num_threads;tt++) {
/* NOTE(review): this message prints collector (an OMPI rank), while the
   message is sent to mcollector (its MPI rank). */
244 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
245 collector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
246 rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
247 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender sent spec to %d rc=%d\n", mcollector,rc));
250 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", mcollector));
251 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
252 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", mcollector));
/* Send specs[tt][1] bytes of indatas[tt] in chunks of at most fsblksize. */
255 bytestosend=specs[tt][1];
256 p=(
char *) indatas[tt];
257 while(bytestosend>0) {
258 if(bytestosend>fsblksize) datasize=fsblksize;
259 else datasize=bytestosend;
260 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send data block to %d of size %lld\n", mcollector, (
long long) datasize));
261 MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
262 bytestosend-=datasize;p+=datasize;
/* NOTE(review): rc appears to be printed here before it is set from
   _sion_opmicol_grc below. On the collector path, no visible line assigns rc
   before this point. The scatter variant assigns rc first. */
269 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
275 rc=_sion_opmicol_grc;
284 #define DFUNCTION "_ompi_process_scatter_cb"
/*
 * _sion_ompi_process_scatter_cb - collective process-and-scatter callback.
 * It is the counterpart of _sion_ompi_gather_process_cb.
 *
 * Every calling thread stores its spec and outdata pointers into arrays that
 * are shared via __sion_ompicol_share_ptr(). Then:
 *  - On the collector (rank == collector), for each OMPI task t in
 *    [range_start, range_end]:
 *      - either process_cb fills outdatas[tt] directly (local task),
 *      - or the spec is received from MPI rank mt (tag 1534) and a start
 *        signal is sent (tag 1535). Then, chunk by chunk, process_cb fills a
 *        staging buffer of at most fsblksize bytes, which is sent to mt
 *        (tag 1536).
 *  - Otherwise, for each thread tt:
 *      - its spec is sent to the collector,
 *      - the start signal is awaited,
 *      - the data is received in fsblksize chunks.
 * Two details are not visible in this extract (TODO confirm):
 *  - the condition selecting local vs. remote processing (presumably
 *    mt == mrank);
 *  - where the sender-side receive pointer p is set (presumably
 *    p = outdatas[tt]).
 *
 * outdata    - this task's destination buffer
 * spec       - array of spec_len sion_int64 values (the debug output treats
 *              spec[0] as offset and spec[1] as size). On the collector it is
 *              reused as the MPI receive buffer and spec[0] is advanced per
 *              chunk.
 * fsblksize  - chunk size in bytes
 * commdata   - _ompi_api_commdata*; comm, num_threads and thread_num are read
 * collector  - OMPI rank of the task that produces the data
 * sid        - forwarded unchanged to process_cb
 * process_cb - producer that fills a buffer described by a spec; its result
 *              is stored in _sion_opmicol_grc
 *
 * Returns early with _sion_opmicol_grc if it is not SION_STD_SUCCESS after
 * either shared allocation. Otherwise rc is set from _sion_opmicol_grc near
 * the end (the return statement itself is not visible here).
 */
285 int _sion_ompi_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
286 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
287 int process_cb(
void *,sion_int64 *,
int ) ) {
289 int t, startsignal=1, count, mrank, mt, tt, mcollector;
295 sion_int64 bytestorecv, bytestosend, datasize;
296 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
300 DPRINTFP((256, DFUNCTION, rank,
" input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
/* Reset the shared status before the shared allocations. */
305 _sion_opmicol_grc=SION_STD_SUCCESS;
/* Array of num_threads spec pointers, published to all threads below. */
307 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(sion_int64 *));
308 if (helpdata == NULL) {
/* NOTE(review): this message names _sion_ompi_gathervr_cb and reports
   sizeof(int *), but the allocation above uses sizeof(sion_int64 *). */
309 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
310 (
size_t) sapi->num_threads *
sizeof(
int *));
311 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
316 specs = (sion_int64 **)__sion_ompicol_share_ptr((
void *) helpdata);
319 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
322 specs[sapi->thread_num]= spec;
/* Second shared array: one output data pointer per thread. */
331 helpdata = (
void *) malloc(sapi->num_threads *
sizeof(
void *));
332 if (helpdata == NULL) {
/* NOTE(review): this message names _sion_ompi_gathervr_cb and uses
   sizeof(int *), while the allocation is sizeof(void *) for outdatas. */
333 fprintf(stderr,
"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
334 (
size_t) sapi->num_threads *
sizeof(
int *));
335 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
340 outdatas = (
void **)__sion_ompicol_share_ptr((
void *) helpdata);
343 if(_sion_opmicol_grc!=SION_STD_SUCCESS)
return(_sion_opmicol_grc);
346 outdatas[sapi->thread_num] = outdata;
357 commgroup = sapi->comm;
/* Collector path. */
360 if(rank == collector) {
/* Staging buffer for one chunk of at most fsblksize bytes. */
364 buffer = (
char *) malloc(fsblksize *
sizeof(
char));
365 if (buffer == NULL) {
/* NOTE(review): this message names _ompi_gather_process_cb, not the
   scatter function. */
366 _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
367 (
unsigned long) fsblksize *
sizeof(
char));
368 _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
371 mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
/* mt is the MPI rank hosting task t; tt is the thread index of task t on that rank. */
374 for(t=range_start;t<=range_end;t++) {
376 mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
381 tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
/* Local path: process_cb fills the task's own output buffer in a single call. */
384 _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
/* Remote path handshake: spec (tag 1534) -> start signal (tag 1535) ->
   data (tag 1536). The received spec overwrites this task's own spec array.
   These debug messages print the OMPI rank t rather than mt. */
389 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wait for spec from %d\n", t));
390 MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
391 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector got spec from %d (%lld,%lld)\n",
392 t, (
long long) spec[0], (
long long) spec[1]));
396 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send signal to %d\n", t));
397 MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
/* NOTE(review): "proces" in this runtime message is a typo. */
401 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to proces data of size %lld at offset %lld\n",
402 (
long long) spec[1], (
long long) spec[0]));
/* Produce and send the data chunk by chunk. bytestosend is initialised on a
   line not visible here (presumably from spec[1]) - TODO confirm. */
407 while(bytestosend>0) {
409 if(bytestosend>fsblksize) datasize=fsblksize;
410 else datasize=bytestosend;
415 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector process data block of size %lld at pos %lld\n",
416 (
long long) spec[1], (
long long) spec[0]));
/* process_cb fills the staging buffer for the chunk described by spec. */
418 _sion_opmicol_grc=process_cb(buffer,spec, sid);
420 if(_sion_opmicol_grc != SION_STD_SUCCESS) {
/* NOTE(review): this call passes SION_STD_NOT_SUCCESS where the other
   _sion_errorprint calls pass SION_NOT_SUCCESS. The message names the
   gather function and says "writing" in a read/scatter path. */
421 _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_ompi_gather_process_cb: problems writing data ...\n");
425 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector send for data block to %d\n", mt));
/* NOTE(review): datasize is sion_int64, but the MPI count argument is int.
   fsblksize values above INT_MAX would be truncated; confirm the bound. */
426 MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
/* NOTE(review): this message labels the value "bytestorecv" but prints
   bytestosend. */
427 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
428 mt, (
long long) datasize, (
long long) bytestosend));
/* Advance: the offset in spec[0] moves forward by each chunk sent. */
431 bytestosend-=datasize;spec[0]+=datasize;
438 if (buffer) free(buffer);
/* Receiver (non-collector) path: translate the collector's OMPI rank to its MPI rank. */
443 mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
446 for(tt=0;tt<sapi->num_threads;tt++) {
449 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender send spec to %d (%lld,%lld)\n",
450 mcollector,(
long long) specs[tt][0], (
long long) specs[tt][1]));
451 MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
/* NOTE(review): these messages print collector (an OMPI rank), while the
   receive uses mcollector (its MPI rank). */
456 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender wait for signal from %d\n", collector));
457 MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
458 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender got signal from %d\n", collector));
/* Receive specs[tt][1] bytes in chunks of at most fsblksize. */
461 bytestorecv=specs[tt][1];
463 while(bytestorecv>0) {
464 if(bytestorecv>fsblksize) datasize=fsblksize;
465 else datasize=bytestorecv;
466 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld\n", mcollector, (
long long) datasize));
/* p is set on a line not visible in this extract (presumably
   (char *) outdatas[tt]) - TODO confirm. */
467 MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
/* count (bytes actually received) is only used in the debug output below. */
468 MPI_Get_count(&status,MPI_CHAR,&count);
470 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"sender recv data block from %d of size %lld (%d)\n", mcollector, (
long long) datasize, count));
471 bytestorecv-=datasize;p+=datasize;
485 rc=_sion_opmicol_grc;
490 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave collector=%d rc=%d\n", collector, rc ));
498 #define DFUNCTION "__sion_ompi_share_ptr"
/*
 * __sion_ompicol_share_ptr - hand one pointer value to the calling threads
 * through the file-scope slot __ompicol_global_pointer.
 *
 * in_ptr is stored into the slot, and the slot is later read back into
 * out_ptr. The statements between the store and the load are not visible in
 * this extract - TODO confirm. They are presumably an OpenMP master section
 * plus barriers, which would make the store visible to every thread.
 *
 * Callers in this file pass a buffer they just allocated and use the result
 * as the shared per-thread array. Returns presumably out_ptr (the return
 * statement is not visible here).
 *
 * NOTE(review): DFUNCTION says "__sion_ompi_share_ptr" but the function is
 * named __sion_ompicol_share_ptr. Identifiers starting with "__" are reserved
 * for the C implementation.
 */
499 void * __sion_ompicol_share_ptr(
void * in_ptr) {
503 __ompicol_global_pointer = in_ptr;
510 out_ptr=__ompicol_global_pointer;
518 #define DFUNCTION "_sion_ompi_get_capability_cb"
/*
 * _sion_ompi_get_capability_cb - report which collective role the calling
 * thread supports.
 *
 * When sapi->thread_num == 0 it logs "FULL capability". The "ONLY SENDER
 * capability" message is presumably in the else branch.
 *
 * The branch structure and the return statements are not visible in this
 * extract - TODO confirm. The returns are presumably SION_CAPABILITY_FULL and
 * SION_CAPABILITY_ONLY_SENDER.
 */
519 int _sion_ompi_get_capability_cb(
void *commdata )
522 _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
524 if(sapi->thread_num==0) {
526 DPRINTFP((256, DFUNCTION, sapi->rank,
"FULL capability\n"));
529 DPRINTFP((256, DFUNCTION, sapi->rank,
"ONLY SENDER capability\n"));
/*
 * _sion_ompicol_size_of_dtype - size in bytes of one element of a SION data
 * type code:
 *   _SION_INT32 -> sizeof(sion_int32)
 *   _SION_INT64 -> sizeof(sion_int64)
 *   _SION_CHAR  -> sizeof(char)
 *   any other code falls back to sizeof(sion_int64).
 * The enclosing switch header is not visible in this extract. The break
 * statements after each return are unreachable.
 */
536 int _sion_ompicol_size_of_dtype(
int dtype) {
538 case _SION_INT32:
return(
sizeof(sion_int32));
break;
539 case _SION_INT64:
return(
sizeof(sion_int64));
break;
540 case _SION_CHAR:
return(
sizeof(
char));
break;
541 default:
return(
sizeof(sion_int64));
/*
 * Capability identifiers.
 * NOTE(review): they are defined here without replacement values, so they
 * expand to nothing. If they are meant to be returned from a capability
 * callback, they need distinct integer values (or those values come from a
 * header not shown here) - TODO confirm.
 */
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER
#define SION_CAPABILITY_FULL