SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_ompi_coll_cb_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #define USE_PMPIno
23 #ifdef USE_PMPI
24 #define MPI_Comm_rank PMPI_Comm_rank
25 #define MPI_Comm_size PMPI_Comm_size
26 #define MPI_Gather PMPI_Gather
27 #define MPI_Scatter PMPI_Scatter
28 #define MPI_Bcast PMPI_Bcast
29 #define MPI_Barrier PMPI_Barrier
30 #define MPI_Comm_split PMPI_Comm_split
31 #define MPI_Send PMPI_Send
32 #define MPI_Recv PMPI_Recv
33 #endif
34 
35 #include <sys/time.h>
36 
37 #include <sys/types.h>
38 #include <fcntl.h>
39 
40 #include <unistd.h>
41 
42 #include "sion.h"
43 #include "sion_debug.h"
44 #include "sion_internal.h"
45 #include "sion_fd.h"
46 #include "sion_filedesc.h"
47 #include "sion_printts.h"
48 
49 #include "sion_ompi_internal_gen.h"
50 #include "sion_ompi_cb_gen.h"
51 
52 #ifdef SION_OMPI
53 
54 static void *__ompicol_global_pointer;
55 static int _sion_opmicol_grc=SION_SUCCESS;
56 
57 int _sion_ompicol_size_of_dtype(int dtype);
58 void * __sion_ompicol_share_ptr(void * in_ptr);
59 
60 #define DFUNCTION "_ompi_gather_execute_cb"
61 int _sion_ompi_gather_process_cb(const void *indata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
62  void *commdata, int collector, int range_start, int range_end, int sid,
63  int process_cb(const void *,sion_int64 *, int ) ) {
64  int rc=SION_SUCCESS;
65  int t, startsignal=1,mrank,mt,tt, mcollector;
66  MPI_Status status;
67  char *p, *buffer;
68  void *helpdata;
69  const void *p_data;
70  void const **indatas;
71  sion_int64 **specs, *p_spec;
72  sion_int64 bytestorecv, bytestosend, datasize;
73  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
74  MPI_Comm commgroup;
75  int rank=sapi->rank;
76 
77  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
78 
79 
80  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, indatas -> pointer to indata on thread */
81 #pragma omp master
82  {
83  _sion_opmicol_grc=SION_STD_SUCCESS;
84 
85  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
86  if (helpdata == NULL) {
87  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
88  (unsigned long) sapi->num_threads * sizeof(int *));
89  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
90  }
91  } /* omp master */
92 
93  /* share data ptr among threads, internal barrier */
94  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
95 
96  /* check return code from malloc */
97  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
98 
99  /* store Info about spec */
100  specs[sapi->thread_num]= spec;
101 
102  /* synchronize */
103  {
104 #pragma omp barrier
105  }
106 
107 #pragma omp master
108  {
109  helpdata = (void *) malloc(sapi->num_threads * sizeof(const void *));
110  if (helpdata == NULL) {
111  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
112  (unsigned long) sapi->num_threads * sizeof(int *));
113  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
114  }
115  } /* omp master */
116 
117  /* share data ptr among threads, internal barrier */
118  indatas = (void const **)__sion_ompicol_share_ptr((void *) helpdata);
119 
120  /* check return code from malloc */
121  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
122 
123  /* store info about spec */
124  indatas[sapi->thread_num] = indata;
125 
126 
127  /* synchronize */
128  {
129 #pragma omp barrier
130  }
131 
132  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store SPECS[%d]=%x (%x)\n", sapi->thread_num,specs[sapi->thread_num], spec));
133  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "store DATAS[%d]=%x (%x)\n", sapi->thread_num,indatas[sapi->thread_num], indata));
134 
135 
136 #pragma omp master
137  {
138  for(tt=0;tt<sapi->num_threads;tt++) {
139  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER SPECS[%d]=%x\n", tt,specs[tt]));
140  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "MASTER DATAS[%d]=%x\n", tt,indatas[tt]));
141  }
142  }
143 
144  /* STEP2: proceed on MPI level, master threads have all needed info */
145 #pragma omp master
146  {
147  commgroup = sapi->comm;
148 
149  if(rank == collector) {
150  /* its the collector */
151 
152  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
153 
154  /* allocate buffer */
155  buffer = (char *) malloc(fsblksize * sizeof(char));
156  if (buffer == NULL) {
157  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_mpi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
158  (unsigned long) fsblksize * sizeof(char));
159  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
160  }
161 
162  /* scan all other tasks */
163  for(t=range_start;t<=range_end;t++) {
164 
165  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
166 
167 
168  if(mt==mrank) {
169  /* thread is on same MPI rank */
170  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
171 
172  /* process data directly */
173  p_spec=specs[tt];
174  p_data=indatas[tt];
175  _sion_opmicol_grc=process_cb(p_data,p_spec, sid);
176 
177  } else {
178 
179  /* receive spec */
180  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", mt));
181  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
182  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
183  mt, (long long) spec[0], (long long) spec[1]));
184 
185  /* send signal to send data */
186  if(spec[0]!=-1) { /* no error on sender */
187  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", mt));
188  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
189  }
190 
191  /* get and write data */
192  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to process data of size %lld at offset %lld\n",
193  (long long) spec[1], (long long) spec[0]));
194 
195  bytestorecv=spec[1];
196 
197  /* loop over data parts */
198  while(bytestorecv>0) {
199  if(bytestorecv>fsblksize) datasize=fsblksize;
200  else datasize=bytestorecv;
201 
202  /* receive portion or all data */
203  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for data block from %d\n", mt));
204  MPI_Recv(buffer, datasize, MPI_CHAR, mt, 1536, commgroup, &status);
205  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got data block from %d datasize=%lld bytestorecv=%lld\n",
206  mt, (long long) datasize, (long long) bytestorecv));
207 
208  spec[1]=datasize; /* adjust size */
209 
210  /* process data with callback function */
211  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
212  (long long) spec[1], (long long) spec[0]));
213 
214  _sion_opmicol_grc=process_cb(buffer,spec, sid);
215 
216  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
217  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
218  }
219 
220  /* advance counter */
221  bytestorecv-=datasize;spec[0]+=datasize;
222 
223 
224  }
225  } /* not on local MPI rank */
226 
227  }
228 
229  /* remove buffer */
230  if (buffer) free(buffer);
231 
232  } else {
233  /* its a sender */
234 
235  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
236 
237  /* send data for all threads on MPI rank */
238  for(tt=0;tt<sapi->num_threads;tt++) {
239 
240  /* send spec to collector */
241  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
242  collector,(long long) specs[tt][0], (long long) specs[tt][1]));
243  rc=MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
244  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender sent spec to %d rc=%d\n", mcollector,rc));
245 
246  /* wait for start signal */
247  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", mcollector));
248  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
249  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", mcollector));
250 
251  /* send data in chunks of fsblksize */
252  bytestosend=specs[tt][1];
253  p=(char *) indatas[tt];
254  while(bytestosend>0) {
255  if(bytestosend>fsblksize) datasize=fsblksize;
256  else datasize=bytestosend;
257  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send data block to %d of size %lld\n", mcollector, (long long) datasize));
258  MPI_Send(p, datasize, MPI_CHAR, mcollector, 1536, commgroup);
259  bytestosend-=datasize;p+=datasize;
260  }
261  } /* for all threads */
262  } /* sender */
263 
264  } /* omp master */
265 
266  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
267 
268  /* synchronize */
269  {
270 #pragma omp barrier
271  }
272  rc=_sion_opmicol_grc;
273  {
274 #pragma omp barrier
275  }
276  return rc;
277 }
278 #undef DFUNCTION
279 
280 
281 #define DFUNCTION "_ompi_process_scatter_cb"
282 int _sion_ompi_process_scatter_cb(void *outdata, sion_int64 *spec, int spec_len, sion_int64 fsblksize,
283  void *commdata, int collector, int range_start, int range_end, int sid,
284  int process_cb(void *,sion_int64 *, int ) ) {
285  int rc=SION_SUCCESS;
286  int t, startsignal=1, count, mrank, mt, tt, mcollector;
287  MPI_Status status;
288  char *p, *buffer;
289  void *helpdata;
290  void **outdatas;
291  sion_int64 **specs;
292  sion_int64 bytestorecv, bytestosend, datasize;
293  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
294  MPI_Comm commgroup;
295  int rank=sapi->rank;
296 
297  DPRINTFP((256, DFUNCTION, rank, " input collector=%d range_start=%d range_end=%d sid=%d\n", collector,range_start,range_end, sid));
298 
299  /* STEP1: collect info on thread level: specs -> pointer to spec on thread, outdatas -> pointer to outdata on thread */
300 #pragma omp master
301  {
302  _sion_opmicol_grc=SION_STD_SUCCESS;
303 
304  helpdata = (void *) malloc(sapi->num_threads * sizeof(sion_int64 *));
305  if (helpdata == NULL) {
306  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
307  (unsigned long) sapi->num_threads * sizeof(int *));
308  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
309  }
310  } /* omp master */
311 
312  /* share data ptr among threads, internal barrier */
313  specs = (sion_int64 **)__sion_ompicol_share_ptr((void *) helpdata);
314 
315  /* check return code from malloc */
316  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
317 
318  /* store Info about spec */
319  specs[sapi->thread_num]= spec;
320 
321  /* synchronize */
322  {
323 #pragma omp barrier
324  }
325 
326 #pragma omp master
327  {
328  helpdata = (void *) malloc(sapi->num_threads * sizeof(void *));
329  if (helpdata == NULL) {
330  fprintf(stderr,"_sion_ompi_gathervr_cb: cannot allocate temporary memory of size %lu (tcounts), aborting ...\n",
331  (unsigned long) sapi->num_threads * sizeof(int *));
332  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
333  }
334  } /* omp master */
335 
336  /* share data ptr among threads, internal barrier */
337  outdatas = (void **)__sion_ompicol_share_ptr((void *) helpdata);
338 
339  /* check return code from malloc */
340  if(_sion_opmicol_grc!=SION_STD_SUCCESS) return(_sion_opmicol_grc);
341 
342  /* store info about spec */
343  outdatas[sapi->thread_num] = outdata;
344 
345  /* synchronize */
346  {
347 #pragma omp barrier
348  }
349 
350 
351  /* STEP2: proceed on MPI level, master threads have all needed info */
352 #pragma omp master
353  {
354  commgroup = sapi->comm;
355 
356 
357  if(rank == collector) {
358  /* its the collector */
359 
360  /* allocate buffer */
361  buffer = (char *) malloc(fsblksize * sizeof(char));
362  if (buffer == NULL) {
363  _sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: cannot allocate temporary memory of size %lu (buffer), aborting ...\n",
364  (unsigned long) fsblksize * sizeof(char));
365  _sion_opmicol_grc=SION_STD_NOT_SUCCESS;
366  }
367 
368  mrank=_sion_map_rank_ompi_to_mpi(rank,sapi->num_threads);
369 
370  /* scan all other tasks */
371  for(t=range_start;t<=range_end;t++) {
372 
373  mt=_sion_map_rank_ompi_to_mpi(t,sapi->num_threads);
374 
375  if(mt==mrank) {
376 
377  /* thread is on same MPI rank */
378  tt=_sion_map_rank_ompi_to_thread_num(t,sapi->num_threads);
379 
380  /* process data directly */
381  _sion_opmicol_grc=process_cb(outdatas[tt],specs[tt], sid);
382 
383  } else {
384 
385  /* receive spec */
386  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wait for spec from %d\n", t));
387  MPI_Recv(spec, spec_len, SION_MPI_INT64, mt, 1534, commgroup, &status);
388  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector got spec from %d (%lld,%lld)\n",
389  t, (long long) spec[0], (long long) spec[1]));
390 
391  /* send signal to send data */
392  if(spec[0]>=0) { /* sender waits for data */
393  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send signal to %d\n", t));
394  MPI_Send(&startsignal, 1, MPI_INT, mt, 1535, commgroup);
395  }
396 
397  /* get and send data */
398  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to proces data of size %lld at offset %lld\n",
399  (long long) spec[1], (long long) spec[0]));
400 
401  bytestosend=spec[1];
402 
403  /* loop over data parts */
404  while(bytestosend>0) {
405 
406  if(bytestosend>fsblksize) datasize=fsblksize;
407  else datasize=bytestosend;
408 
409  spec[1]=datasize; /* adjust size */
410 
411  /* process data with callback function */
412  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector process data block of size %lld at pos %lld\n",
413  (long long) spec[1], (long long) spec[0]));
414 
415  _sion_opmicol_grc=process_cb(buffer,spec, sid);
416 
417  if(_sion_opmicol_grc != SION_STD_SUCCESS) {
418  _sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_ompi_gather_process_cb: problems writing data ...\n");
419  }
420 
421  /* send portion or all data */
422  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector send for data block to %d\n", mt));
423  MPI_Send(buffer, datasize, MPI_CHAR, mt, 1536, commgroup);
424  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector sent data block to %d datasize=%lld bytestorecv=%lld\n",
425  mt, (long long) datasize, (long long) bytestosend));
426 
427  /* advance counter */
428  bytestosend-=datasize;spec[0]+=datasize;
429 
430  } /* while */
431  } /* not on local MPI rank */
432  } /* for all tasks */
433 
434  /* remove buffer */
435  if (buffer) free(buffer);
436 
437  } else {
438  /* its a sender */
439 
440  mcollector=_sion_map_rank_ompi_to_mpi(collector,sapi->num_threads);
441 
442  /* send data for all threads on MPI rank */
443  for(tt=0;tt<sapi->num_threads;tt++) {
444 
445  /* send spec to collector */
446  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender send spec to %d (%lld,%lld)\n",
447  mcollector,(long long) specs[tt][0], (long long) specs[tt][1]));
448  MPI_Send(specs[tt], spec_len, SION_MPI_INT64, mcollector, 1534, commgroup);
449 
450  if(specs[tt][0]>0) { /* no error in sion_feof */
451 
452  /* wait for start signal */
453  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender wait for signal from %d\n", collector));
454  MPI_Recv(&startsignal, 1, MPI_INT, mcollector, 1535, commgroup, &status);
455  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender got signal from %d\n", collector));
456 
457  /* send data in chunks of fsblksize */
458  bytestorecv=specs[tt][1];
459  p=(char *) outdata;
460  while(bytestorecv>0) {
461  if(bytestorecv>fsblksize) datasize=fsblksize;
462  else datasize=bytestorecv;
463  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld\n", mcollector, (long long) datasize));
464  MPI_Recv(p, datasize, MPI_CHAR, mcollector, 1536, commgroup, &status);
465  MPI_Get_count(&status,MPI_CHAR,&count);
466 
467  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "sender recv data block from %d of size %lld (%d)\n", mcollector, (long long) datasize, count));
468  bytestorecv-=datasize;p+=datasize;
469 
470  } /* while */
471  } /* spec[0]>0 */
472  } /* for all threads */
473  } /* sender */
474 
475  } /* omp master */
476 
477 
478  /* synchronize */
479  {
480 #pragma omp barrier
481  }
482  rc=_sion_opmicol_grc;
483  {
484 #pragma omp barrier
485  }
486 
487  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave collector=%d rc=%d\n", collector, rc ));
488 
489  return rc;
490 }
491 #undef DFUNCTION
492 
493 
494  /* share in_ptr given on master with all other threads, return value is the shared ptr */
495 #define DFUNCTION "__sion_ompi_share_ptr"
496  void * __sion_ompicol_share_ptr(void * in_ptr) {
497  void *out_ptr;
498 
499 #pragma omp master
500  __ompicol_global_pointer = in_ptr;
501 
502 
503  {
504 #pragma omp barrier
505  }
506 
507  out_ptr=__ompicol_global_pointer;
508 
509  return(out_ptr);
510 
511  }
512 #undef DFUNCTION
513 
514 
515 #define DFUNCTION "_sion_ompi_get_capability_cb"
516 int _sion_ompi_get_capability_cb(void *commdata )
517 {
518  int rc=SION_CAPABILITY_NONE;
519  _ompi_api_commdata* sapi= (_ompi_api_commdata *) commdata;
520 
521  if(sapi->thread_num==0) {
523  DPRINTFP((256, DFUNCTION, sapi->rank, "FULL capability\n"));
524  } else {
526  DPRINTFP((256, DFUNCTION, sapi->rank, "ONLY SENDER capability\n"));
527  }
528  return rc;
529 }
530 #undef DFUNCTION
531 
532 
533  int _sion_ompicol_size_of_dtype(int dtype) {
534  switch (dtype) {
535  case _SION_INT32: return(sizeof(sion_int32)); break;
536  case _SION_INT64: return(sizeof(sion_int64)); break;
537  case _SION_CHAR: return(sizeof(char)); break;
538  default: return(sizeof(sion_int64));
539  }
540  }
541 
542  /* end of ifdef SION_OMPI */
543 #endif
544 
#define SION_CAPABILITY_FULL
Definition: sion_filedesc.h:50
#define SION_CAPABILITY_NONE
Definition: sion_filedesc.h:52
#define SION_CAPABILITY_ONLY_SENDER
Definition: sion_filedesc.h:51
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
Sion Time Stamp Header.