SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_ompi_internal_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 #include "omp.h"
22 
23 #if defined(_BGL)
24 #include <rts.h>
25 #ifndef __USE_FILE_OFFSET64
26 #define __USE_FILE_OFFSET64
27 #endif
28 #endif
29 
30 #include <sys/time.h>
31 
32 #include <sys/types.h>
33 #include <fcntl.h>
34 
35 #include <unistd.h>
36 
37 #include "sion.h"
38 #include "sion_debug.h"
39 #include "sion_internal.h"
40 #include "sion_fd.h"
41 #include "sion_filedesc.h"
42 #include "sion_printts.h"
43 
44 #include "sion_ompi_internal_gen.h"
45 
46 #ifdef SION_OMPI
47 
48 #include "sion_ompi.h"
49 
50 int _sion_gen_info_from_gcomm_ompi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
51 {
52  int gtasks, gRank;
53  int rc = SION_SUCCESS;
54  int task_per_file;
55 
56 
57  MPI_Comm_size(gComm, &gtasks);
58  MPI_Comm_rank(gComm, &gRank);
59  DPRINTFP((1, "_sion_get_info_from_splitted_comm_mpi", gRank, "enter: gcomm: %d of %d, numfiles=%d\n",
60  gRank,gtasks,numFiles));
61 
62  if (gtasks < numFiles) {
63  return(_sion_errorprint_ompi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
64  "_sion_gen_info_from_gcomm_mpi: Number of files(%d) cannot be bigger the the number of tasks(%d), aborting...\n",
65  numFiles, gtasks));
66  }
67 
68  task_per_file = gtasks / numFiles;
69 
70  /* remaining tasks are added to last communicator */
71  if (gRank >= (numFiles * task_per_file)) {
72  *filenumber = numFiles - 1;
73  }
74  else {
75  *filenumber = gRank / task_per_file;
76  }
77 
78  if(*filenumber == numFiles - 1) {
79  *lrank=gRank-(numFiles - 1)*task_per_file;
80  *lsize=gtasks-(numFiles - 1)*task_per_file;
81  } else {
82  *lrank=gRank%task_per_file;
83  *lsize=task_per_file;
84  }
85 
86  DPRINTFP((1, "_sion_get_info_from_splitted_comm_mpi", gRank, "Global communicator divided in %d local communicators (%d: %d of %d)\n",
87  numFiles,*filenumber,*lrank,*lsize));
88 
89  return (rc);
90 }
91 
92 int _sion_get_info_from_splitted_comm_ompi(MPI_Comm gComm, MPI_Comm lComm, int *numComm, int *CommNumber, int *lrank, int *lsize) {
93  int gSize, gRank, lSize, lRank;
94  int *allRanks=NULL, *allSizes=NULL, i, ntasks;
95  int ncomms, color;
96  int rc = SION_SUCCESS;
97  MPI_Status status;
98 
99  MPI_Comm_size(gComm, &gSize);
100  MPI_Comm_rank(gComm, &gRank);
101 
102  if (lComm != MPI_COMM_NULL) {
103  MPI_Comm_size(lComm, &lSize);
104  MPI_Comm_rank(lComm, &lRank);
105  }
106  else {
107  lSize = gSize;
108  lRank = gRank;
109  }
110 
111  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "lRank: %d\n", lRank));
112 
113  if (gRank == 0) {
114 
115  allRanks = (int *) malloc(gSize * sizeof(int));
116  allSizes = (int *) malloc(gSize * sizeof(int));
117  if ((allRanks == NULL) || (allSizes == NULL)) {
118  return(_sion_errorprint_ompi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_info_from_splitted_comm_mpi: Cannot allocate temp arrays of size %lu\n", (unsigned long) gSize * sizeof(int)));
119  }
120  }
121 
122  /* Gather all local ranks to task 0 for counting.. */
123  MPI_Gather(&lRank, 1, MPI_INT, allRanks, 1, MPI_INT, 0, gComm);
124  MPI_Gather(&lSize, 1, MPI_INT, allSizes, 1, MPI_INT, 0, gComm);
125 
126  ncomms = 0;
127  ntasks = 0;
128  if (gRank == 0) {
129  for (i = 0; i < gSize; i++) {
130  if (allRanks[i] == 0) {
131  /* Use the current number of the communicator for the file suffix!! */
132  if (i != 0) {
133  MPI_Send(&ncomms, 1, MPI_INT, i, i, gComm);
134  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Sent ncomms=%05d to %d with TAG %d\n", ncomms, i, i));
135  }
136  else {
137  /* it's my self */
138  color = ncomms;
139  }
140  ncomms++;
141  ntasks += allSizes[i];
142  }
143  }
144  }
145  /* Recv the num of communicator from global 0 =>used for the suffix */
146  if ((lRank == 0) && (gRank != 0)) {
147  MPI_Recv(&color, 1, MPI_INT, 0, gRank, gComm, &status);
148  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Received ncomms=%05d from %d with TAG %d\n", color, status.MPI_SOURCE, status.MPI_TAG));
149 
150  }
151 
152  MPI_Bcast(&ncomms, 1, MPI_INT, 0, gComm);
153  MPI_Bcast(&ntasks, 1, MPI_INT, 0, gComm);
154 
155 
156  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "#Comms=%05d #Tasks=%05d #TotalTasks=%05d\n", ncomms, ntasks, gSize));
157 
158  if (lComm != MPI_COMM_NULL) {
159  MPI_Bcast(&color, 1, MPI_INT, 0, lComm);
160  }
161 
162  if (gRank == 0) {
163  if(allRanks) free(allRanks);
164  if(allSizes) free(allSizes);
165  }
166  /* return parameters */
167  *numComm = ncomms;
168  *CommNumber = color;
169  *lrank = lRank;
170  *lsize = lSize;
171 
172  return (rc);
173 }
174 
175 int _sion_errorprint_ompi(int rc, int level, const char *format, ...)
176 {
177  int rank=-1;
178  va_list ap;
179  int rankselect=-1;
180  const char *t;
181 
182 
183  int thread_num = omp_get_thread_num();
184  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
185 
186  t = _sion_getenv("SION_ERROR_MSG_RANK");
187  if (t) rankselect = atoi(t);
188 
189  switch (level) {
190  case _SION_ERROR_RETURN:
191  if((rankselect<0) || (rankselect==rank)) {
192  fprintf(stderr,"SION_ERROR_RETURN on MPI rank %d, OMP thread %d, rc=%d: ",rank, thread_num,rc);
193  va_start(ap, format);
194  vfprintf(stderr, format, ap);
195  va_end(ap);
196  fprintf(stderr,"\n");
197  }
198  return (rc);
199  break;
200  case _SION_ERROR_ABORT:
201  if((rankselect<0) || (rankselect==rank)) {
202  fprintf(stderr,"SION_ERROR_ABORT on MPI rank %d, OMP thread %d, rc=%d: ",rank, thread_num,rc);
203  va_start(ap, format);
204  vfprintf(stderr, format, ap);
205  va_end(ap);
206  fprintf(stderr,"\n");
207  }
208  exit (rc);
209  break;
210  default:
211  if((rankselect<0) || (rankselect==rank)) {
212 
213  fprintf(stderr,"SION_ERROR_UNKNOWN on MPI rank %d, OMP thread %d, rc=%d: ",rank, thread_num,rc);
214  va_start(ap, format);
215  vfprintf(stderr, format, ap);
216  va_end(ap);
217  fprintf(stderr,"\n");
218  }
219  return (rc);
220  }
221  return (rc);
222 }
223 
224 
225 /* Help Functions */
226 
/*
 * Map a hybrid MPI+OpenMP rank to the MPI rank that owns it. Threads are
 * numbered consecutively per MPI task, i.e.
 * ompi_rank = mpi_rank * num_threads + thread_num.
 *
 * Returns the MPI rank, or -1 if num_threads < 1 or ompi_rank < 0.
 */
int _sion_map_rank_ompi_to_mpi(int ompi_rank, int num_threads) {
  /* num_threads == 0 would be a division by zero (UB). Negative ranks would
     truncate toward zero and silently map to MPI rank 0. */
  if (num_threads < 1 || ompi_rank < 0) {
    return -1;
  }
  return ompi_rank / num_threads;
}
234 
/*
 * Map a hybrid MPI+OpenMP rank to the OpenMP thread number within its MPI
 * task. Threads are numbered consecutively per MPI task, i.e.
 * ompi_rank = mpi_rank * num_threads + thread_num.
 *
 * Returns the thread number (0..num_threads-1), or -1 if num_threads < 1
 * or ompi_rank < 0.
 */
int _sion_map_rank_ompi_to_thread_num(int ompi_rank, int num_threads) {
  /* num_threads == 0 would be a division by zero (UB). A negative rank would
     yield a negative remainder, which is not a valid thread number. */
  if (num_threads < 1 || ompi_rank < 0) {
    return -1;
  }
  return ompi_rank % num_threads;
}
242 
243 
/*
 * Combine an MPI rank and an OpenMP thread number into one hybrid rank,
 * numbering the num_threads threads of each MPI task consecutively.
 * No range checking is done on the arguments.
 */
int _sion_map_rank_mpi_to_ompi(int mpi_rank, int num_threads, int thread_num) {
  return thread_num + num_threads * mpi_rank;
}
251 
/*
 * Total number of hybrid MPI+OpenMP ranks when each of the mpi_size MPI
 * tasks runs num_threads OpenMP threads.
 */
int _sion_get_size_ompi(int mpi_size, int num_threads) {
  return num_threads * mpi_size;
}
259 
260 
261 /* end of ifdef OMPI */
262 #endif
char * _sion_getenv(const char *name)
Sion Time Stamp Header.