SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_ompi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include "mpi.h"
23 
24 #define USE_PMPIno
25 #ifdef USE_PMPI
26 #define MPI_Comm_rank PMPI_Comm_rank
27 #define MPI_Comm_size PMPI_Comm_size
28 #define MPI_Gather PMPI_Gather
29 #define MPI_Scatter PMPI_Scatter
30 #define MPI_Bcast PMPI_Bcast
31 #define MPI_Barrier PMPI_Barrier
32 #define MPI_Comm_split PMPI_Comm_split
33 #define MPI_Send PMPI_Send
34 #define MPI_Recv PMPI_Recv
35 #endif
36 
37 #include <sys/time.h>
38 
39 #include <sys/types.h>
40 #include <fcntl.h>
41 
42 #include <unistd.h>
43 
44 #include "sion.h"
45 #include "sion_debug.h"
46 #include "sion_error_handler.h"
47 #include "sion_internal.h"
48 #include "sion_fd.h"
49 #include "sion_filedesc.h"
50 #include "sion_printts.h"
51 
52 #include "sion_ompi_internal_gen.h"
53 #include "sion_ompi_cb_gen.h"
54 
55 #ifdef SION_OMPI
56 
57 static void *__ompicol_global_pointer;
58 static int _sion_opmicol_grc=SION_SUCCESS;
59 
60 int _sion_ompicol_size_of_dtype(int dtype);
61 void * __sion_ompicol_share_ptr(void * in_ptr);
62 
63 #define DFUNCTION "_ompi_gather_execute_cb"
64 int _sion_ompi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
65  void *commdata, int collector, int range_start, int range_end, int sid,
66  int process_cb(const void *,sion_int64 *, int ) ) {
67  int rc=SION_SUCCESS;
68  int t, startsignal=1,mrank,mt,tt, mcollector;
69  MPI_Status status;
70  char *p, *buffer;
71  void *helpdata;
72  const void *p_data;
73  void const **indatas;
74  sion_int64 **specs, *p_spec;
75  sion_int64 bytestorecv, bytestosend, datasize;
76  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
77  MPI_Comm commgroup;
78  int rank=sapi->rank;
79 
80  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
81 
82 
83  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, indatas -> pointer to indata on thread */
84 #pragma omp master
85  {
86  _sion_opmicol_grc=SION_STD_SUCCESS;
87 
88  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
89  if (helpdata == NULL) {
90  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
91  (size_t) sapi->num_threads * sizeof(int *));
92  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
93  }
94  } /* omp master */
95 
96  /* share data ptr among threads, internal barrier */
97  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
98 
99  /* check return code from malloc */
100  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
101 
102  /* store Info about spec */
103  specs[sapi->thread_num]= spec;
104 
105  /* synchronize */
106  {
107 #pragma omp barrier
108  }
109 
110 #pragma omp master
111  {
112  helpdata = (void *) malloc(sapi->num_threads * sizeof(const void *));
113  if (helpdata == NULL) {
114  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (tcounts), aborting ...\n",
115  (size_t) sapi->num_threads * sizeof(int *));
116  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
117  }
118  } /* omp master */
119 
120  /* share data ptr among threads, internal barrier */
121  indatas = (void const **)__sion_ompicol_share_ptr((void *) helpdata);
122 
123  /* check return code from malloc */
124  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
125 
126  /* store info about spec */
127  indatas[sapi->thread_num] = indata;
128 
129 
130  /* synchronize */
131  {
132 #pragma omp barrier
133  }
134 
135  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
136  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
137 
138 
139 #pragma omp master
140  {
141  for(tt=0;tt<sapi->num_threads;tt++) {
142  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER SPECS[%d]=%x\n", tt,specs[tt]));
143  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
144  }
145  }
146 
147  /* STEP2: proceed on MPI level, master threads have all needed info */
148 #pragma omp master
149  {
150  commgroup = sapi->comm;
151 
152  if(rank == collector) {
153  /* its the collector */
154 
155  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
156 
157  /* allocate buffer */
158  buffer = (char *) malloc(fsblksize * sizeof(char));
159  if (buffer == NULL) {
160  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
161  (unsigned long) fsblksize * sizeof(char));
162  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
163  }
164 
165  /* scan all other tasks */
166  for(t=range_start;t<=range_end;t++) {
167 
168  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
169 
170 
171  if(mt==mrank) {
172  /* thread is on same MPI rank */
173  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
174 
175  /* process data directly */
176  p_spec=specs[tt];
177  p_data=indatas[tt];
178  _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
179 
180  } else {
181 
182  /* receive spec */
183  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", mt));
184  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
185  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
186  mt, (long long) spec[0], (long long) spec[1]));
187 
188  /* send signal to send data */
189  if(spec[0]!=-1) { /* no error on sender */
190  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", mt));
191  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
192  }
193 
194  /* get and write data */
195  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
196  (long long) spec[1], (long long) spec[0]));
197 
198  bytestorecv=spec[1];
199 
200  /* loop over data parts */
201  while(bytestorecv>0) {
202  if(bytestorecv>fsblksize) datasize=fsblksize;
203  else datasize=bytestorecv;
204 
205  /* receive portion or all data */
206  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", mt));
207  MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
208  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
209  mt, (long long) datasize, (long long) bytestorecv));
210 
211  spec[1]=datasize; /* adjust size */
212 
213  /* process data with callback function */
214  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
215  (long long) spec[1], (long long) spec[0]));
216 
217  _sion_opmicol_grc=process_cb(buffer,spec, sid);
218 
219  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
220  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
221  }
222 
223  /* advance counter */
224  bytestorecv-=datasize;spec[0]+=datasize;
225 
226 
227  }
228  } /* not on local MPI rank */
229 
230  }
231 
232  /* remove buffer */
233  if (buffer) free(buffer);
234 
235  } else {
236  /* its a sender */
237 
238  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
239 
240  /* send data for all threads on MPI rank */
241  for(tt=0;tt<sapi->num_threads;tt++) {
242 
243  /* send spec to collector */
244  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
245  collector,(long long) specs[tt][0], (long long) specs[tt][1]));
246  rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
247  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender sent spec to %d rc=%d\n", mcollector,rc));
248 
249  /* wait for start signal */
250  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", mcollector));
251  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
252  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", mcollector));
253 
254  /* send data in chunks of fsblksize */
255  bytestosend=specs[tt][1];
256  p=(char *) indatas[tt];
257  while(bytestosend>0) {
258  if(bytestosend>fsblksize) datasize=fsblksize;
259  else datasize=bytestosend;
260  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", mcollector, (long long) datasize));
261  MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
262  bytestosend-=datasize;p+=datasize;
263  }
264  } /* for all threads */
265  } /* sender */
266 
267  } /* omp master */
268 
269  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
270 
271  /* synchronize */
272  {
273 #pragma omp barrier
274  }
275  rc=_sion_opmicol_grc;
276  {
277 #pragma omp barrier
278  }
279  return rc;
280 }
281 #undef DFUNCTION
282 
283 
284 #define DFUNCTION "_ompi_process_scatter_cb"
285 int _sion_ompi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
286  void *commdata, int collector, int range_start, int range_end, int sid,
287  int process_cb(void *,sion_int64 *, int ) ) {
288  int rc=SION_SUCCESS;
289  int t, startsignal=1, count, mrank, mt, tt, mcollector;
290  MPI_Status status;
291  char *p, *buffer;
292  void *helpdata;
293  void **outdatas;
294  sion_int64 **specs;
295  sion_int64 bytestorecv, bytestosend, datasize;
296  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
297  MPI_Comm commgroup;
298  int rank=sapi->rank;
299 
300  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
301 
302  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, outdatas -> pointer to outdata on thread */
303 #pragma omp master
304  {
305  _sion_opmicol_grc=SION_STD_SUCCESS;
306 
307  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
308  if (helpdata == NULL) {
309  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
310  (size_t) sapi->num_threads * sizeof(int *));
311  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
312  }
313  } /* omp master */
314 
315  /* share data ptr among threads, internal barrier */
316  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
317 
318  /* check return code from malloc */
319  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
320 
321  /* store Info about spec */
322  specs[sapi->thread_num]= spec;
323 
324  /* synchronize */
325  {
326 #pragma omp barrier
327  }
328 
329 #pragma omp master
330  {
331  helpdata = (void *) malloc(sapi->num_threads * sizeof(void *));
332  if (helpdata == NULL) {
333  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %zu (helpdata), aborting ...\n",
334  (size_t) sapi->num_threads * sizeof(int *));
335  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
336  }
337  } /* omp master */
338 
339  /* share data ptr among threads, internal barrier */
340  outdatas = (void **)__sion_ompicol_share_ptr((void *) helpdata);
341 
342  /* check return code from malloc */
343  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
344 
345  /* store info about spec */
346  outdatas[sapi->thread_num] = outdata;
347 
348  /* synchronize */
349  {
350 #pragma omp barrier
351  }
352 
353 
354  /* STEP2: proceed on MPI level, master threads have all needed info */
355 #pragma omp master
356  {
357  commgroup = sapi->comm;
358 
359 
360  if(rank == collector) {
361  /* its the collector */
362 
363  /* allocate buffer */
364  buffer = (char *) malloc(fsblksize * sizeof(char));
365  if (buffer == NULL) {
366  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
367  (unsigned long) fsblksize * sizeof(char));
368  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
369  }
370 
371  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
372 
373  /* scan all other tasks */
374  for(t=range_start;t<=range_end;t++) {
375 
376  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
377 
378  if(mt==mrank) {
379 
380  /* thread is on same MPI rank */
381  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
382 
383  /* process data directly */
384  _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
385 
386  } else {
387 
388  /* receive spec */
389  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
390  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
391  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
392  t, (long long) spec[0], (long long) spec[1]));
393 
394  /* send signal to send data */
395  if(spec[0]>=0) { /* sender waits for data */
396  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
397  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
398  }
399 
400  /* get and send data */
401  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to proces data of size %lld at offset %lld\n",
402  (long long) spec[1], (long long) spec[0]));
403 
404  bytestosend=spec[1];
405 
406  /* loop over data parts */
407  while(bytestosend>0) {
408 
409  if(bytestosend>fsblksize) datasize=fsblksize;
410  else datasize=bytestosend;
411 
412  spec[1]=datasize; /* adjust size */
413 
414  /* process data with callback function */
415  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
416  (long long) spec[1], (long long) spec[0]));
417 
418  _sion_opmicol_grc=process_cb(buffer,spec, sid);
419 
420  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
421  _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
422  }
423 
424  /* send portion or all data */
425  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", mt));
426  MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
427  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
428  mt, (long long) datasize, (long long) bytestosend));
429 
430  /* advance counter */
431  bytestosend-=datasize;spec[0]+=datasize;
432 
433  } /* while */
434  } /* not on local MPI rank */
435  } /* for all tasks */
436 
437  /* remove buffer */
438  if (buffer) free(buffer);
439 
440  } else {
441  /* its a sender */
442 
443  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
444 
445  /* send data for all threads on MPI rank */
446  for(tt=0;tt<sapi->num_threads;tt++) {
447 
448  /* send spec to collector */
449  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
450  mcollector,(long long) specs[tt][0], (long long) specs[tt][1]));
451  MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
452 
453  if(specs[tt][0]>0) { /* no error in sion_feof */
454 
455  /* wait for start signal */
456  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
457  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
458  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
459 
460  /* send data in chunks of fsblksize */
461  bytestorecv=specs[tt][1];
462  p=(char *) outdata;
463  while(bytestorecv>0) {
464  if(bytestorecv>fsblksize) datasize=fsblksize;
465  else datasize=bytestorecv;
466  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", mcollector, (long long) datasize));
467  MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
468  MPI_Get_count(&status,MPI_CHAR,&count);
469 
470  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", mcollector, (long long) datasize, count));
471  bytestorecv-=datasize;p+=datasize;
472 
473  } /* while */
474  } /* spec[0]>0 */
475  } /* for all threads */
476  } /* sender */
477 
478  } /* omp master */
479 
480 
481  /* synchronize */
482  {
483 #pragma omp barrier
484  }
485  rc=_sion_opmicol_grc;
486  {
487 #pragma omp barrier
488  }
489 
490  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
491 
492  return rc;
493 }
494 #undef DFUNCTION
495 
496 
497  /* share in_ptr given on master with all other threads, return value is the shared ptr */
498 #define DFUNCTION "__sion_ompi_share_ptr"
499  void * __sion_ompicol_share_ptr(void * in_ptr) {
500  void *out_ptr;
501 
502 #pragma omp master
503  __ompicol_global_pointer = in_ptr;
504 
505 
506  {
507 #pragma omp barrier
508  }
509 
510  out_ptr=__ompicol_global_pointer;
511 
512  return(out_ptr);
513 
514  }
515 #undef DFUNCTION
516 
517 
518 #define DFUNCTION "_sion_ompi_get_capability_cb"
519 int _sion_ompi_get_capability_cb(void *commdata )
520 {
521  int rc=SION_CAPABILITY_NONE;
522  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
523 
524  if(sapi->thread_num==0) {
526  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
527  } else {
529  DPRINTFP((256, DFUNCTION, sapi->rank, "ONLY SENDER capability\n"));
530  }
531  return rc;
532 }
533 #undef DFUNCTION
534 
535 
536  int _sion_ompicol_size_of_dtype(int dtype) {
537  switch (dtype) {
538  case _SION_INT32: return(sizeof(sion_int32)); break;
539  case _SION_INT64: return(sizeof(sion_int64)); break;
540  case _SION_CHAR: return(sizeof(char)); break;
541  default: return(sizeof(sion_int64));
542  }
543  }
544 
545  /* end of ifdef SION_OMPI */
546 #endif
547 
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:56
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:55
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:54
Sion Time Stamp Header.