SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_omp_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <stdarg.h>
22 #include <string.h>
23 #include <time.h>
24 
25 #include <sys/time.h>
26 
27 #include <sys/types.h>
28 #include <fcntl.h>
29 
30 #include <unistd.h>
31 
32 #include "sion.h"
33 #include "sion_debug.h"
34 #include "sion_error_handler.h"
35 #include "sion_internal.h"
36 #include "sion_fd.h"
37 #include "sion_filedesc.h"
38 #include "sion_printts.h"
39 
40 #include "sion_omp_cb_gen.h"
41 
42 #ifdef SION_OMP
43 
44 #include "omp.h"
45 
46 static void *__omp_global_pointer;
47 
48 int _sion_omp_size_of_dtype(int dtype);
49 
50 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
51  void *commdata, int collector, int range_start, int range_end,
52  sion_int64 ***p_spec, char ***p_indata );
53 
54 int _sion_register_callbacks_omp() {
55  int aid=0;
56  aid=sion_generic_create_api("SIONlib_OMP_API");
57 
58 
59  sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
60  sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
61 
62  sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
63  sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
64  sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
65  sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
66  sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
67  sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
68  sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
69  sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
70  sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
71 
72  return(aid);
73 }
74 
75 int _sion_omp_create_lcg_cb(void **local_commgroup, void *global_commgroup,
76  int grank, int gsize,
77  int lrank, int lsize,
78  int filenumber, int numfiles
79  )
80 {
81  int rc=SION_STD_SUCCESS;
82 
83  DPRINTFP((256, "_sion_omp_create_lcg_cb", grank, " DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
84  grank, gsize, filenumber, numfiles, lrank, lsize));
85  *local_commgroup=global_commgroup;
86 
87  return rc;
88 }
89 
90 int _sion_omp_free_lcg_cb(void *local_commgroup) {
91  int rc=SION_STD_SUCCESS;
92  DPRINTFP((256, "_omp_free_lcg_cb", _SION_DEFAULT_RANK, " DUMMY for free local comm\n"));
93  return rc;
94 }
95 
96 
97 int _sion_omp_barrier_cb(void *commdata)
98 {
99  int rc=SION_STD_SUCCESS;
100  {
101 #pragma omp barrier
102  }
103  return rc;
104 }
105 
106 int _sion_omp_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
107 {
108  int rc=SION_STD_SUCCESS;
109  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
110 
111 
112 
113  if(sapi->thread_num==root) {
114  __omp_global_pointer = data;
115  }
116 
117  /* threads sync global pointer */
118 
119  {
120 #pragma omp barrier
121  }
122 
123  if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
124  memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
125  }
126  {
127 #pragma omp barrier
128  }
129 
130  return rc;
131 }
132 
133 
134 /* indata: data that comes into fuinction, outdata: data that will be created by function */
135 int _sion_omp_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
136 {
137 
138  int rc=SION_STD_SUCCESS;
139  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
140 
141  /* set global pointer to outdata */
142  if(sapi->thread_num==root) {
143  __omp_global_pointer = outdata;
144  }
145 
146  /* synchronize */
147  {
148 #pragma omp barrier
149  }
150 
151 
152  /* copy indata -> outdata */
153  memcpy(__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
154  indata,
155  nelem*_sion_omp_size_of_dtype(dtype)
156  );
157 
158  /* synchronize again */
159  {
160 #pragma omp barrier
161  }
162  return rc;
163 }
164 
165 int _sion_omp_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
166 {
167 
168  int rc=SION_STD_SUCCESS;
169  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
170 
171  /* set global pointer to indata */
172  if(sapi->thread_num==root) {
173  __omp_global_pointer = indata;
174  }
175 
176 
177  /* synchronize */
178  {
179 #pragma omp barrier
180  }
181 
182  /* copy outdata <- indata */
183  memcpy( outdata,
184  __omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
185  nelem*_sion_omp_size_of_dtype(dtype)
186  );
187 
188  /* synchronize again */
189  {
190 #pragma omp barrier
191  }
192 
193  return rc;
194 }
195 
196 
197 int _sion_omp_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
198 {
199 
200  int rc=SION_STD_SUCCESS;
201  int *displs=NULL;
202  int t, offset;
203  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
204 
205  /* compute offsets sizes for each thread and store these values in a temporary vector */
206  if(sapi->thread_num==root) {
207 
208  displs = (int *) malloc(sapi->num_threads * sizeof(int));
209  if (displs == NULL) {
210  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
211  (size_t) sapi->num_threads * sizeof(int));
212  return(-1);
213  }
214  offset=0;
215  for(t=0;t<sapi->num_threads;t++) {
216  displs[t]=offset;
217  offset+=counts[t];
218  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
219  }
220 
221  /* set global pointer to temporary vector */
222  __omp_global_pointer = displs;
223  }
224 
225  /* synchronize */
226  {
227 #pragma omp barrier
228  }
229 
230  /* get offset */
231  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
232 
233  /* synchronize again */
234  {
235 #pragma omp barrier
236  }
237 
238  /* free temporary vector and set global pointer to outdata */
239  if(sapi->thread_num==root) {
240  if(displs) free(displs);
241 
242  }
243 
244  /* set global pointer to temporary vector */
245  __omp_global_pointer = outdata;
246 
247 
248  /* synchronize again */
249  {
250 #pragma omp barrier
251  }
252 
253  /* copy indata -> outdata */
254  memcpy(( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
255  indata,
256  nelem*_sion_omp_size_of_dtype(dtype)
257  );
258 
259  /* synchronize again */
260  {
261 #pragma omp barrier
262  }
263  return rc;
264 }
265 
266 
267 int _sion_omp_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
268 {
269 
270  int rc=SION_STD_SUCCESS;
271  int *displs=NULL;
272  int t, offset;
273  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
274 
275  /* compute offsets for each thread and store these values in a temporary vector */
276  if(sapi->thread_num==root) {
277 
278  displs = (int *) malloc(sapi->num_threads * sizeof(int));
279  if (displs == NULL) {
280  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
281  (size_t) sapi->num_threads * sizeof(int));
282  return(-1);
283  }
284  offset=0;
285  for(t=0;t<sapi->num_threads;t++) {
286  displs[t]=offset;
287  offset+=counts[t];
288  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
289  }
290 
291  /* set global pointer to temporary vector */
292  __omp_global_pointer = displs;
293  }
294 
295  /* synchronize */
296  {
297 #pragma omp barrier
298  }
299 
300  /* get offset */
301  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
302 
303  /* synchronize again */
304  {
305 #pragma omp barrier
306  }
307 
308  /* free temporary vector and set global pointer to outdata */
309  if(sapi->thread_num==root) {
310  if(displs) free(displs);
311 
312  }
313 
314  /* set global pointer to temporary vector */
315  __omp_global_pointer = indata;
316 
317 
318  /* synchronize again */
319  {
320 #pragma omp barrier
321  }
322 
323  /* copy indata -> outdata */
324  memcpy(outdata,
325  ( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
326  nelem*_sion_omp_size_of_dtype(dtype)
327  );
328 
329  /* synchronize again */
330  {
331 #pragma omp barrier
332  }
333  return rc;
334 }
335 
336 
337 int _sion_omp_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
338  void *commdata, int collector, int range_start, int range_end, int sid,
339  int process_cb(const void *,sion_int64 *, int ) ) {
340  int rc=SION_STD_SUCCESS;
341 
342  sion_int64 **p_spec=NULL;
343  char **p_indata=NULL;
344  int t, trank;
345  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
346 
347 
348  __sion_omp_get_info_from_other((void*) indata, spec, spec_len,
349  commdata, collector, range_start, range_end,
350  &p_spec, &p_indata );
351 
352 
353  /* process */
354  if(sapi->thread_num==collector) {
355 
356  /* scan all other tasks */
357  for(t=range_start;t<=range_end;t++) {
358  trank=t-range_start;
359  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
360  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
361 
362  rc=process_cb(p_indata[trank],p_spec[trank], sid);
363  }
364 
365  if(rc != SION_STD_SUCCESS) {
366  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
367  }
368 
369  if(p_spec) free(p_spec);
370  if(p_indata) free(p_indata);
371 
372  }
373 
374  /* synchronize */
375  {
376 #pragma omp barrier
377  }
378 
379 
380  return(rc);
381 }
382 
383 int _sion_omp_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
384  void *commdata, int collector, int range_start, int range_end, int sid,
385  int process_cb(void *,sion_int64 *, int ) ) {
386  int rc=SION_STD_SUCCESS;
387 
388  sion_int64 **p_spec=NULL;
389  char **p_indata=NULL;
390  int t, trank;
391  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
392 
393 
394  __sion_omp_get_info_from_other((void*) outdata, spec, spec_len,
395  commdata, collector, range_start, range_end,
396  &p_spec, &p_indata );
397 
398 
399  /* process */
400  if(sapi->thread_num==collector) {
401 
402  /* scan all other tasks */
403  for(t=range_start;t<=range_end;t++) {
404  trank=t-range_start;
405  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
406  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
407 
408  rc=process_cb(p_indata[trank],p_spec[trank], sid);
409  }
410 
411  if(rc != SION_STD_SUCCESS) {
412  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
413  }
414 
415  if(p_spec) free(p_spec);
416  if(p_indata) free(p_indata);
417 
418  }
419 
420  /* synchronize */
421  {
422 #pragma omp barrier
423  }
424 
425 
426 
427  return(rc);
428 }
429 
430 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
431  void *commdata, int collector, int range_start, int range_end,
432  sion_int64 ***p_spec, char ***p_indata ) {
433  int rc=SION_SUCCESS;
434 
435  sion_int64 **tspec;
436  char const **tdata;
437  int num_sender, my_trank;
438  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
439 
440  num_sender = range_end-range_start+1;
441  my_trank = sapi->thread_num-range_start;
442 
443  /* collect offsets and sizes for each thread */
444  if(sapi->thread_num==collector) {
445 
446  *p_spec = (sion_int64 **) malloc(num_sender * sizeof(sion_int64 *));
447  if (*p_spec == NULL) {
448  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_spec), aborting ...\n",
449  (size_t) num_sender * sizeof(sion_int64 *));
450  return(-1);
451  }
452  *p_indata = (char **) malloc(num_sender * sizeof(char *));
453  if (*p_indata == NULL) {
454  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_indata), aborting ...\n",
455  (size_t) num_sender * sizeof(char *));
456  return(-1);
457  }
458 
459  /* set global pointer to temporary vector */
460  __omp_global_pointer = *p_spec;
461  }
462 
463  /* synchronize */
464  {
465 #pragma omp barrier
466  }
467 
468  if(sapi->thread_num!=collector) {
469  tspec = (sion_int64 **) __omp_global_pointer;
470  tspec[my_trank] = spec;
471  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
472  *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
473 
474  }
475 
476  /* synchronize */
477  {
478 #pragma omp barrier
479  }
480 
481  if(sapi->thread_num==collector) {
482  /* set global pointer to temporary vector */
483  __omp_global_pointer = *p_indata;
484  }
485 
486  /* synchronize */
487  {
488 #pragma omp barrier
489  }
490 
491  if(sapi->thread_num!=collector) {
492  tdata = (char const **) __omp_global_pointer;
493  tdata[my_trank] = data;
494  }
495 
496  /* synchronize */
497  {
498 #pragma omp barrier
499  }
500 
501 
502  return(rc);
503 }
504 
505 int _sion_omp_get_capability_cb(void *commdata )
506 {
507  int rc=SION_CAPABILITY_NONE;
508  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
509 
510  if(sapi->thread_num==0) {
512  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "FULL capability\n"));
513  } else {
515  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "ONLY SENDER capability\n"));
516  }
517  return rc;
518 }
519 
520 
521 int _sion_omp_size_of_dtype(int dtype) {
522  switch (dtype) {
523  case _SION_INT32: return(sizeof(sion_int32)); break;
524  case _SION_INT64: return(sizeof(sion_int64)); break;
525  case _SION_CHAR: return(sizeof(char)); break;
526  default: return(sizeof(sion_int64));
527  }
528 }
529 
530 /* end of ifdef SION_OMP */
531 #endif
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:51
Sion Time Stamp Header.