SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_omp_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
19 #define _XOPEN_SOURCE 700
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <stdarg.h>
24 #include <string.h>
25 #include <time.h>
26 
27 #include <sys/time.h>
28 
29 #include <sys/types.h>
30 #include <fcntl.h>
31 
32 #include <unistd.h>
33 
34 #include "sion.h"
35 #include "sion_debug.h"
36 #include "sion_error_handler.h"
37 #include "sion_internal.h"
38 #include "sion_fd.h"
39 #include "sion_filedesc.h"
40 #include "sion_printts.h"
41 
42 #include "sion_omp_cb_gen.h"
43 
44 #ifdef SION_OMP
45 
46 #include "omp.h"
47 
/*
 * Hand-over slot shared by the callbacks in this file: one thread stores a
 * pointer here, the threads meet at an OpenMP barrier, and afterwards they
 * read the pointer back to reach that thread's buffer.
 */
48 static void *__omp_global_pointer;
49 
/* Size in bytes of one element of the given SION data type code. */
50 int _sion_omp_size_of_dtype(int dtype);
51 
/*
 * Helper of the gather/process and process/scatter callbacks: on the
 * collector thread it allocates two pointer tables (*p_spec, *p_indata) in
 * which the other threads record their spec and data pointers.
 */
52 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
53  void *commdata, int collector, int range_start, int range_end,
54  sion_int64 ***p_spec, char ***p_indata );
55 
56 int _sion_register_callbacks_omp(void) {
57  int aid=0;
58  aid=sion_generic_create_api("SIONlib_OMP_API");
59 
60 
61  sion_generic_register_create_local_commgroup_cb(aid,&_sion_omp_create_lcg_cb);
62  sion_generic_register_free_local_commgroup_cb(aid,&_sion_omp_free_lcg_cb);
63 
64  sion_generic_register_barrier_cb(aid,&_sion_omp_barrier_cb);
65  sion_generic_register_bcastr_cb(aid,&_sion_omp_bcastr_cb);
66  sion_generic_register_gatherr_cb(aid,&_sion_omp_gatherr_cb);
67  sion_generic_register_scatterr_cb(aid,&_sion_omp_scatterr_cb);
68  sion_generic_register_gathervr_cb(aid,&_sion_omp_gathervr_cb);
69  sion_generic_register_scattervr_cb(aid,&_sion_omp_scattervr_cb);
70  sion_generic_register_gather_and_execute_cb(aid,&_sion_omp_gather_process_cb);
71  sion_generic_register_execute_and_scatter_cb(aid,&_sion_omp_process_scatter_cb);
72  sion_generic_register_get_capability_cb(aid,&_sion_omp_get_capability_cb);
73 
74  return(aid);
75 }
76 
77 int _sion_omp_create_lcg_cb(void **local_commgroup, void *global_commgroup,
78  int grank, int gsize,
79  int lrank, int lsize,
80  int filenumber, int numfiles
81  )
82 {
83  int rc=SION_STD_SUCCESS;
84 
85  DPRINTFP((256, "_sion_omp_create_lcg_cb", grank, " DUMMY for split global comm: grank=%d gsize=%d filenumber=%d, numfiles=%d, lrank=%d lsize=%d \n",
86  grank, gsize, filenumber, numfiles, lrank, lsize));
87  *local_commgroup=global_commgroup;
88 
89  return rc;
90 }
91 
92 int _sion_omp_free_lcg_cb(void *local_commgroup) {
93  int rc=SION_STD_SUCCESS;
94  DPRINTFP((256, "_omp_free_lcg_cb", _SION_DEFAULT_RANK, " DUMMY for free local comm\n"));
95  return rc;
96 }
97 
98 
99 int _sion_omp_barrier_cb(void *commdata)
100 {
101  int rc=SION_STD_SUCCESS;
102  {
103 #pragma omp barrier
104  }
105  return rc;
106 }
107 
108 int _sion_omp_bcastr_cb(void *data, void *commdata, int dtype, int nelem, int root)
109 {
110  int rc=SION_STD_SUCCESS;
111  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
112 
113 
114 
115  if(sapi->thread_num==root) {
116  __omp_global_pointer = data;
117  }
118 
119  /* threads sync global pointer */
120 
121  {
122 #pragma omp barrier
123  }
124 
125  if((sapi->thread_num!=root) && (__omp_global_pointer != NULL)) {
126  memcpy(data,__omp_global_pointer,nelem*_sion_omp_size_of_dtype(dtype));
127  }
128  {
129 #pragma omp barrier
130  }
131 
132  return rc;
133 }
134 
135 
136 /* indata: data that comes into fuinction, outdata: data that will be created by function */
137 int _sion_omp_gatherr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
138 {
139 
140  int rc=SION_STD_SUCCESS;
141  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
142 
143  /* set global pointer to outdata */
144  if(sapi->thread_num==root) {
145  __omp_global_pointer = outdata;
146  }
147 
148  /* synchronize */
149  {
150 #pragma omp barrier
151  }
152 
153 
154  /* copy indata -> outdata */
155  memcpy((char *)__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
156  indata,
157  nelem*_sion_omp_size_of_dtype(dtype)
158  );
159 
160  /* synchronize again */
161  {
162 #pragma omp barrier
163  }
164  return rc;
165 }
166 
167 int _sion_omp_scatterr_cb(void *indata, void *outdata, void *commdata, int dtype, int nelem, int root)
168 {
169 
170  int rc=SION_STD_SUCCESS;
171  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
172 
173  /* set global pointer to indata */
174  if(sapi->thread_num==root) {
175  __omp_global_pointer = indata;
176  }
177 
178 
179  /* synchronize */
180  {
181 #pragma omp barrier
182  }
183 
184  /* copy outdata <- indata */
185  memcpy( outdata,
186  (char *)__omp_global_pointer+sapi->thread_num*nelem*_sion_omp_size_of_dtype(dtype),
187  nelem*_sion_omp_size_of_dtype(dtype)
188  );
189 
190  /* synchronize again */
191  {
192 #pragma omp barrier
193  }
194 
195  return rc;
196 }
197 
198 
199 int _sion_omp_gathervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
200 {
201 
202  int rc=SION_STD_SUCCESS;
203  int *displs=NULL;
204  int t, offset;
205  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
206 
207  /* compute offsets sizes for each thread and store these values in a temporary vector */
208  if(sapi->thread_num==root) {
209 
210  displs = (int *) malloc(sapi->num_threads * sizeof(int));
211  if (displs == NULL) {
212  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
213  (size_t) sapi->num_threads * sizeof(int));
214  return(-1);
215  }
216  offset=0;
217  for(t=0;t<sapi->num_threads;t++) {
218  displs[t]=offset;
219  offset+=counts[t];
220  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
221  }
222 
223  /* set global pointer to temporary vector */
224  __omp_global_pointer = displs;
225  }
226 
227  /* synchronize */
228  {
229 #pragma omp barrier
230  }
231 
232  /* get offset */
233  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
234 
235  /* synchronize again */
236  {
237 #pragma omp barrier
238  }
239 
240  /* free temporary vector and set global pointer to outdata */
241  if(sapi->thread_num==root) {
242  if(displs) free(displs);
243 
244  }
245 
246  /* set global pointer to temporary vector */
247  __omp_global_pointer = outdata;
248 
249 
250  /* synchronize again */
251  {
252 #pragma omp barrier
253  }
254 
255  /* copy indata -> outdata */
256  memcpy(( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
257  indata,
258  nelem*_sion_omp_size_of_dtype(dtype)
259  );
260 
261  /* synchronize again */
262  {
263 #pragma omp barrier
264  }
265  return rc;
266 }
267 
268 
269 int _sion_omp_scattervr_cb(void *indata, void *outdata, void *commdata, int dtype, int *counts, int nelem, int root)
270 {
271 
272  int rc=SION_STD_SUCCESS;
273  int *displs=NULL;
274  int t, offset;
275  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
276 
277  /* compute offsets for each thread and store these values in a temporary vector */
278  if(sapi->thread_num==root) {
279 
280  displs = (int *) malloc(sapi->num_threads * sizeof(int));
281  if (displs == NULL) {
282  fprintf(stderr,"__sion_omp_gathervr_cb: cannot allocate temporary memory of size %zu (displs), aborting ...\n",
283  (size_t) sapi->num_threads * sizeof(int));
284  return(-1);
285  }
286  offset=0;
287  for(t=0;t<sapi->num_threads;t++) {
288  displs[t]=offset;
289  offset+=counts[t];
290  DPRINTFP((256, "_sion_omp_gathervr_cb", sapi->thread_num, " after compute %2d -> dpls=%2d count=%d\n", t,displs[t],counts[t]));
291  }
292 
293  /* set global pointer to temporary vector */
294  __omp_global_pointer = displs;
295  }
296 
297  /* synchronize */
298  {
299 #pragma omp barrier
300  }
301 
302  /* get offset */
303  offset= ((int*)__omp_global_pointer)[sapi->thread_num];
304 
305  /* synchronize again */
306  {
307 #pragma omp barrier
308  }
309 
310  /* free temporary vector and set global pointer to outdata */
311  if(sapi->thread_num==root) {
312  if(displs) free(displs);
313 
314  }
315 
316  /* set global pointer to temporary vector */
317  __omp_global_pointer = indata;
318 
319 
320  /* synchronize again */
321  {
322 #pragma omp barrier
323  }
324 
325  /* copy indata -> outdata */
326  memcpy(outdata,
327  ( char * ) __omp_global_pointer + offset * _sion_omp_size_of_dtype(dtype),
328  nelem*_sion_omp_size_of_dtype(dtype)
329  );
330 
331  /* synchronize again */
332  {
333 #pragma omp barrier
334  }
335  return rc;
336 }
337 
338 
339 int _sion_omp_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
340  void *commdata, int collector, int range_start, int range_end, int sid,
341  int process_cb(const void *,sion_int64 *, int ) ) {
342  int rc=SION_STD_SUCCESS;
343 
344  sion_int64 **p_spec=NULL;
345  char **p_indata=NULL;
346  int t, trank;
347  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
348 
349 
350  __sion_omp_get_info_from_other((void*) indata, spec, spec_len,
351  commdata, collector, range_start, range_end,
352  &p_spec, &p_indata );
353 
354 
355  /* process */
356  if(sapi->thread_num==collector) {
357 
358  /* scan all other tasks */
359  for(t=range_start;t<=range_end;t++) {
360  trank=t-range_start;
361  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
362  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
363 
364  rc=process_cb(p_indata[trank],p_spec[trank], sid);
365  }
366 
367  if(rc != SION_STD_SUCCESS) {
368  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
369  }
370 
371  if(p_spec) free(p_spec);
372  if(p_indata) free(p_indata);
373 
374  }
375 
376  /* synchronize */
377  {
378 #pragma omp barrier
379  }
380 
381 
382  return(rc);
383 }
384 
385 int _sion_omp_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
386  void *commdata, int collector, int range_start, int range_end, int sid,
387  int process_cb(void *,sion_int64 *, int ) ) {
388  int rc=SION_STD_SUCCESS;
389 
390  sion_int64 **p_spec=NULL;
391  char **p_indata=NULL;
392  int t, trank;
393  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
394 
395 
396  __sion_omp_get_info_from_other((void*) outdata, spec, spec_len,
397  commdata, collector, range_start, range_end,
398  &p_spec, &p_indata );
399 
400 
401  /* process */
402  if(sapi->thread_num==collector) {
403 
404  /* scan all other tasks */
405  for(t=range_start;t<=range_end;t++) {
406  trank=t-range_start;
407  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on master t=%d spec=%ld,%ld,%ld \n",trank,
408  *(p_spec[trank]+0), *(p_spec[trank]+1), *(p_spec[trank]+2)));
409 
410  rc=process_cb(p_indata[trank],p_spec[trank], sid);
411  }
412 
413  if(rc != SION_STD_SUCCESS) {
414  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_omp_gather_process_cb: problems writing data ...\n"));
415  }
416 
417  if(p_spec) free(p_spec);
418  if(p_indata) free(p_indata);
419 
420  }
421 
422  /* synchronize */
423  {
424 #pragma omp barrier
425  }
426 
427 
428 
429  return(rc);
430 }
431 
432 int __sion_omp_get_info_from_other(void *data, sion_int64 *spec, int spec_len,
433  void *commdata, int collector, int range_start, int range_end,
434  sion_int64 ***p_spec, char ***p_indata ) {
435  int rc=SION_SUCCESS;
436 
437  sion_int64 **tspec;
438  char const **tdata;
439  int num_sender, my_trank;
440  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
441 
442  num_sender = range_end-range_start+1;
443  my_trank = sapi->thread_num-range_start;
444 
445  /* collect offsets and sizes for each thread */
446  if(sapi->thread_num==collector) {
447 
448  *p_spec = (sion_int64 **) malloc(num_sender * sizeof(sion_int64 *));
449  if (*p_spec == NULL) {
450  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_spec), aborting ...\n",
451  (size_t) num_sender * sizeof(sion_int64 *));
452  return(-1);
453  }
454  *p_indata = (char **) malloc(num_sender * sizeof(char *));
455  if (*p_indata == NULL) {
456  fprintf(stderr,"_sion_omp_gather_process_cb: cannot allocate temporary memory of size %zu (p_indata), aborting ...\n",
457  (size_t) num_sender * sizeof(char *));
458  return(-1);
459  }
460 
461  /* set global pointer to temporary vector */
462  __omp_global_pointer = *p_spec;
463  }
464 
465  /* synchronize */
466  {
467 #pragma omp barrier
468  }
469 
470  if(sapi->thread_num!=collector) {
471  tspec = (sion_int64 **) __omp_global_pointer;
472  tspec[my_trank] = spec;
473  DPRINTFP((256, "_sion_omp_gather_process_cb", -1, "on sender t=%d spec=%ld,%ld,%ld \n",my_trank,
474  *(tspec[my_trank]+0), *(tspec[my_trank]+1), *(tspec[my_trank]+2)));
475 
476  }
477 
478  /* synchronize */
479  {
480 #pragma omp barrier
481  }
482 
483  if(sapi->thread_num==collector) {
484  /* set global pointer to temporary vector */
485  __omp_global_pointer = *p_indata;
486  }
487 
488  /* synchronize */
489  {
490 #pragma omp barrier
491  }
492 
493  if(sapi->thread_num!=collector) {
494  tdata = (char const **) __omp_global_pointer;
495  tdata[my_trank] = data;
496  }
497 
498  /* synchronize */
499  {
500 #pragma omp barrier
501  }
502 
503 
504  return(rc);
505 }
506 
507 int _sion_omp_get_capability_cb(void *commdata )
508 {
509  int rc=SION_CAPABILITY_NONE;
510  _omp_api_commdata* sapi= (_omp_api_commdata *) commdata;
511 
512  if(sapi->thread_num==0) {
514  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "FULL capability\n"));
515  } else {
517  DPRINTFP((256, "_sion_omp_get_capability_cb", sapi->thread_num, "ONLY SENDER capability\n"));
518  }
519  return rc;
520 }
521 
522 
523 int _sion_omp_size_of_dtype(int dtype) {
524  switch (dtype) {
525  case _SION_INT32: return(sizeof(sion_int32)); break;
526  case _SION_INT64: return(sizeof(sion_int64)); break;
527  case _SION_CHAR: return(sizeof(char)); break;
528  default: return(sizeof(sion_int64));
529  }
530 }
531 
532 /* end of ifdef SION_OMP */
533 #endif
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:56
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:55
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:54
Sion Time Stamp Header.