SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_ompi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #define USE_PMPIno
23 #ifdef USE_PMPI
24 #define MPI_Comm_rank PMPI_Comm_rank
25 #define MPI_Comm_size PMPI_Comm_size
26 #define MPI_Gather PMPI_Gather
27 #define MPI_Scatter PMPI_Scatter
28 #define MPI_Bcast PMPI_Bcast
29 #define MPI_Barrier PMPI_Barrier
30 #define MPI_Comm_split PMPI_Comm_split
31 #define MPI_Send PMPI_Send
32 #define MPI_Recv PMPI_Recv
33 #endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_error_handler.h"
45 #include "sion_internal.h"
46 #include "sion_fd.h"
47 #include "sion_filedesc.h"
48 #include "sion_printts.h"
49 
50 #include "sion_ompi_internal_gen.h"
51 #include "sion_ompi_cb_gen.h"
52 
53 #ifdef SION_OMPI
54 
55 static void *__ompicol_global_pointer;
56 static int _sion_opmicol_grc=SION_SUCCESS;
57 
58 int _sion_ompicol_size_of_dtype(int dtype);
59 void * __sion_ompicol_share_ptr(void * in_ptr);
60 
61 #define DFUNCTION "_ompi_gather_execute_cb"
62 int _sion_ompi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
63  void *commdata, int collector, int range_start, int range_end, int sid,
64  int process_cb(const void *,sion_int64 *, int ) ) {
65  int rc=SION_SUCCESS;
66  int t, startsignal=1,mrank,mt,tt, mcollector;
67  MPI_Status status;
68  char *p, *buffer;
69  void *helpdata;
70  const void *p_data;
71  void const **indatas;
72  sion_int64 **specs, *p_spec;
73  sion_int64 bytestorecv, bytestosend, datasize;
74  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
75  MPI_Comm commgroup;
76  int rank=sapi->rank;
77 
78  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
79 
80 
81  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, indatas -> pointer to indata on thread */
82 #pragma omp master
83  {
84  _sion_opmicol_grc=SION_STD_SUCCESS;
85 
86  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
87  if (helpdata == NULL) {
88  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
89  (size_t) sapi->num_threads * sizeof(int *));
90  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
91  }
92  } /* omp master */
93 
94  /* share data ptr among threads, internal barrier */
95  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
96 
97  /* check return code from malloc */
98  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
99 
100  /* store Info about spec */
101  specs[sapi->thread_num]= spec;
102 
103  /* synchronize */
104  {
105 #pragma omp barrier
106  }
107 
108 #pragma omp master
109  {
110  helpdata = (void *) malloc(sapi->num_threads * sizeof(const void *));
111  if (helpdata == NULL) {
112  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (tcounts), aborting ...\n",
113  (size_t) sapi->num_threads * sizeof(int *));
114  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
115  }
116  } /* omp master */
117 
118  /* share data ptr among threads, internal barrier */
119  indatas = (void const **)__sion_ompicol_share_ptr((void *) helpdata);
120 
121  /* check return code from malloc */
122  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
123 
124  /* store info about spec */
125  indatas[sapi->thread_num] = indata;
126 
127 
128  /* synchronize */
129  {
130 #pragma omp barrier
131  }
132 
133  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
134  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
135 
136 
137 #pragma omp master
138  {
139  for(tt=0;tt<sapi->num_threads;tt++) {
140  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER SPECS[%d]=%x\n", tt,specs[tt]));
141  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
142  }
143  }
144 
145  /* STEP2: proceed on MPI level, master threads have all needed info */
146 #pragma omp master
147  {
148  commgroup = sapi->comm;
149 
150  if(rank == collector) {
151  /* its the collector */
152 
153  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
154 
155  /* allocate buffer */
156  buffer = (char *) malloc(fsblksize * sizeof(char));
157  if (buffer == NULL) {
158  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
159  (unsigned long) fsblksize * sizeof(char));
160  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
161  }
162 
163  /* scan all other tasks */
164  for(t=range_start;t<=range_end;t++) {
165 
166  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
167 
168 
169  if(mt==mrank) {
170  /* thread is on same MPI rank */
171  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
172 
173  /* process data directly */
174  p_spec=specs[tt];
175  p_data=indatas[tt];
176  _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
177 
178  } else {
179 
180  /* receive spec */
181  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", mt));
182  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
183  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
184  mt, (long long) spec[0], (long long) spec[1]));
185 
186  /* send signal to send data */
187  if(spec[0]!=-1) { /* no error on sender */
188  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", mt));
189  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
190  }
191 
192  /* get and write data */
193  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
194  (long long) spec[1], (long long) spec[0]));
195 
196  bytestorecv=spec[1];
197 
198  /* loop over data parts */
199  while(bytestorecv>0) {
200  if(bytestorecv>fsblksize) datasize=fsblksize;
201  else datasize=bytestorecv;
202 
203  /* receive portion or all data */
204  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", mt));
205  MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
206  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
207  mt, (long long) datasize, (long long) bytestorecv));
208 
209  spec[1]=datasize; /* adjust size */
210 
211  /* process data with callback function */
212  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
213  (long long) spec[1], (long long) spec[0]));
214 
215  _sion_opmicol_grc=process_cb(buffer,spec, sid);
216 
217  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
218  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
219  }
220 
221  /* advance counter */
222  bytestorecv-=datasize;spec[0]+=datasize;
223 
224 
225  }
226  } /* not on local MPI rank */
227 
228  }
229 
230  /* remove buffer */
231  if (buffer) free(buffer);
232 
233  } else {
234  /* its a sender */
235 
236  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
237 
238  /* send data for all threads on MPI rank */
239  for(tt=0;tt<sapi->num_threads;tt++) {
240 
241  /* send spec to collector */
242  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
243  collector,(long long) specs[tt][0], (long long) specs[tt][1]));
244  rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
245  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender sent spec to %d rc=%d\n", mcollector,rc));
246 
247  /* wait for start signal */
248  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", mcollector));
249  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
250  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", mcollector));
251 
252  /* send data in chunks of fsblksize */
253  bytestosend=specs[tt][1];
254  p=(char *) indatas[tt];
255  while(bytestosend>0) {
256  if(bytestosend>fsblksize) datasize=fsblksize;
257  else datasize=bytestosend;
258  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", mcollector, (long long) datasize));
259  MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
260  bytestosend-=datasize;p+=datasize;
261  }
262  } /* for all threads */
263  } /* sender */
264 
265  } /* omp master */
266 
267  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
268 
269  /* synchronize */
270  {
271 #pragma omp barrier
272  }
273  rc=_sion_opmicol_grc;
274  {
275 #pragma omp barrier
276  }
277  return rc;
278 }
279 #undef DFUNCTION
280 
281 
282 #define DFUNCTION "_ompi_process_scatter_cb"
283 int _sion_ompi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
284  void *commdata, int collector, int range_start, int range_end, int sid,
285  int process_cb(void *,sion_int64 *, int ) ) {
286  int rc=SION_SUCCESS;
287  int t, startsignal=1, count, mrank, mt, tt, mcollector;
288  MPI_Status status;
289  char *p, *buffer;
290  void *helpdata;
291  void **outdatas;
292  sion_int64 **specs;
293  sion_int64 bytestorecv, bytestosend, datasize;
294  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
295  MPI_Comm commgroup;
296  int rank=sapi->rank;
297 
298  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
299 
300  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, outdatas -> pointer to outdata on thread */
301 #pragma omp master
302  {
303  _sion_opmicol_grc=SION_STD_SUCCESS;
304 
305  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
306  if (helpdata == NULL) {
307  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
308  (size_t) sapi->num_threads * sizeof(int *));
309  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
310  }
311  } /* omp master */
312 
313  /* share data ptr among threads, internal barrier */
314  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
315 
316  /* check return code from malloc */
317  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
318 
319  /* store Info about spec */
320  specs[sapi->thread_num]= spec;
321 
322  /* synchronize */
323  {
324 #pragma omp barrier
325  }
326 
327 #pragma omp master
328  {
329  helpdata = (void *) malloc(sapi->num_threads * sizeof(void *));
330  if (helpdata == NULL) {
331  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
332  (size_t) sapi->num_threads * sizeof(int *));
333  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
334  }
335  } /* omp master */
336 
337  /* share data ptr among threads, internal barrier */
338  outdatas = (void **)__sion_ompicol_share_ptr((void *) helpdata);
339 
340  /* check return code from malloc */
341  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
342 
343  /* store info about spec */
344  outdatas[sapi->thread_num] = outdata;
345 
346  /* synchronize */
347  {
348 #pragma omp barrier
349  }
350 
351 
352  /* STEP2: proceed on MPI level, master threads have all needed info */
353 #pragma omp master
354  {
355  commgroup = sapi->comm;
356 
357 
358  if(rank == collector) {
359  /* its the collector */
360 
361  /* allocate buffer */
362  buffer = (char *) malloc(fsblksize * sizeof(char));
363  if (buffer == NULL) {
364  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
365  (unsigned long) fsblksize * sizeof(char));
366  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
367  }
368 
369  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
370 
371  /* scan all other tasks */
372  for(t=range_start;t<=range_end;t++) {
373 
374  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
375 
376  if(mt==mrank) {
377 
378  /* thread is on same MPI rank */
379  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
380 
381  /* process data directly */
382  _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
383 
384  } else {
385 
386  /* receive spec */
387  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
388  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
389  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
390  t, (long long) spec[0], (long long) spec[1]));
391 
392  /* send signal to send data */
393  if(spec[0]>=0) { /* sender waits for data */
394  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
395  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
396  }
397 
398  /* get and send data */
399  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to proces data of size %lld at offset %lld\n",
400  (long long) spec[1], (long long) spec[0]));
401 
402  bytestosend=spec[1];
403 
404  /* loop over data parts */
405  while(bytestosend>0) {
406 
407  if(bytestosend>fsblksize) datasize=fsblksize;
408  else datasize=bytestosend;
409 
410  spec[1]=datasize; /* adjust size */
411 
412  /* process data with callback function */
413  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
414  (long long) spec[1], (long long) spec[0]));
415 
416  _sion_opmicol_grc=process_cb(buffer,spec, sid);
417 
418  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
419  _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
420  }
421 
422  /* send portion or all data */
423  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", mt));
424  MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
425  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
426  mt, (long long) datasize, (long long) bytestosend));
427 
428  /* advance counter */
429  bytestosend-=datasize;spec[0]+=datasize;
430 
431  } /* while */
432  } /* not on local MPI rank */
433  } /* for all tasks */
434 
435  /* remove buffer */
436  if (buffer) free(buffer);
437 
438  } else {
439  /* its a sender */
440 
441  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
442 
443  /* send data for all threads on MPI rank */
444  for(tt=0;tt<sapi->num_threads;tt++) {
445 
446  /* send spec to collector */
447  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
448  mcollector,(long long) specs[tt][0], (long long) specs[tt][1]));
449  MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
450 
451  if(specs[tt][0]>0) { /* no error in sion_feof */
452 
453  /* wait for start signal */
454  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
455  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
456  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
457 
458  /* send data in chunks of fsblksize */
459  bytestorecv=specs[tt][1];
460  p=(char *) outdata;
461  while(bytestorecv>0) {
462  if(bytestorecv>fsblksize) datasize=fsblksize;
463  else datasize=bytestorecv;
464  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", mcollector, (long long) datasize));
465  MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
466  MPI_Get_count(&status,MPI_CHAR,&count);
467 
468  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", mcollector, (long long) datasize, count));
469  bytestorecv-=datasize;p+=datasize;
470 
471  } /* while */
472  } /* spec[0]>0 */
473  } /* for all threads */
474  } /* sender */
475 
476  } /* omp master */
477 
478 
479  /* synchronize */
480  {
481 #pragma omp barrier
482  }
483  rc=_sion_opmicol_grc;
484  {
485 #pragma omp barrier
486  }
487 
488  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
489 
490  return rc;
491 }
492 #undef DFUNCTION
493 
494 
495  /* share in_ptr given on master with all other threads, return value is the shared ptr */
496 #define DFUNCTION "__sion_ompi_share_ptr"
497  void * __sion_ompicol_share_ptr(void * in_ptr) {
498  void *out_ptr;
499 
500 #pragma omp master
501  __ompicol_global_pointer = in_ptr;
502 
503 
504  {
505 #pragma omp barrier
506  }
507 
508  out_ptr=__ompicol_global_pointer;
509 
510  return(out_ptr);
511 
512  }
513 #undef DFUNCTION
514 
515 
516 #define DFUNCTION "_sion_ompi_get_capability_cb"
517 int _sion_ompi_get_capability_cb(void *commdata )
518 {
519  int rc=SION_CAPABILITY_NONE;
520  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
521 
522  if(sapi->thread_num==0) {
524  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
525  } else {
527  DPRINTFP((256, DFUNCTION, sapi->rank, "ONLY SENDER capability\n"));
528  }
529  return rc;
530 }
531 #undef DFUNCTION
532 
533 
534  int _sion_ompicol_size_of_dtype(int dtype) {
535  switch (dtype) {
536  case _SION_INT32: return(sizeof(sion_int32)); break;
537  case _SION_INT64: return(sizeof(sion_int64)); break;
538  case _SION_CHAR: return(sizeof(char)); break;
539  default: return(sizeof(sion_int64));
540  }
541  }
542 
543  /* end of ifdef SION_OMPI */
544 #endif
545 
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:51
Sion Time Stamp Header.