SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_mpi_internal_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #if defined(_BGL)
23 #include <rts.h>
24 #ifndef __USE_FILE_OFFSET64
25 #define __USE_FILE_OFFSET64
26 #endif
27 #endif
28 
29 #include <sys/time.h>
30 
31 #include <sys/types.h>
32 #include <fcntl.h>
33 
34 #include <unistd.h>
35 
36 #include "sion.h"
37 #include "sion_debug.h"
38 #include "sion_internal.h"
39 #include "sion_fd.h"
40 #include "sion_filedesc.h"
41 #include "sion_printts.h"
42 
43 #include "sion_mpi_internal_gen.h"
44 
45 
46 #ifdef SION_MPI
47 
48 
49 int _sion_errorprint_mpi(int rc, int level, const char *format, ...)
50 {
51  int rank=-1;
52  va_list ap;
53  int rankselect=-1;
54  const char *t;
55 
56 #ifdef SION_MPI
57  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
58 #endif
59 
60 
61  t = _sion_getenv("SION_ERROR_MSG_RANK");
62  if (t) rankselect = atoi(t);
63 
64  switch (level) {
65  case _SION_ERROR_RETURN:
66  if((rankselect<0) || (rankselect==rank)) {
67  fprintf(stderr,"SION_ERROR_RETURN on rank %d, rc=%d: ",rank,rc);
68  va_start(ap, format);
69  vfprintf(stderr, format, ap);
70  va_end(ap);
71  fprintf(stderr,"\n");
72  }
73  return (rc);
74  break;
75  case _SION_ERROR_ABORT:
76  if((rankselect<0) || (rankselect==rank)) {
77  fprintf(stderr,"SION_ERROR_ABORT on rank %d, rc=%d: ",rank,rc);
78  va_start(ap, format);
79  vfprintf(stderr, format, ap);
80  va_end(ap);
81  fprintf(stderr,"\n");
82  }
83  exit (rc);
84  break;
85  default:
86  if((rankselect<0) || (rankselect==rank)) {
87 
88  fprintf(stderr,"SION_ERROR_UNKNOWN on rank %d, rc=%d: ",rank,rc);
89  va_start(ap, format);
90  vfprintf(stderr, format, ap);
91  va_end(ap);
92  fprintf(stderr,"\n");
93  }
94  return (rc);
95  }
96  return (rc);
97 }
98 
99 
100 /* Helper functions */
101 
112 int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
113 {
114  int gtasks, gRank;
115  int rc = SION_SUCCESS;
116  int task_per_file;
117 
118 
119  MPI_Comm_size(gComm, &gtasks);
120  MPI_Comm_rank(gComm, &gRank);
121 
122  if (gtasks < numFiles) {
123  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
124  "_sion_gen_info_from_gcomm_mpi: Number of files(%d) cannot be bigger the the number of tasks(%d), aborting...\n",
125  numFiles, gtasks));
126  }
127 
128  task_per_file = gtasks / numFiles;
129 
130  /* remaining tasks are added to last communicator */
131  if (gRank >= (numFiles * task_per_file)) {
132  *filenumber = numFiles - 1;
133  }
134  else {
135  *filenumber = gRank / task_per_file;
136  }
137 
138  if(*filenumber == numFiles - 1) {
139  *lrank=gRank-(numFiles - 1)*task_per_file;
140  *lsize=gtasks-(numFiles - 1)*task_per_file;
141  } else {
142  *lrank=gRank%task_per_file;
143  *lsize=task_per_file;
144  }
145 
146  DPRINTFP((1, "_sion_get_info_from_splitted_comm_mpi", gRank, "Global communicator divided in %d local communicators (%f: %d of %d)\n",
147  numFiles,*filenumber,*lrank,*lsize));
148 
149  return (rc);
150 }
151 
152 int _sion_get_info_from_splitted_comm_mpi(MPI_Comm gComm, MPI_Comm lComm, int *numComm, int *CommNumber, int *lrank, int *lsize) {
153  int gSize, gRank, lSize, lRank;
154  int *allRanks=NULL, *allSizes=NULL, i, ntasks;
155  int ncomms, color;
156  int rc = SION_SUCCESS;
157  MPI_Status status;
158 
159  MPI_Comm_size(gComm, &gSize);
160  MPI_Comm_rank(gComm, &gRank);
161 
162  if (lComm != MPI_COMM_NULL) {
163  MPI_Comm_size(lComm, &lSize);
164  MPI_Comm_rank(lComm, &lRank);
165  }
166  else {
167  lSize = gSize;
168  lRank = gRank;
169  }
170 
171  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "lRank: %d\n", lRank));
172 
173  if (gRank == 0) {
174 
175  allRanks = (int *) malloc(gSize * sizeof(int));
176  allSizes = (int *) malloc(gSize * sizeof(int));
177  if ((allRanks == NULL) || (allSizes == NULL)) {
178  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_info_from_splitted_comm_mpi: Cannot allocate temp arrays of size %lu\n", (unsigned long) gSize * sizeof(int)));
179  }
180  }
181 
182  /* Gather all local ranks to task 0 for counting.. */
183  MPI_Gather(&lRank, 1, MPI_INT, allRanks, 1, MPI_INT, 0, gComm);
184  MPI_Gather(&lSize, 1, MPI_INT, allSizes, 1, MPI_INT, 0, gComm);
185 
186  ncomms = 0;
187  ntasks = 0;
188  if (gRank == 0) {
189  for (i = 0; i < gSize; i++) {
190  if (allRanks[i] == 0) {
191  /* Use the current number of the communicator for the file suffix!! */
192  if (i != 0) {
193  MPI_Send(&ncomms, 1, MPI_INT, i, i, gComm);
194  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Sent ncomms=%05d to %d with TAG %d\n", ncomms, i, i));
195  }
196  else {
197  /* it's my self */
198  color = ncomms;
199  }
200  ncomms++;
201  ntasks += allSizes[i];
202  }
203  }
204  }
205  /* Recv the num of communicator from global 0 =>used for the suffix */
206  if ((lRank == 0) && (gRank != 0)) {
207  MPI_Recv(&color, 1, MPI_INT, 0, gRank, gComm, &status);
208  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Received ncomms=%05d from %d with TAG %d\n", color, status.MPI_SOURCE, status.MPI_TAG));
209 
210  }
211 
212  MPI_Bcast(&ncomms, 1, MPI_INT, 0, gComm);
213  MPI_Bcast(&ntasks, 1, MPI_INT, 0, gComm);
214 
215 
216  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "#Comms=%05d #Tasks=%05d #TotalTasks=%05d\n", ncomms, ntasks, gSize));
217 
218  if (lComm != MPI_COMM_NULL) {
219  MPI_Bcast(&color, 1, MPI_INT, 0, lComm);
220  }
221 
222  if (gRank == 0) {
223  if(allRanks) free(allRanks);
224  if(allSizes) free(allSizes);
225  }
226  /* return parameters */
227  *numComm = ncomms;
228  *CommNumber = color;
229  *lrank = lRank;
230  *lsize = lSize;
231 
232  return (rc);
233 }
234 
235 int _sion_get_numfiles_from_file_mpi(char *fname) {
236 
237  int rank, sid;
238  FILE *fileptr;
239  int numfiles = SION_SIZE_NOT_VALID;
240  sion_int64 chunksize;
241  sion_int32 fsblksize;
242 
243  rank = 0;
244  sid = _sion_open_rank(fname, "br", &chunksize, &fsblksize, &rank, &fileptr);
245 
246  numfiles = sion_get_number_of_files(sid);
247 
248  _sion_close_sid(sid);
249 
250  DPRINTFP((1, "_sion_get_numfiles_from_file_mpi", _SION_DEFAULT_RANK, "sion file %s has %d files\n", fname, numfiles));
251 
252  return (numfiles);
253 }
254 
255 int _sion_get_filenumber_from_files_mpi(char *fname, MPI_Comm gComm, int *numfiles, int *filenumber, int *lRank) {
256 
257  int sid, gSize, gRank, ntasks, nfiles;
258  int rc = SION_SUCCESS;
259  FILE *fileptr;
260  sion_int64 *chunksizes;
261  sion_int32 fsblksize;
262  int *globalranks;
263  int mapping_size;
264  sion_int32 *mapping;
265  sion_int32 lpos[2];
266 
267  MPI_Comm_size(gComm, &gSize);
268  MPI_Comm_rank(gComm, &gRank);
269 
270  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "before open\n"));
271  if(gRank == 0) {
272  /* open and get mapping of sion file */
273  chunksizes=NULL;globalranks=NULL;
274  sid=_sion_open_read(fname,_SION_FMODE_READ|_SION_FMODE_ANSI,_SION_READ_MASTER_ONLY_OF_MULTI_FILES,
275  &ntasks,&nfiles,&chunksizes,&fsblksize,&globalranks,&fileptr);
276  /* sid = sion_open(fname, "br", &ntasks, &nfiles, &chunksizes, &fsblksize, &globalranks, &fileptr); */
277  if(sid>=0) {
278  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "after open\n"));
279  rc=sion_get_mapping(sid,&mapping_size,&mapping,numfiles);
280  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "sion file %d files rc=%d\n", *numfiles, rc));
281  } else {
282  *numfiles=-1;
283  }
284  }
285 
286  /* each task has to know if more than file was used in sion file */
287  MPI_Bcast(numfiles, 1, MPI_INT, 0, gComm);
288 
289  if((gRank == 0) && (*numfiles>1)) {
290  if(mapping_size!=gSize) {
291  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_filenumber_from_files_mpi: Incorrect sum of ntasks of files %d <> %d\n", mapping_size, gSize));
292  }
293  }
294 
295  if(*numfiles<0) {
296  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_filenumber_from_files_mpi: could not get numfiles from sion file\n"));
297  }
298 
299  if(*numfiles>1) {
300  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "before scatter\n"));
301  MPI_Scatter(mapping, 2, SION_MPI_INT32, lpos, 2, SION_MPI_INT32, 0, gComm);
302  *filenumber=lpos[0];
303  *lRank =lpos[1];
304  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "after scatter\n"));
305  } else {
306  *filenumber=0;
307  *lRank =gRank;
308  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "only one file -> filenumber=%d lRank=%d\n",*filenumber,*lRank));
309  }
310 
311  if(gRank == 0) {
312  /* frees also mapping vector */
313  if (sid>=0) _sion_close_sid(sid);
314  }
315 
316  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "close rc=%d\n",rc));
317  return(rc);
318 }
319 
320 /* end of ifdef MPI */
321 #endif
int sion_get_mapping(int sid, int *mapping_size, sion_int32 **mapping, int *numfiles)
Returns pointers to the internal field mapping.
Definition: sion_common.c:213
char * _sion_getenv(const char *name)
int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
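Divides the tasks of a communicator into numFiles contiguous groups and computes the calling task's file number, local rank and local size (no new communicator is created).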
Sion Time Stamp Header.
int _sion_open_read(const char *fname, sion_int64 file_mode_flags, int read_all, int *ntasks, int *nfiles, sion_int64 **chunksizes, sion_int32 *fsblksize, int **globalranks, FILE **fileptr)
internal sion serial open function for reading on one or more files