/* File-scope pointer used by the callbacks in this file to hand one thread's
 * buffer address to the other threads: one thread stores an address here and
 * the remaining threads read it back. */
27 #include <sys/types.h> 45 static void *__omp_global_pointer;
/* Forward declaration: byte size of one element of a SION dtype code
 * (defined at the end of this file). */
47 int _sion_omp_size_of_dtype(
int dtype);
/* Forward declaration: helper shared by the gather-and-execute and
 * execute-and-scatter callbacks; on the collector thread it allocates the
 * per-sender pointer tables returned through p_spec and p_indata. */
49 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
50 void *commdata,
int collector,
int range_start,
int range_end,
51 sion_int64 ***p_spec,
char ***p_indata );
/*
 * Creates a generic API handle named "SIONlib_OMP_API" and registers every
 * OMP callback of this file on it (commgroup create/free, barrier, bcast,
 * gather/scatter, their variable-count variants, gather-and-execute,
 * execute-and-scatter and the capability query).
 *
 * NOTE(review): the declaration of aid, the return statement and the closing
 * brace are not visible in this listing.
 */
53 int _sion_register_callbacks_omp() {
55 aid=sion_generic_create_api(
"SIONlib_OMP_API");
58 sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
59 sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
61 sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
62 sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
63 sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
64 sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
65 sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
66 sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
67 sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
68 sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
69 sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
/*
 * Callback that creates a local commgroup. No split is performed here: the
 * global commgroup pointer is handed back unchanged as the local one (the
 * debug message labels this a DUMMY).
 *
 * NOTE(review): grank, gsize, lrank and lsize are used in the debug output
 * but their parameter declarations are on lines missing from this listing,
 * as are the opening brace and the return.
 */
74 int _sion_omp_create_lcg_cb(
void **local_commgroup,
void *global_commgroup,
77 int filenumber,
int numfiles
80 int rc=SION_STD_SUCCESS;
82 DPRINTFP((256,
"_sion_omp_create_lcg_cb", grank,
" DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
83 grank, gsize, filenumber, numfiles, lrank, lsize));
/* Local and global commgroup are the same object. */
84 *local_commgroup=global_commgroup;
/*
 * Callback that frees a local commgroup. The visible lines only emit a debug
 * message; nothing is released here.
 *
 * NOTE(review): the debug tag reads "_omp_free_lcg_cb" while the function is
 * named _sion_omp_free_lcg_cb - confirm whether the missing prefix is
 * intentional.
 */
89 int _sion_omp_free_lcg_cb(
void *local_commgroup) {
90 int rc=SION_STD_SUCCESS;
91 DPRINTFP((256,
"_omp_free_lcg_cb", _SION_DEFAULT_RANK,
" DUMMY for free local comm\n"));
/*
 * Barrier callback for the OMP API.
 *
 * NOTE(review): only the signature and the rc initialisation are visible in
 * this listing; the synchronisation itself and the return are on missing
 * lines.
 */
96 int _sion_omp_barrier_cb(
void *commdata)
98 int rc=SION_STD_SUCCESS;
/*
 * Broadcast callback: the thread whose thread_num equals root publishes its
 * data buffer through __omp_global_pointer; every other thread copies
 * nelem elements of type dtype from that buffer into its own data buffer.
 *
 * data      buffer to send (root) or to receive into (other threads)
 * commdata  points to this thread's _omp_api_commdata
 * dtype     SION dtype code, converted to a byte size by
 *           _sion_omp_size_of_dtype()
 * nelem     number of elements to copy
 * root      thread number of the sender
 *
 * NOTE(review): the lines between publishing and copying (listing lines
 * 114-121) are missing here; presumably they synchronise the threads -
 * confirm against the full source.
 */
105 int _sion_omp_bcastr_cb(
void *data,
void *commdata,
int dtype,
int nelem,
int root)
107 int rc=SION_STD_SUCCESS;
108 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root makes its buffer address visible to the other threads. */
112 if(sapi->thread_num==root) {
113 __omp_global_pointer = data;
/* Non-root threads copy from the published buffer, skipping a NULL source. */
122 if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
123 memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
/*
 * Gather callback: the thread whose thread_num equals root publishes its
 * outdata buffer through __omp_global_pointer; a block of nelem elements is
 * then copied into that buffer at element offset thread_num * nelem.
 *
 * indata    this thread's contribution
 * outdata   receive buffer (published by root)
 * commdata  points to this thread's _omp_api_commdata
 * dtype     SION dtype code, converted to a byte size by
 *           _sion_omp_size_of_dtype()
 * nelem     number of elements per thread
 * root      thread number of the receiver
 *
 * NOTE(review): synchronisation, the source argument of the memcpy and the
 * return are on lines missing from this listing.
 */
134 int _sion_omp_gatherr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
137 int rc=SION_STD_SUCCESS;
138 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root makes the receive buffer visible to all threads. */
141 if(sapi->thread_num==root) {
142 __omp_global_pointer = outdata;
/* Byte offsets must be computed on a char pointer: arithmetic on void * is a
 * GNU extension, not ISO C (the gathervr callback already casts this way). */
152 memcpy((char *) __omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
154 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Scatter callback: the thread whose thread_num equals root publishes its
 * indata buffer through __omp_global_pointer; the block of nelem elements at
 * element offset thread_num * nelem inside that buffer is then used as the
 * source for this thread.
 *
 * indata    send buffer (published by root)
 * outdata   this thread's receive buffer
 * commdata  points to this thread's _omp_api_commdata
 * dtype     SION dtype code, converted to a byte size by
 *           _sion_omp_size_of_dtype()
 * nelem     number of elements per thread
 * root      thread number of the sender
 *
 * NOTE(review): synchronisation, the start of the copy call and the return
 * are on lines missing from this listing.
 */
164 int _sion_omp_scatterr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int nelem,
int root)
167 int rc=SION_STD_SUCCESS;
168 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root makes the send buffer visible to all threads. */
171 if(sapi->thread_num==root) {
172 __omp_global_pointer = indata;
/* Byte offsets must be computed on a char pointer: arithmetic on void * is a
 * GNU extension, not ISO C (the scattervr callback already casts this way). */
183 (char *) __omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
184 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Variable-count gather callback.
 *
 * Phase 1: root allocates a displacement table with one int per thread,
 * fills it (the computation itself is on lines missing from this listing)
 * and publishes it through __omp_global_pointer; every thread reads its own
 * element offset from table entry [thread_num]; root then frees the table.
 * Phase 2: outdata is published through __omp_global_pointer and nelem
 * elements are copied to byte position offset * element size inside it.
 *
 * indata    this thread's contribution
 * outdata   receive buffer
 * commdata  points to this thread's _omp_api_commdata
 * dtype     SION dtype code, converted to a byte size by
 *           _sion_omp_size_of_dtype()
 * counts    per-thread element counts (read in the debug output as counts[t])
 * nelem     number of elements contributed by this thread
 * root      thread number of the receiver
 *
 * NOTE(review): declarations of displs, t and offset, all synchronisation,
 * the handling after a failed allocation and the return are on lines missing
 * from this listing.
 */
196 int _sion_omp_gathervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
199 int rc=SION_STD_SUCCESS;
202 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root builds the per-thread displacement table. */
205 if(sapi->thread_num==root) {
207 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
208 if (displs == NULL) {
209 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",
210 (
unsigned long) sapi->num_threads *
sizeof(
int));
214 for(t=0;t<sapi->num_threads;t++) {
217 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
/* Publish the table so each thread can look up its own offset. */
221 __omp_global_pointer = displs;
230 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
/* Only root owns the table; free(NULL) would be harmless, so the guard is
 * purely defensive. */
238 if(sapi->thread_num==root) {
239 if(displs) free(displs);
/* Publish the receive buffer for the copy below. */
244 __omp_global_pointer = outdata;
/* offset is in elements; scale by the element size to get bytes. */
253 memcpy((
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
255 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Variable-count scatter callback.
 *
 * Phase 1: root allocates a displacement table with one int per thread,
 * fills it (the computation itself is on lines missing from this listing)
 * and publishes it through __omp_global_pointer; every thread reads its own
 * element offset from table entry [thread_num]; root then frees the table.
 * Phase 2: indata is published through __omp_global_pointer and the nelem
 * elements at byte position offset * element size inside it are used as the
 * source for this thread.
 *
 * indata    send buffer
 * outdata   this thread's receive buffer
 * commdata  points to this thread's _omp_api_commdata
 * dtype     SION dtype code, converted to a byte size by
 *           _sion_omp_size_of_dtype()
 * counts    per-thread element counts (read in the debug output as counts[t])
 * nelem     number of elements received by this thread
 * root      thread number of the sender
 *
 * NOTE(review): the error message and the debug tag below name
 * "gathervr_cb" although this is the scattervr callback - looks like a
 * copy/paste leftover; confirm and rename.
 * NOTE(review): declarations of displs, t and offset, all synchronisation,
 * the start of the copy call and the return are on lines missing from this
 * listing.
 */
266 int _sion_omp_scattervr_cb(
void *indata,
void *outdata,
void *commdata,
int dtype,
int *counts,
int nelem,
int root)
269 int rc=SION_STD_SUCCESS;
272 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Root builds the per-thread displacement table. */
275 if(sapi->thread_num==root) {
277 displs = (
int *) malloc(sapi->num_threads *
sizeof(
int));
278 if (displs == NULL) {
279 fprintf(stderr,
"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",
280 (
unsigned long) sapi->num_threads *
sizeof(
int));
284 for(t=0;t<sapi->num_threads;t++) {
287 DPRINTFP((256,
"_sion_omp_gathervr_cb", sapi->thread_num,
" after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
/* Publish the table so each thread can look up its own offset. */
291 __omp_global_pointer = displs;
300 offset= ((
int*)__omp_global_pointer)[sapi->thread_num];
/* Only root owns the table. */
308 if(sapi->thread_num==root) {
309 if(displs) free(displs);
/* Publish the send buffer for the copy below. */
314 __omp_global_pointer = indata;
/* offset is in elements; scale by the element size to get bytes. */
324 (
char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
325 nelem*_sion_omp_size_of_dtype(dtype)
/*
 * Gather-and-execute callback: collects the spec and data pointers of the
 * threads in [range_start, range_end] via __sion_omp_get_info_from_other(),
 * then, on the collector thread only, calls process_cb once per sender with
 * that sender's data pointer, spec pointer and sid.
 *
 * indata       this thread's data buffer
 * spec         this thread's spec array (elements 0..2 appear in the debug
 *              output)
 * spec_len     forwarded to the helper
 * fsblksize    not referenced in the lines visible here
 * commdata     points to this thread's _omp_api_commdata
 * collector    thread number that runs process_cb
 * range_start  first sender thread number (inclusive)
 * range_end    last sender thread number (inclusive)
 * sid          passed through to process_cb
 * process_cb   function executed for each sender
 *
 * On a process_cb failure the function returns the value of
 * _sion_errorprint(SION_NOT_SUCCESS, ...).
 *
 * NOTE(review): as listed, that early return comes before the free() calls
 * for p_spec and p_indata, so the tables look leaked on the error path -
 * confirm against the full source.
 * NOTE(review): declarations of t and trank, the computation of trank, any
 * synchronisation and the final return are on lines missing from this
 * listing.
 */
336 int _sion_omp_gather_process_cb(
const void *indata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
337 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
338 int process_cb(
const void *,sion_int64 *,
int ) ) {
339 int rc=SION_STD_SUCCESS;
341 sion_int64 **p_spec=NULL;
342 char **p_indata=NULL;
344 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Build the per-sender pointer tables (allocated on the collector). */
347 __sion_omp_get_info_from_other((
void*) indata, spec, spec_len,
348 commdata, collector, range_start, range_end,
349 &p_spec, &p_indata );
/* Only the collector processes the gathered buffers. */
353 if(sapi->thread_num==collector) {
356 for(t=range_start;t<=range_end;t++) {
358 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
359 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
361 rc=process_cb(p_indata[trank],p_spec[trank], sid);
364 if(rc != SION_STD_SUCCESS) {
365 return(
_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* Release the pointer tables (not the buffers they point to). */
368 if(p_spec) free(p_spec);
369 if(p_indata) free(p_indata);
/*
 * Execute-and-scatter callback: collects the spec and data pointers of the
 * threads in [range_start, range_end] via __sion_omp_get_info_from_other(),
 * then, on the collector thread only, calls process_cb once per thread with
 * that thread's buffer pointer, spec pointer and sid.
 *
 * outdata      this thread's buffer
 * spec         this thread's spec array (elements 0..2 appear in the debug
 *              output)
 * spec_len     forwarded to the helper
 * fsblksize    not referenced in the lines visible here
 * commdata     points to this thread's _omp_api_commdata
 * collector    thread number that runs process_cb
 * range_start  first thread number of the range (inclusive)
 * range_end    last thread number of the range (inclusive)
 * sid          passed through to process_cb
 * process_cb   function executed for each thread of the range
 *
 * On a process_cb failure the function returns the value of
 * _sion_errorprint(SION_NOT_SUCCESS, ...).
 *
 * NOTE(review): the debug tag and the error text below name
 * "_sion_omp_gather_process_cb" and speak of "writing data" although this is
 * the process/scatter callback - looks like a copy/paste leftover; confirm.
 * NOTE(review): as listed, the early return comes before the free() calls
 * for p_spec and p_indata, so the tables look leaked on the error path -
 * confirm against the full source.
 * NOTE(review): declarations of t and trank, the computation of trank, any
 * synchronisation and the final return are on lines missing from this
 * listing.
 */
382 int _sion_omp_process_scatter_cb(
void *outdata, sion_int64 *spec,
int spec_len, sion_int64 fsblksize,
383 void *commdata,
int collector,
int range_start,
int range_end,
int sid,
384 int process_cb(
void *,sion_int64 *,
int ) ) {
385 int rc=SION_STD_SUCCESS;
387 sion_int64 **p_spec=NULL;
388 char **p_indata=NULL;
390 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Build the per-thread pointer tables (allocated on the collector). */
393 __sion_omp_get_info_from_other((
void*) outdata, spec, spec_len,
394 commdata, collector, range_start, range_end,
395 &p_spec, &p_indata );
/* Only the collector runs process_cb. */
399 if(sapi->thread_num==collector) {
402 for(t=range_start;t<=range_end;t++) {
404 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on master t=%d spec=%ld,%ld,%ld \n",trank,
405 *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
407 rc=process_cb(p_indata[trank],p_spec[trank], sid);
410 if(rc != SION_STD_SUCCESS) {
411 return(
_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_omp_gather_process_cb: problems writing data ...\n"));
/* Release the pointer tables (not the buffers they point to). */
414 if(p_spec) free(p_spec);
415 if(p_indata) free(p_indata);
/*
 * Helper for the gather-and-execute / execute-and-scatter callbacks.
 *
 * The collector thread allocates two tables with one entry per thread in
 * [range_start, range_end]: *p_spec (spec pointers) and *p_indata (data
 * pointers). Each table is published in turn through __omp_global_pointer,
 * and every non-collector thread stores its own spec / data pointer at index
 * thread_num - range_start.
 *
 * data         this thread's data buffer
 * spec         this thread's spec array (elements 0..2 appear in the debug
 *              output)
 * spec_len     not referenced in the lines visible here
 * commdata     points to this thread's _omp_api_commdata
 * collector    thread number that owns the tables
 * range_start  first thread number of the range (inclusive)
 * range_end    last thread number of the range (inclusive)
 * p_spec       out: table of spec pointers (allocated on the collector)
 * p_indata     out: table of data pointers (allocated on the collector)
 *
 * The callers shown in this file free both tables themselves.
 *
 * NOTE(review): declarations of tspec and tdata, the handling after a failed
 * allocation, all synchronisation between the phases and the return are on
 * lines missing from this listing.
 * NOTE(review): the error and debug strings name
 * "_sion_omp_gather_process_cb" rather than this helper.
 */
429 int __sion_omp_get_info_from_other(
void *data, sion_int64 *spec,
int spec_len,
430 void *commdata,
int collector,
int range_start,
int range_end,
431 sion_int64 ***p_spec,
char ***p_indata ) {
436 int num_sender, my_trank;
437 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
/* Number of table entries and this thread's index inside the tables. */
439 num_sender = range_end-range_start+1;
440 my_trank = sapi->thread_num-range_start;
443 if(sapi->thread_num==collector) {
445 *p_spec = (sion_int64 **) malloc(num_sender *
sizeof(sion_int64 *));
/* Check the allocation result itself, not the out-parameter address
 * (which is never NULL for the callers in this file). */
446 if (*p_spec == NULL) {
447 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %lu (p_spec), aborting ...\n",
448 (
unsigned long) num_sender *
sizeof(sion_int64 *));
451 *p_indata = (
char **) malloc(num_sender *
sizeof(
char *));
/* Check the table that was just allocated (p_indata, not p_spec). */
452 if (*p_indata == NULL) {
453 fprintf(stderr,
"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %lu (p_indata), aborting ...\n",
454 (
unsigned long) num_sender *
sizeof(
char *));
/* Phase 1: publish the spec table. */
459 __omp_global_pointer = *p_spec;
/* Senders register their spec pointer in the collector's table. */
467 if(sapi->thread_num!=collector) {
468 tspec = (sion_int64 **) __omp_global_pointer;
469 tspec[my_trank] = spec;
470 DPRINTFP((256,
"_sion_omp_gather_process_cb", -1,
"on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
471 *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
/* Phase 2: publish the data table. */
480 if(sapi->thread_num==collector) {
482 __omp_global_pointer = *p_indata;
/* Senders register their data pointer in the collector's table. */
490 if(sapi->thread_num!=collector) {
491 tdata = (
char const **) __omp_global_pointer;
492 tdata[my_trank] = data;
/*
 * Capability query callback: thread 0 logs "FULL capability"; the second
 * debug message logs "ONLY SENDER capability".
 *
 * commdata  points to this thread's _omp_api_commdata
 *
 * NOTE(review): the else branch and the statements that set and return the
 * capability value are on lines missing from this listing; presumably the
 * result is one of the SION_CAPABILITY_* constants - confirm.
 */
504 int _sion_omp_get_capability_cb(
void *commdata )
507 _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
509 if(sapi->thread_num==0) {
511 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"FULL capability\n"));
514 DPRINTFP((256,
"_sion_omp_get_capability_cb", sapi->thread_num,
"ONLY SENDER capability\n"));
/*
 * Returns the size in bytes of one element of the given SION dtype code:
 *   _SION_INT32 -> sizeof(sion_int32)
 *   _SION_INT64 -> sizeof(sion_int64)
 *   _SION_CHAR  -> sizeof(char)
 * Any other code falls back to sizeof(sion_int64).
 *
 * The break statements after each return are unreachable.
 *
 * NOTE(review): the switch header and the closing braces are on lines
 * missing from this listing.
 */
520 int _sion_omp_size_of_dtype(
int dtype) {
522 case _SION_INT32:
return(
sizeof(sion_int32));
break;
523 case _SION_INT64:
return(
sizeof(sion_int64));
break;
524 case _SION_CHAR:
return(
sizeof(
char));
break;
525 default:
return(
sizeof(sion_int64));
#define SION_CAPABILITY_FULL
#define SION_CAPABILITY_NONE
#define SION_CAPABILITY_ONLY_SENDER
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.