/*
 * Includes and the file-scope hand-over pointer.
 *
 * __omp_global_pointer is the slot the callbacks in this file use to pass a
 * buffer address from one thread to the others: the root/collector thread
 * stores an address in it, and the remaining threads read it back and copy
 * to or from that buffer.
 * NOTE(review): the synchronisation that has to separate the store from the
 * reads is not visible in this listing - confirm against the full source.
 */
27 #include <sys/types.h> 34 #include "sion_error_handler.h" 46 static void *__omp_global_pointer;
/*
 * Forward declaration of the helper defined later in this file; it maps a
 * SION data type code to the size in bytes of one element.
 */
48 int _sion_omp_size_of_dtype(
int dtype);
/*
 * Forward declaration of the helper defined later in this file; it lets the
 * collector thread obtain the spec and data pointers of the other threads
 * through two pointer tables returned in *p_spec and *p_indata.
 */
50 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
51 void *commdata,
int collector,
int range_start,
int range_end,
52 sion_int64 ***p_spec,
char ***p_indata );
/*
 * Registers the OpenMP callback set with the generic SION interface.
 *
 * Creates a generic API entry named "SIONlib_OMP_API" and attaches each
 * _sion_omp_*_cb function of this file to the id stored in `aid`.
 * NOTE(review): the declaration of `aid`, the return statement and the
 * closing brace are not visible in this listing.
 */
54 int _sion_register_callbacks_omp() {
/* create the API entry; `aid` is passed to every register call below */
56 aid=sion_generic_create_api(
"SIONlib_OMP_API");
/* creation and release of the local communication group */
59 sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
60 sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
/* collective operations */
62 sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
63 sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
64 sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
65 sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
66 sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
67 sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
/* collector-side processing of the data of other threads */
68 sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
69 sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
/* capability query */
70 sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
/*
 * Callback registered as "create local commgroup".
 *
 * No group is actually split here: the local group handle is set to the
 * global group handle (the debug message itself calls this a DUMMY).
 *
 * local_commgroup   out: receives the local group handle
 * global_commgroup  in:  handle that is handed back unchanged
 * filenumber, numfiles  only appear in the debug message in the visible lines
 *
 * NOTE(review): grank, gsize, lrank and lsize are used in the debug message
 * but their parameter declarations, the end of the parameter list and the
 * return statement are missing from this listing.
 */
75 int _sion_omp_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
78 int filenumber,
int numfiles
81 int rc=SION_STD_SUCCESS;
83 DPRINTFP((256,
"_sion_omp_create_lcg_cb", grank,
" DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
84 grank, gsize, filenumber, numfiles, lrank, lsize));
/* the local group is the global group */
85 *local_commgroup=global_commgroup;
/*
 * Callback registered as "free local commgroup".
 *
 * In the visible lines it only emits a debug message; local_commgroup is
 * not released or otherwise touched.
 * NOTE(review): the return statement and closing brace are not visible in
 * this listing.
 */
90 int _sion_omp_free_lcg_cb(
void *local_commgroup) {
91 int rc=SION_STD_SUCCESS;
92 DPRINTFP((256,
"_omp_free_lcg_cb", _SION_DEFAULT_RANK,
" DUMMY for free local comm\n"));
/*
 * Callback registered as the barrier operation.
 *
 * NOTE(review): only the initialisation of the return code is visible; the
 * barrier statement itself, the return and the braces are missing from this
 * listing - confirm against the full source.
 */
97 int _sion_omp_barrier_cb(
void *commdata)
99 int rc=SION_STD_SUCCESS;
/*
 * Callback registered as broadcast: copies nelem elements from the root
 * thread's buffer into the buffer of every other thread.
 *
 * data      root: source buffer; other threads: destination buffer
 * commdata  per-thread _omp_api_commdata (thread_num is read)
 * dtype     element type code, turned into a byte width by
 *           _sion_omp_size_of_dtype()
 * nelem     number of elements to copy
 * root      thread number of the broadcasting thread
 *
 * NOTE(review): presumably a synchronisation point separates the publish
 * step from the copy step; it is not visible in this listing, nor are the
 * closing braces and the return.
 */
106 int _sion_omp_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
108 int rc=SION_STD_SUCCESS;
109 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* root publishes the address of its buffer */
113 if(sapi->thread_num==root) {
114 __omp_global_pointer = data;
/* all other threads copy from the published buffer, unless it is NULL */
123 if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
124 memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
/*
 * Callback registered as gather with a fixed element count per thread.
 *
 * The root thread publishes outdata; the copy below targets the slot that
 * starts thread_num * nelem elements into that buffer.
 *
 * indata    per-thread input buffer (not referenced in the visible lines)
 * outdata   receive buffer, published by the root thread
 * commdata  per-thread _omp_api_commdata (thread_num is read)
 * dtype     element type code, see _sion_omp_size_of_dtype()
 * nelem     number of elements contributed by each thread
 * root      thread number of the gathering thread
 *
 * NOTE(review): the source argument of the memcpy, the synchronisation
 * around it, the closing braces and the return are missing from this listing.
 */
135 int _sion_omp_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
138 int rc=SION_STD_SUCCESS;
139 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* root publishes the receive buffer */
142 if(sapi->thread_num==root) {
143 __omp_global_pointer = outdata;
/* NOTE(review): __omp_global_pointer is a void *, so this addition relies on
   the GNU extension that treats sizeof(void) as 1; the variable-count
   callbacks in this file cast to char * instead */
153 memcpy(__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
155 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Callback registered as scatter with a fixed element count per thread.
 *
 * The root thread publishes indata; the expression below addresses the slot
 * that starts thread_num * nelem elements into that buffer.
 *
 * indata    send buffer, published by the root thread
 * outdata   per-thread receive buffer (not referenced in the visible lines)
 * commdata  per-thread _omp_api_commdata (thread_num is read)
 * dtype     element type code, see _sion_omp_size_of_dtype()
 * nelem     number of elements per thread
 * root      thread number of the scattering thread
 *
 * NOTE(review): the beginning of the copy call that the last two lines
 * belong to (presumably a memcpy into outdata - confirm), the
 * synchronisation, the closing braces and the return are missing from this
 * listing.
 */
165 int _sion_omp_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
168 int rc=SION_STD_SUCCESS;
169 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* root publishes the send buffer */
172 if(sapi->thread_num==root) {
173 __omp_global_pointer = indata;
/* NOTE(review): arithmetic on a void * - GNU extension, see the char * cast
   used by the variable-count callbacks in this file */
184 __omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
185 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Callback registered as gather with a per-thread element count.
 *
 * Visible steps, in listing order:
 *   1. root allocates a displacement table with one int per thread, fills
 *      it in a loop over all threads and publishes it
 *   2. every thread reads its own displacement into `offset`
 *   3. root frees the table and publishes outdata
 *   4. nelem elements are copied to outdata + offset elements
 *
 * indata    per-thread input buffer (not referenced in the visible lines)
 * outdata   receive buffer, published by the root thread
 * commdata  per-thread _omp_api_commdata (thread_num, num_threads are read)
 * dtype     element type code, see _sion_omp_size_of_dtype()
 * counts    per-thread element counts; read on the root thread
 * nelem     number of elements contributed by the calling thread
 * root      thread number of the gathering thread
 *
 * NOTE(review): the declarations of displs, t and offset, the statements
 * that compute displs[t], whatever follows the allocation error message,
 * the memcpy source argument, all synchronisation, the closing braces and
 * the return are missing from this listing.
 */
197 int _sion_omp_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
200 int rc=SION_STD_SUCCESS;
203 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
206 if(sapi->thread_num==root) {
/* one displacement per thread */
208 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
209 if (displs == NULL) {
210 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
211 (
size_t) sapi->num_threads *
sizeof(
int));
215 for(t=0;t<sapi->num_threads;t++) {
218 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
/* publish the displacement table */
222 __omp_global_pointer = displs;
/* each thread picks up its own displacement (counted in elements) */
231 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
/* NOTE(review): the table must not be released before every thread has read
   its entry; the guard is redundant because free(NULL) is a no-op */
239 if(sapi->thread_num==root) {
240 if(displs) free(displs);
/* publish the receive buffer */
245 __omp_global_pointer = outdata;
/* destination: receive buffer + offset elements, addressed in bytes */
254 memcpy((
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
256 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Callback registered as scatter with a per-thread element count.
 *
 * Visible steps, in listing order:
 *   1. root allocates a displacement table with one int per thread, fills
 *      it in a loop over all threads and publishes it
 *   2. every thread reads its own displacement into `offset`
 *   3. root frees the table and publishes indata
 *   4. nelem elements starting at indata + offset elements are addressed
 *      for the copy
 *
 * indata    send buffer, published by the root thread
 * outdata   per-thread receive buffer (not referenced in the visible lines)
 * commdata  per-thread _omp_api_commdata (thread_num, num_threads are read)
 * dtype     element type code, see _sion_omp_size_of_dtype()
 * counts    per-thread element counts; read on the root thread
 * nelem     number of elements for the calling thread
 * root      thread number of the scattering thread
 *
 * NOTE(review): the error and debug messages below name the gathervr
 * callback although this is the scattervr callback - looks like a
 * copy-and-paste leftover.
 * NOTE(review): the declarations of displs, t and offset, the statements
 * that compute displs[t], whatever follows the allocation error message,
 * the beginning of the final copy call, all synchronisation, the closing
 * braces and the return are missing from this listing.
 */
267 int _sion_omp_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
270 int rc=SION_STD_SUCCESS;
273 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
276 if(sapi->thread_num==root) {
/* one displacement per thread */
278 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
279 if (displs == NULL) {
280 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
281 (
size_t) sapi->num_threads *
sizeof(
int));
285 for(t=0;t<sapi->num_threads;t++) {
288 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
/* publish the displacement table */
292 __omp_global_pointer = displs;
/* each thread picks up its own displacement (counted in elements) */
301 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
/* NOTE(review): the table must not be released before every thread has read
   its entry; the guard is redundant because free(NULL) is a no-op */
309 if(sapi->thread_num==root) {
310 if(displs) free(displs);
/* publish the send buffer */
315 __omp_global_pointer = indata;
/* source: send buffer + offset elements, addressed in bytes */
325 (
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
326 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Callback registered as "gather and execute".
 *
 * Collects the spec and data pointers of the threads in
 * [range_start, range_end] via __sion_omp_get_info_from_other() and, on the
 * collector thread, calls process_cb() once per thread in that range.
 *
 * indata       data buffer of the calling thread
 * spec         spec array of the calling thread
 * spec_len     forwarded to __sion_omp_get_info_from_other()
 * fsblksize    not referenced in the visible lines
 * commdata     per-thread _omp_api_commdata (thread_num is read)
 * collector    thread number of the thread that runs process_cb
 * range_start,
 * range_end    inclusive range of thread numbers served by the collector
 * sid          passed through to process_cb
 * process_cb   called as process_cb(data, spec, sid) for each thread
 *
 * Returns the result of _sion_errorprint() when process_cb fails.
 * NOTE(review): the declarations of t and trank, the computation of trank,
 * the closing braces and the normal return are missing from this listing.
 */
337 int _sion_omp_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
338 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
339 int process_cb(
const void *,sion_int64 *,
int ) ) {
340 int rc=SION_STD_SUCCESS;
/* pointer tables, allocated by the helper on the collector thread */
342 sion_int64 **p_spec=NULL;
343 char **p_indata=NULL;
345 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* the cast drops the const qualifier of indata */
348 __sion_omp_get_info_from_other((
void*) indata, spec, spec_len,
349 commdata, collector, range_start, range_end,
350 &p_spec, &p_indata );
354 if(sapi->thread_num==collector) {
357 for(t=range_start;t<=range_end;t++) {
/* NOTE(review): %ld assumes sion_int64 is a long - confirm */
359 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
360 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
362 rc=process_cb(p_indata[trank],p_spec[trank], sid);
/* NOTE(review): in the visible order this return comes before the free()
   calls below, so both tables appear to leak on the error path - confirm */
365 if(rc != SION_STD_SUCCESS) {
366 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* release the pointer tables; only the tables, not the buffers they point to */
369 if(p_spec) free(p_spec);
370 if(p_indata) free(p_indata);
/*
 * Callback registered as "execute and scatter".
 *
 * Collects the spec and data pointers of the threads in
 * [range_start, range_end] via __sion_omp_get_info_from_other() and, on the
 * collector thread, calls process_cb() once per thread in that range with
 * that thread's buffer.
 *
 * outdata      data buffer of the calling thread
 * spec         spec array of the calling thread
 * spec_len     forwarded to __sion_omp_get_info_from_other()
 * fsblksize    not referenced in the visible lines
 * commdata     per-thread _omp_api_commdata (thread_num is read)
 * collector    thread number of the thread that runs process_cb
 * range_start,
 * range_end    inclusive range of thread numbers served by the collector
 * sid          passed through to process_cb
 * process_cb   called as process_cb(data, spec, sid) for each thread
 *
 * Returns the result of _sion_errorprint() when process_cb fails.
 * NOTE(review): the debug and error messages below name
 * _sion_omp_gather_process_cb and speak of "writing" although this is the
 * process/scatter callback - looks like a copy-and-paste leftover.
 * NOTE(review): the declarations of t and trank, the computation of trank,
 * the closing braces and the normal return are missing from this listing.
 */
383 int _sion_omp_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
384 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
385 int process_cb(
void *,sion_int64 *,
int ) ) {
386 int rc=SION_STD_SUCCESS;
/* pointer tables, allocated by the helper on the collector thread */
388 sion_int64 **p_spec=NULL;
389 char **p_indata=NULL;
391 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
394 __sion_omp_get_info_from_other((
void*) outdata, spec, spec_len,
395 commdata, collector, range_start, range_end,
396 &p_spec, &p_indata );
400 if(sapi->thread_num==collector) {
403 for(t=range_start;t<=range_end;t++) {
/* NOTE(review): %ld assumes sion_int64 is a long - confirm */
405 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
406 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
408 rc=process_cb(p_indata[trank],p_spec[trank], sid);
/* NOTE(review): in the visible order this return comes before the free()
   calls below, so both tables appear to leak on the error path - confirm */
411 if(rc != SION_STD_SUCCESS) {
412 return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* release the pointer tables; only the tables, not the buffers they point to */
415 if(p_spec) free(p_spec);
416 if(p_indata) free(p_indata);
/*
 * Helper for the two collector callbacks: builds, on the collector thread,
 * two tables with one entry per thread in [range_start, range_end] that
 * hold the spec pointer and the data pointer of each of those threads.
 *
 * Only addresses are exchanged; neither spec arrays nor data buffers are
 * copied. The tables are handed to the other threads through
 * __omp_global_pointer, one table at a time, and every non-collector thread
 * writes its own pointer into slot thread_num - range_start.
 *
 * data         data buffer of the calling thread
 * spec         spec array of the calling thread
 * spec_len     not referenced in the visible lines
 * commdata     per-thread _omp_api_commdata (thread_num is read)
 * collector    thread number of the thread that owns the tables
 * range_start,
 * range_end    inclusive range of thread numbers
 * p_spec       out (collector): malloc'ed table of spec pointers
 * p_indata     out (collector): malloc'ed table of data pointers
 *
 * The caller releases both tables with free().
 * NOTE(review): the allocation error messages name
 * _sion_omp_gather_process_cb rather than this function.
 * NOTE(review): the declarations of tspec and tdata, whatever follows the
 * allocation error messages, how the collector's own slot is filled, all
 * synchronisation, the closing braces and the return are missing from this
 * listing.
 */
430 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
431 void *commdata,
int collector,
int range_start,
int range_end,
432 sion_int64 ***p_spec,
char ***p_indata ) {
437 int num_sender, my_trank;
438 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* table size and the slot of the calling thread within the range */
440 num_sender = range_end-range_start+1;
441 my_trank = sapi->thread_num-range_start;
/* collector allocates both tables */
444 if(sapi->thread_num==collector) {
446 *p_spec = (sion_int64 **) malloc(num_sender *
sizeof(sion_int64 *));
447 if (*p_spec == NULL) {
448 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_spec), aborting ...\n",
449 (
size_t) num_sender *
sizeof(sion_int64 *));
452 *p_indata = (
char **) malloc(num_sender *
sizeof(
char *));
453 if (*p_indata == NULL) {
454 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_indata), aborting ...\n",
455 (
size_t) num_sender *
sizeof(
char *));
/* first round: publish the spec table */
460 __omp_global_pointer = *p_spec;
/* every other thread stores the address of its spec array in its slot */
468 if(sapi->thread_num!=collector) {
469 tspec = (sion_int64 **) __omp_global_pointer;
470 tspec[my_trank] = spec;
/* NOTE(review): %ld assumes sion_int64 is a long - confirm */
471 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
472 *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
/* second round: publish the data table */
481 if(sapi->thread_num==collector) {
483 __omp_global_pointer = *p_indata;
/* every other thread stores the address of its data buffer in its slot */
491 if(sapi->thread_num!=collector) {
492 tdata = (
char const **) __omp_global_pointer;
493 tdata[my_trank] = data;
/*
 * Callback registered as the capability query.
 *
 * The visible lines test whether the calling thread is thread 0 and contain
 * two debug messages, "FULL capability" and "ONLY SENDER capability".
 * NOTE(review): presumably thread 0 reports full capability and every other
 * thread sender-only capability, but the else branch, the value that is
 * returned, the braces and the return statement are missing from this
 * listing - confirm against the full source.
 */
505 int _sion_omp_get_capability_cb(
void *commdata )
508 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
510 if(sapi->thread_num==0) {
512 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"FULL capability\n"));
515 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"ONLY SENDER capability\n"));
/*
 * Returns the size in bytes of one element of the given SION data type code.
 *
 *   _SION_INT32  -> sizeof(sion_int32)
 *   _SION_INT64  -> sizeof(sion_int64)
 *   _SION_CHAR   -> sizeof(char)
 *   anything else is treated like _SION_INT64
 *
 * NOTE(review): the statement that opens the case list (presumably a switch
 * on dtype) and the closing braces are missing from this listing.
 */
521 int _sion_omp_size_of_dtype(
int dtype) {
/* each break below follows a return and is therefore unreachable */
523 case _SION_INT32:
return(
sizeof(sion_int32));
break;
524 case _SION_INT64:
return(
sizeof(sion_int64));
break;
525 case _SION_CHAR:
return(
sizeof(
char));
break;
/* unknown type codes fall back to the 64-bit width */
526 default:
return(
sizeof(sion_int64));
/*
 * NOTE(review): these three capability macros are defined here with empty
 * replacement lists and none of the visible lines uses them. Presumably
 * they are residue of a cross-reference listing rather than part of this
 * source file - confirm against the header that defines the real values.
 */
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER