19 #define _XOPEN_SOURCE 700
29 #include <sys/types.h>
36 #include "sion_error_handler.h"
/* File-scope hand-off pointer shared by all threads of this API.
 * One thread (the root or collector) stores a buffer address here, and the
 * other threads then read or write through it. See the bcastr, gatherr,
 * scatterr, gathervr and scattervr callbacks and
 * __sion_omp_get_info_from_other.
 * NOTE(review): the pointer is plain (non-atomic) shared state. Correctness
 * depends on thread barriers between the store and the reads. Those barriers
 * are not visible in this excerpt; confirm they exist.
 * NOTE(review): identifiers that begin with "__" are reserved for the
 * implementation in C. */
48 static void *__omp_global_pointer;
/* Size in bytes of one element of the SION dtype code `dtype`. */
50 int _sion_omp_size_of_dtype(
int dtype);
/* Collector-side gathering of the (spec, data) pointers of threads
 * range_start..range_end into arrays allocated on the collector thread.
 * The arrays are returned through p_spec and p_indata. */
52 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
53 void *commdata,
int collector,
int range_start,
int range_end,
54 sion_int64 ***p_spec,
char ***p_indata );
/* Creates the generic-API descriptor named "SIONlib_OMP_API".
 * It then registers the OpenMP implementations of the communicator and
 * collective callbacks with that descriptor.
 * NOTE(review): this excerpt omits the declaration of `aid`, the return
 * statement and the closing brace. The callbacks are referenced before their
 * definitions below, so their prototypes presumably come from a header that is
 * not shown. */
56 int _sion_register_callbacks_omp(
void) {
58 aid=sion_generic_create_api(
"SIONlib_OMP_API");
/* Local-communicator management; both callbacks are "DUMMY"s in this file. */
61 sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
62 sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
/* Collective operations between the threads of one team. */
64 sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
65 sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
66 sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
67 sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
68 sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
69 sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
/* Collector-side I/O: one thread processes the buffers of a range of threads. */
70 sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
71 sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
72 sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
/* Local-communicator creation callback.
 * No real split is performed: the "local" group is the global group itself
 * (the debug message calls this a DUMMY).
 * NOTE(review): grank, gsize, lrank and lsize are used in the debug print but
 * are not declared in the visible lines. They are presumably further
 * parameters on the omitted original lines 81-82; confirm against the
 * prototype. */
77 int _sion_omp_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
80 int filenumber,
int numfiles
83 int rc=SION_STD_SUCCESS;
85 DPRINTFP((256,
"_sion_omp_create_lcg_cb", grank,
" DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
86 grank, gsize, filenumber, numfiles, lrank, lsize));
/* Alias, not a copy: the local group shares the global commdata object. */
87 *local_commgroup=global_commgroup;
/* Local-communicator release callback.
 * The visible lines only emit a "DUMMY" debug trace. Nothing needs releasing,
 * because _sion_omp_create_lcg_cb only aliases the global commgroup.
 * NOTE(review): the debug tag "_omp_free_lcg_cb" does not match the function
 * name _sion_omp_free_lcg_cb, which makes traces harder to grep. */
92 int _sion_omp_free_lcg_cb(
void *local_commgroup) {
93 int rc=SION_STD_SUCCESS;
94 DPRINTFP((256,
"_omp_free_lcg_cb", _SION_DEFAULT_RANK,
" DUMMY for free local comm\n"));
/* Barrier callback for the OpenMP thread API.
 * NOTE(review): only the rc initialisation is visible here. The actual
 * synchronisation (presumably an OpenMP barrier on omitted original lines
 * 102-107) and the return are not part of this excerpt. */
99 int _sion_omp_barrier_cb(
void *commdata)
101 int rc=SION_STD_SUCCESS;
/* Broadcast callback.
 * Copies nelem elements of type dtype from the root thread's `data` buffer
 * into the `data` buffer of every other thread.
 * The root publishes its buffer address in __omp_global_pointer; non-root
 * threads memcpy from that address.
 * NOTE(review): a barrier must separate the root's store from the other
 * threads' memcpy, and another must follow before the pointer is reused. Both
 * would be on omitted lines (117-124, 127-136); confirm.
 * NOTE(review): the byte count nelem*_sion_omp_size_of_dtype(dtype) is computed
 * in int and can overflow for large nelem. Consider size_t arithmetic. */
108 int _sion_omp_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
110 int rc=SION_STD_SUCCESS;
111 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root: make its buffer visible to the other threads. */
115 if(sapi->thread_num==root) {
116 __omp_global_pointer = data;
/* Non-root: copy the root's data once the pointer has been published. */
125 if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
126 memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
/* Gather callback (fixed count).
 * Each thread copies nelem elements of type dtype into the root's outdata
 * buffer at byte offset thread_num*nelem*sizeof(element). The result is that
 * outdata is laid out in thread order.
 * NOTE(review): the memcpy source argument (omitted original line 156,
 * presumably indata) and the barriers around the copy are not in this excerpt.
 * NOTE(review): offset and length are computed in int and can overflow when
 * nelem*num_threads*element size exceeds INT_MAX. */
137 int _sion_omp_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
140 int rc=SION_STD_SUCCESS;
141 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root publishes the destination buffer. */
144 if(sapi->thread_num==root) {
145 __omp_global_pointer = outdata;
/* Every thread writes its own slot of the root's buffer. */
155 memcpy((
char *)__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
157 nelem*_sion_omp_size_of_dtype(dtype)
/* Scatter callback (fixed count).
 * The root publishes its indata buffer. Thread t then receives the nelem
 * elements of type dtype that start at byte offset t*nelem*sizeof(element).
 * NOTE(review): the start of the memcpy call and its destination argument
 * (presumably outdata, on omitted original line 185) are not in this excerpt,
 * and neither are the barriers. The offset is computed in int. */
167 int _sion_omp_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
170 int rc=SION_STD_SUCCESS;
171 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root publishes the source buffer. */
174 if(sapi->thread_num==root) {
175 __omp_global_pointer = indata;
/* Source address of this thread's slot in the root's buffer. */
186 (
char *)__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
187 nelem*_sion_omp_size_of_dtype(dtype)
/* Gather callback (variable count).
 * The gather runs in two phases:
 *  1. The root allocates one int displacement per thread and computes
 *     displs[t] from counts[]; the computation itself is on omitted lines,
 *     presumably a prefix sum. It then publishes the array, and each thread
 *     reads its own element offset.
 *  2. The root publishes outdata. Each thread copies its nelem elements of
 *     type dtype to outdata + offset*sizeof(element).
 * NOTE(review): the root frees displs after its own read. Unless a barrier on
 * the omitted original lines 234-240 guarantees that all threads have already
 * read their offset, this is a use-after-free for the other threads; confirm.
 * NOTE(review): "if(displs) free(displs);" needs no guard, since free(NULL) is
 * a no-op. The malloc cast is also unnecessary in C.
 * NOTE(review): the message says "aborting", but the abort or return on
 * allocation failure is on omitted lines. */
199 int _sion_omp_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
202 int rc=SION_STD_SUCCESS;
205 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Phase 1 (root): build the per-thread displacement table. */
208 if(sapi->thread_num==root) {
210 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
211 if (displs == NULL) {
212 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
213 (
size_t) sapi->num_threads *
sizeof(
int));
217 for(t=0;t<sapi->num_threads;t++) {
220 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
224 __omp_global_pointer = displs;
/* Every thread: fetch its element offset from the shared table. */
233 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
241 if(sapi->thread_num==root) {
242 if(displs) free(displs);
/* Phase 2 (root): publish the destination buffer. */
247 __omp_global_pointer = outdata;
/* Every thread writes its contribution at its displacement. */
256 memcpy((
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
258 nelem*_sion_omp_size_of_dtype(dtype)
/* Scatter callback (variable count). This mirrors _sion_omp_gathervr_cb.
 *  1. The root allocates and publishes a per-thread displacement table built
 *     from counts[]. Each thread reads its element offset.
 *  2. The root publishes indata. Each thread receives nelem elements of type
 *     dtype from indata + offset*sizeof(element); the memcpy destination is on
 *     omitted lines, presumably outdata.
 * NOTE(review): the allocation-failure message and the debug tag name
 * "__sion_omp_gathervr_cb" / "_sion_omp_gathervr_cb". This looks copy-pasted,
 * and both should name the scattervr callback.
 * NOTE(review): displs is freed by the root after its own read. Confirm that a
 * barrier on omitted lines ensures every thread has read its offset first.
 * The "if(displs)" guard before free is redundant. */
269 int _sion_omp_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
272 int rc=SION_STD_SUCCESS;
275 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Phase 1 (root): build the per-thread displacement table. */
278 if(sapi->thread_num==root) {
280 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
281 if (displs == NULL) {
282 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
283 (
size_t) sapi->num_threads *
sizeof(
int));
287 for(t=0;t<sapi->num_threads;t++) {
290 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
294 __omp_global_pointer = displs;
/* Every thread: fetch its element offset from the shared table. */
303 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
311 if(sapi->thread_num==root) {
312 if(displs) free(displs);
/* Phase 2 (root): publish the source buffer. */
317 __omp_global_pointer = indata;
/* Source address of this thread's portion of the root's buffer. */
327 (
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
328 nelem*_sion_omp_size_of_dtype(dtype)
/* Gather-and-execute callback.
 * Every thread hands its (spec, indata) pointers to the collector through
 * __sion_omp_get_info_from_other. The collector then calls
 * process_cb(data, spec, sid) once for each thread t in
 * [range_start, range_end], inclusive. On a process_cb failure it returns
 * _sion_errorprint(SION_NOT_SUCCESS, _SION_ERROR_RETURN, ...).
 * NOTE(review): that early return skips the free(p_spec)/free(p_indata) below,
 * so both arrays leak on the error path. If a barrier follows on the omitted
 * lines, the collector also skips it while other threads wait; confirm.
 * NOTE(review): the debug print uses %ld for sion_int64 values. That is not
 * portable if sion_int64 is not long; PRId64 is safer if sion_int64 is int64_t.
 * It also reads spec[0..2], which assumes spec_len >= 3.
 * NOTE(review): fsblksize is not used in the visible lines. trank is computed
 * on an omitted line, presumably t-range_start.
 * NOTE(review): the (void*) cast discards the const of indata. */
339 int _sion_omp_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
340 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
341 int process_cb(
const void *,sion_int64 *,
int ) ) {
342 int rc=SION_STD_SUCCESS;
344 sion_int64 **p_spec=NULL;
345 char **p_indata=NULL;
347 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* All threads: publish this thread's pointers to the collector. */
350 __sion_omp_get_info_from_other((
void*) indata, spec, spec_len,
351 commdata, collector, range_start, range_end,
352 &p_spec, &p_indata );
/* Collector only: process every gathered buffer in thread order. */
356 if(sapi->thread_num==collector) {
359 for(t=range_start;t<=range_end;t++) {
361 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
362 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
364 rc=process_cb(p_indata[trank],p_spec[trank], sid);
367 if(rc != SION_STD_SUCCESS) {
368 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* Release the pointer arrays allocated by __sion_omp_get_info_from_other. */
371 if(p_spec) free(p_spec);
372 if(p_indata) free(p_indata);
/* Execute-and-scatter callback; the counterpart of the gather-and-execute
 * callback.
 * Every thread hands its (spec, outdata) pointers to the collector through
 * __sion_omp_get_info_from_other. The collector calls
 * process_cb(buffer, spec, sid) once for each thread t in
 * [range_start, range_end] so the callback can fill that thread's buffer.
 * NOTE(review): the debug tag and the error message both name
 * "_sion_omp_gather_process_cb", and the message says "problems writing data".
 * This looks copy-pasted; it should name _sion_omp_process_scatter_cb and
 * describe this (presumably read) direction.
 * NOTE(review): the early error return skips the frees below and leaks
 * p_spec/p_indata. The %ld format for sion_int64 and the spec[0..2] access
 * have the same caveats as in the gather variant. fsblksize is unused in the
 * visible lines. */
385 int _sion_omp_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
386 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
387 int process_cb(
void *,sion_int64 *,
int ) ) {
388 int rc=SION_STD_SUCCESS;
390 sion_int64 **p_spec=NULL;
391 char **p_indata=NULL;
393 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* All threads: publish this thread's destination buffer and spec. */
396 __sion_omp_get_info_from_other((
void*) outdata, spec, spec_len,
397 commdata, collector, range_start, range_end,
398 &p_spec, &p_indata );
/* Collector only: let process_cb fill each thread's buffer. */
402 if(sapi->thread_num==collector) {
405 for(t=range_start;t<=range_end;t++) {
407 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
408 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
410 rc=process_cb(p_indata[trank],p_spec[trank], sid);
413 if(rc != SION_STD_SUCCESS) {
414 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* Release the pointer arrays allocated by __sion_omp_get_info_from_other. */
417 if(p_spec) free(p_spec);
418 if(p_indata) free(p_indata);
/* Collects one (spec, data) pointer pair per thread in [range_start,
 * range_end] into arrays owned by the collector thread.
 * The collector allocates *p_spec and *p_indata, each with num_sender slots,
 * and publishes them one after the other through __omp_global_pointer. Each
 * non-collector thread stores its pointers at index
 * thread_num - range_start. The caller (collector side) frees both arrays.
 * Only pointers are exchanged; no data is copied.
 * NOTE(review): the visible lines never fill the collector's own slot. That
 * presumably happens on the omitted original lines 463-469 / 486-492, together
 * with the barriers between publishing and filling; confirm.
 * NOTE(review): a non-collector thread outside [range_start, range_end] would
 * write out of bounds (my_trank < 0 or >= num_sender). This presumes callers
 * pass only in-range threads.
 * NOTE(review): the allocation-failure messages name
 * "_sion_omp_gather_process_cb", although this helper is also used by
 * _sion_omp_process_scatter_cb. spec_len is not used in the visible lines. */
432 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
433 void *commdata,
int collector,
int range_start,
int range_end,
434 sion_int64 ***p_spec,
char ***p_indata ) {
439 int num_sender, my_trank;
440 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Inclusive range, so the count includes both endpoints. */
442 num_sender = range_end-range_start+1;
443 my_trank = sapi->thread_num-range_start;
/* Collector: allocate one pointer slot per sender for specs and data. */
446 if(sapi->thread_num==collector) {
448 *p_spec = (sion_int64 **) malloc(num_sender *
sizeof(sion_int64 *));
449 if (*p_spec == NULL) {
450 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_spec), aborting ...\n",
451 (
size_t) num_sender *
sizeof(sion_int64 *));
454 *p_indata = (
char **) malloc(num_sender *
sizeof(
char *));
455 if (*p_indata == NULL) {
456 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_indata), aborting ...\n",
457 (
size_t) num_sender *
sizeof(
char *));
/* Step 1: publish the spec-pointer array. */
462 __omp_global_pointer = *p_spec;
470 if(sapi->thread_num!=collector) {
471 tspec = (sion_int64 **) __omp_global_pointer;
472 tspec[my_trank] = spec;
473 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
474 *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
/* Step 2: publish the data-pointer array. */
483 if(sapi->thread_num==collector) {
485 __omp_global_pointer = *p_indata;
493 if(sapi->thread_num!=collector) {
494 tdata = (
char const **) __omp_global_pointer;
495 tdata[my_trank] = data;
/* Capability query.
 * Thread 0 reports "FULL capability"; every other thread reports
 * "ONLY SENDER capability".
 * NOTE(review): the return statements are on omitted lines; presumably they
 * return SION_CAPABILITY_FULL and SION_CAPABILITY_ONLY_SENDER. The capability
 * is tied to thread 0, not to a particular collector argument; confirm this is
 * intended. */
507 int _sion_omp_get_capability_cb(
void *commdata )
510 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
512 if(sapi->thread_num==0) {
514 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"FULL capability\n"));
517 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"ONLY SENDER capability\n"));
/* Maps a SION dtype code to the size in bytes of one element:
 * _SION_INT32 -> sizeof(sion_int32), _SION_INT64 -> sizeof(sion_int64),
 * _SION_CHAR -> sizeof(char).
 * NOTE(review): any other code silently falls back to sizeof(sion_int64). An
 * unknown dtype therefore gives wrong byte counts in the memcpy-based
 * collectives rather than an error.
 * NOTE(review): the switch header (omitted original line 524) is not in this
 * excerpt. The "break;" statements after each return are unreachable. */
523 int _sion_omp_size_of_dtype(
int dtype) {
525 case _SION_INT32:
return(
sizeof(sion_int32));
break;
526 case _SION_INT64:
return(
sizeof(sion_int64));
break;
527 case _SION_CHAR:
return(
sizeof(
char));
break;
528 default:
return(
sizeof(sion_int64));
/* Capability levels reported by the get_capability callback.
 * NOTE(review): as written here these macros expand to nothing. Their values
 * were presumably lost in extraction, because these lines lack the
 * original-line prefix that the rest of the excerpt carries. Used in a return
 * statement, an empty expansion would not compile; confirm against the real
 * header. */
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER
#define SION_CAPABILITY_FULL