SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_omp_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <stdarg.h>
22 #include <string.h>
23 #include <time.h>
24 
25 #include <sys/time.h>
26 
27 #include <sys/types.h>
28 #include <fcntl.h>
29 
30 #include <unistd.h>
31 
32 #include "sion.h"
33 #include "sion_debug.h"
34 #include "sion_internal.h"
35 #include "sion_fd.h"
36 #include "sion_filedesc.h"
37 #include "sion_printts.h"
38 
39 #include "sion_omp_cb_gen.h"
40 
41 #ifdef SION_OMP
42 
43 #include "omp.h"
44 
/* File-scope slot through which the callbacks below hand one address from a
 * single thread to the others: one thread stores the address, the team crosses
 * an "omp barrier", and the remaining threads read it afterwards. */
45 static void *__omp_global_pointer;
46 
/* Byte size of one element for a SION dtype code; defined later in this file. */
47 int _sion_omp_size_of_dtype(int dtype);
48 
/* Helper shared by the gather/execute and execute/scatter callbacks: lets the
 * collector thread obtain the spec and data pointers of the other threads.
 * Defined later in this file. */
49 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
50  void *commdata, int collector, int range_start, int range_end,
51  sion_int64 ***p_spec, char ***p_indata );
52 
53 int _sion_register_callbacks_omp() {
54  int aid=-1;
55  aid=sion_generic_create_api("SIONlib_OMP_API");
56 
57 
58  sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
59  sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
60 
61  sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
62  sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
63  sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
64  sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
65  sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
66  sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
67  sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
68  sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
69  sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
70 
71  return(aid);
72 }
73 
74 int _sion_omp_create_lcg_cb(void **local_commgroup, void *global_commgroup,
75  int grank, int gsize,
76  int lrank, int lsize,
77  int filenumber, int numfiles
78  )
79 {
80  int rc=SION_STD_SUCCESS;
81 
82  DPRINTFP((256, "_sion_omp_create_lcg_cb", grank, " DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
83  grank, gsize, filenumber, numfiles, lrank, lsize));
84  *local_commgroup=global_commgroup;
85 
86  return rc;
87 }
88 
89 int _sion_omp_free_lcg_cb(void *local_commgroup) {
90  int rc=SION_STD_SUCCESS;
91  DPRINTFP((256, "_omp_free_lcg_cb", _SION_DEFAULT_RANK, " DUMMY for free local comm\n"));
92  return rc;
93 }
94 
95 
96 int _sion_omp_barrier_cb(void *commdata)
97 {
98  int rc=SION_STD_SUCCESS;
99  {
100 #pragma omp barrier
101  }
102  return rc;
103 }
104 
105 int _sion_omp_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
106 {
107  int rc=SION_STD_SUCCESS;
108  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
109 
110 
111 
112  if(sapi->thread_num==root) {
113  __omp_global_pointer = data;
114  }
115 
116  /* threads sync global pointer */
117 
118  {
119 #pragma omp barrier
120  }
121 
122  if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
123  memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
124  }
125  {
126 #pragma omp barrier
127  }
128 
129  return rc;
130 }
131 
132 
133 /* indata: data that comes into fuinction, outdata: data that will be created by function */
134 int _sion_omp_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
135 {
136 
137  int rc=SION_STD_SUCCESS;
138  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
139 
140  /* set global pointer to outdata */
141  if(sapi->thread_num==root) {
142  __omp_global_pointer = outdata;
143  }
144 
145  /* synchronize */
146  {
147 #pragma omp barrier
148  }
149 
150 
151  /* copy indata -> outdata */
152  memcpy(__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
153  indata,
154  nelem*_sion_omp_size_of_dtype(dtype)
155  );
156 
157  /* synchronize again */
158  {
159 #pragma omp barrier
160  }
161  return rc;
162 }
163 
164 int _sion_omp_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
165 {
166 
167  int rc=SION_STD_SUCCESS;
168  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
169 
170  /* set global pointer to indata */
171  if(sapi->thread_num==root) {
172  __omp_global_pointer = indata;
173  }
174 
175 
176  /* synchronize */
177  {
178 #pragma omp barrier
179  }
180 
181  /* copy outdata <- indata */
182  memcpy( outdata,
183  __omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
184  nelem*_sion_omp_size_of_dtype(dtype)
185  );
186 
187  /* synchronize again */
188  {
189 #pragma omp barrier
190  }
191 
192  return rc;
193 }
194 
195 
196 int _sion_omp_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
197 {
198 
199  int rc=SION_STD_SUCCESS;
200  int *displs=NULL;
201  int t, offset;
202  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
203 
204  /* compute offsets sizes for each thread and store these values in a temporary vector */
205  if(sapi->thread_num==root) {
206 
207  displs = (int *) malloc(sapi->num_threads * sizeof(int));
208  if (displs == NULL) {
209  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",
210  (unsigned long) sapi->num_threads * sizeof(int));
211  return(-1);
212  }
213  offset=0;
214  for(t=0;t<sapi->num_threads;t++) {
215  displs[t]=offset;
216  offset+=counts[t];
217  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
218  }
219 
220  /* set global pointer to temporary vector */
221  __omp_global_pointer = displs;
222  }
223 
224  /* synchronize */
225  {
226 #pragma omp barrier
227  }
228 
229  /* get offset */
230  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
231 
232  /* synchronize again */
233  {
234 #pragma omp barrier
235  }
236 
237  /* free temporary vector and set global pointer to outdata */
238  if(sapi->thread_num==root) {
239  if(displs) free(displs);
240 
241  }
242 
243  /* set global pointer to temporary vector */
244  __omp_global_pointer = outdata;
245 
246 
247  /* synchronize again */
248  {
249 #pragma omp barrier
250  }
251 
252  /* copy indata -> outdata */
253  memcpy(( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
254  indata,
255  nelem*_sion_omp_size_of_dtype(dtype)
256  );
257 
258  /* synchronize again */
259  {
260 #pragma omp barrier
261  }
262  return rc;
263 }
264 
265 
266 int _sion_omp_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
267 {
268 
269  int rc=SION_STD_SUCCESS;
270  int *displs=NULL;
271  int t, offset;
272  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
273 
274  /* compute offsets for each thread and store these values in a temporary vector */
275  if(sapi->thread_num==root) {
276 
277  displs = (int *) malloc(sapi->num_threads * sizeof(int));
278  if (displs == NULL) {
279  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %lu (displs), aborting ...\n",
280  (unsigned long) sapi->num_threads * sizeof(int));
281  return(-1);
282  }
283  offset=0;
284  for(t=0;t<sapi->num_threads;t++) {
285  displs[t]=offset;
286  offset+=counts[t];
287  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
288  }
289 
290  /* set global pointer to temporary vector */
291  __omp_global_pointer = displs;
292  }
293 
294  /* synchronize */
295  {
296 #pragma omp barrier
297  }
298 
299  /* get offset */
300  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
301 
302  /* synchronize again */
303  {
304 #pragma omp barrier
305  }
306 
307  /* free temporary vector and set global pointer to outdata */
308  if(sapi->thread_num==root) {
309  if(displs) free(displs);
310 
311  }
312 
313  /* set global pointer to temporary vector */
314  __omp_global_pointer = indata;
315 
316 
317  /* synchronize again */
318  {
319 #pragma omp barrier
320  }
321 
322  /* copy indata -> outdata */
323  memcpy(outdata,
324  ( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
325  nelem*_sion_omp_size_of_dtype(dtype)
326  );
327 
328  /* synchronize again */
329  {
330 #pragma omp barrier
331  }
332  return rc;
333 }
334 
335 
336 int _sion_omp_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
337  void *commdata, int collector, int range_start, int range_end, int sid,
338  int process_cb(const void *,sion_int64 *, int ) ) {
339  int rc=SION_STD_SUCCESS;
340 
341  sion_int64 **p_spec=NULL;
342  char **p_indata=NULL;
343  int t, trank;
344  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
345 
346 
347  __sion_omp_get_info_from_other((void*) indata, spec, spec_len,
348  commdata, collector, range_start, range_end,
349  &p_spec, &p_indata );
350 
351 
352  /* process */
353  if(sapi->thread_num==collector) {
354 
355  /* scan all other tasks */
356  for(t=range_start;t<=range_end;t++) {
357  trank=t-range_start;
358  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
359  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
360 
361  rc=process_cb(p_indata[trank],p_spec[trank], sid);
362  }
363 
364  if(rc != SION_STD_SUCCESS) {
365  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
366  }
367 
368  if(p_spec) free(p_spec);
369  if(p_indata) free(p_indata);
370 
371  }
372 
373  /* synchronize */
374  {
375 #pragma omp barrier
376  }
377 
378 
379  return(rc);
380 }
381 
382 int _sion_omp_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
383  void *commdata, int collector, int range_start, int range_end, int sid,
384  int process_cb(void *,sion_int64 *, int ) ) {
385  int rc=SION_STD_SUCCESS;
386 
387  sion_int64 **p_spec=NULL;
388  char **p_indata=NULL;
389  int t, trank;
390  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
391 
392 
393  __sion_omp_get_info_from_other((void*) outdata, spec, spec_len,
394  commdata, collector, range_start, range_end,
395  &p_spec, &p_indata );
396 
397 
398  /* process */
399  if(sapi->thread_num==collector) {
400 
401  /* scan all other tasks */
402  for(t=range_start;t<=range_end;t++) {
403  trank=t-range_start;
404  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
405  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
406 
407  rc=process_cb(p_indata[trank],p_spec[trank], sid);
408  }
409 
410  if(rc != SION_STD_SUCCESS) {
411  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
412  }
413 
414  if(p_spec) free(p_spec);
415  if(p_indata) free(p_indata);
416 
417  }
418 
419  /* synchronize */
420  {
421 #pragma omp barrier
422  }
423 
424 
425 
426  return(rc);
427 }
428 
429 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
430  void *commdata, int collector, int range_start, int range_end,
431  sion_int64 ***p_spec, char ***p_indata ) {
432  int rc=SION_SUCCESS;
433 
434  sion_int64 **tspec;
435  char const **tdata;
436  int num_sender, my_trank;
437  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
438 
439  num_sender = range_end-range_start+1;
440  my_trank = sapi->thread_num-range_start;
441 
442  /* collect offsets and sizes for each thread */
443  if(sapi->thread_num==collector) {
444 
445  *p_spec = (sion_int64 **) malloc(num_sender * sizeof(sion_int64 *));
446  if (p_spec == NULL) {
447  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %lu (p_spec), aborting ...\n",
448  (unsigned long) num_sender * sizeof(sion_int64 *));
449  return(-1);
450  }
451  *p_indata = (char **) malloc(num_sender * sizeof(char *));
452  if (p_spec == NULL) {
453  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %lu (p_indata), aborting ...\n",
454  (unsigned long) num_sender * sizeof(char *));
455  return(-1);
456  }
457 
458  /* set global pointer to temporary vector */
459  __omp_global_pointer = *p_spec;
460  }
461 
462  /* synchronize */
463  {
464 #pragma omp barrier
465  }
466 
467  if(sapi->thread_num!=collector) {
468  tspec = (sion_int64 **) __omp_global_pointer;
469  tspec[my_trank] = spec;
470  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
471  *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
472 
473  }
474 
475  /* synchronize */
476  {
477 #pragma omp barrier
478  }
479 
480  if(sapi->thread_num==collector) {
481  /* set global pointer to temporary vector */
482  __omp_global_pointer = *p_indata;
483  }
484 
485  /* synchronize */
486  {
487 #pragma omp barrier
488  }
489 
490  if(sapi->thread_num!=collector) {
491  tdata = (char const **) __omp_global_pointer;
492  tdata[my_trank] = data;
493  }
494 
495  /* synchronize */
496  {
497 #pragma omp barrier
498  }
499 
500 
501  return(rc);
502 }
503 
504 int _sion_omp_get_capability_cb(void *commdata )
505 {
506  int rc=SION_CAPABILITY_NONE;
507  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
508 
509  if(sapi->thread_num==0) {
511  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "FULL capability\n"));
512  } else {
514  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "ONLY SENDER capability\n"));
515  }
516  return rc;
517 }
518 
519 
520 int _sion_omp_size_of_dtype(int dtype) {
521  switch (dtype) {
522  case _SION_INT32: return(sizeof(sion_int32)); break;
523  case _SION_INT64: return(sizeof(sion_int64)); break;
524  case _SION_CHAR: return(sizeof(char)); break;
525  default: return(sizeof(sion_int64));
526  }
527 }
528 
529 /* end of ifdef SION_OMP */
530 #endif
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:51
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
Sion Time Stamp Header.