SIONlib  1.7.0
Scalable I/O library for parallel access to task-local files
sion_mpi_internal_gen.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include "mpi.h"
21 
22 #if defined(_BGL)
23 #include <rts.h>
24 #ifndef __USE_FILE_OFFSET64
25 #define __USE_FILE_OFFSET64
26 #endif
27 #endif
28 
29 #include <sys/time.h>
30 
31 #include <sys/types.h>
32 #include <fcntl.h>
33 
34 #include <unistd.h>
35 
36 #include "sion.h"
37 #include "sion_debug.h"
38 #include "sion_error_handler.h"
39 #include "sion_internal.h"
40 #include "sion_fd.h"
41 #include "sion_filedesc.h"
42 #include "sion_printts.h"
43 
44 #include "sion_mpi_internal_gen.h"
45 
46 
47 #ifdef SION_MPI
48 
49 
50 int _sion_errorprint_mpi(int rc, int level, const char *format, ...)
51 {
52  int rank=-1, thread=-1;
53  va_list ap;
54 
55 #ifdef SION_MPI
56  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
57 #endif
58 
59  va_start(ap, format);
60  rc=__sion_errorprint_vargs(rc, level, rank, thread, format, ap);
61  va_end(ap);
62 
63  return (rc);
64 }
65 
66 
67 /* Help Functions */
68 
79 int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
80 {
81  int gtasks, gRank;
82  int rc = SION_SUCCESS;
83  int task_per_file;
84 
85 
86  MPI_Comm_size(gComm, &gtasks);
87  MPI_Comm_rank(gComm, &gRank);
88 
89  if (gtasks < numFiles) {
90  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,
91  "_sion_gen_info_from_gcomm_mpi: Number of files(%d) cannot be bigger the the number of tasks(%d), aborting...\n",
92  numFiles, gtasks));
93  }
94 
95  task_per_file = gtasks / numFiles;
96 
97  /* remaining tasks are added to last communicator */
98  if (gRank >= (numFiles * task_per_file)) {
99  *filenumber = numFiles - 1;
100  }
101  else {
102  *filenumber = gRank / task_per_file;
103  }
104 
105  if(*filenumber == numFiles - 1) {
106  *lrank=gRank-(numFiles - 1)*task_per_file;
107  *lsize=gtasks-(numFiles - 1)*task_per_file;
108  } else {
109  *lrank=gRank%task_per_file;
110  *lsize=task_per_file;
111  }
112 
113  DPRINTFP((1, "_sion_get_info_from_splitted_comm_mpi", gRank, "Global communicator divided in %d local communicators (%f: %d of %d)\n",
114  numFiles,*filenumber,*lrank,*lsize));
115 
116  return (rc);
117 }
118 
119 int _sion_get_info_from_splitted_comm_mpi(MPI_Comm gComm, MPI_Comm lComm, int *numComm, int *CommNumber, int *lrank, int *lsize) {
120  int gSize, gRank, lSize, lRank;
121  int *allRanks=NULL, *allSizes=NULL, i, ntasks;
122  int ncomms, color = 0;
123  int rc = SION_SUCCESS;
124  MPI_Status status;
125 
126  MPI_Comm_size(gComm, &gSize);
127  MPI_Comm_rank(gComm, &gRank);
128 
129  if (lComm != MPI_COMM_NULL) {
130  MPI_Comm_size(lComm, &lSize);
131  MPI_Comm_rank(lComm, &lRank);
132  }
133  else {
134  lSize = gSize;
135  lRank = gRank;
136  }
137 
138  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "lRank: %d\n", lRank));
139 
140  if (gRank == 0) {
141 
142  allRanks = (int *) malloc(gSize * sizeof(int));
143  allSizes = (int *) malloc(gSize * sizeof(int));
144  if ((allRanks == NULL) || (allSizes == NULL)) {
145  free(allRanks);
146  free(allSizes);
147  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_info_from_splitted_comm_mpi: Cannot allocate temp arrays of size %lu\n", (unsigned long) gSize * sizeof(int)));
148  }
149  }
150 
151  /* Gather all local ranks to task 0 for counting.. */
152  MPI_Gather(&lRank, 1, MPI_INT, allRanks, 1, MPI_INT, 0, gComm);
153  MPI_Gather(&lSize, 1, MPI_INT, allSizes, 1, MPI_INT, 0, gComm);
154 
155  ncomms = 0;
156  ntasks = 0;
157  if (gRank == 0) {
158  for (i = 0; i < gSize; i++) {
159  if (allRanks[i] == 0) {
160  /* Use the current number of the communicator for the file suffix!! */
161  if (i != 0) {
162  MPI_Send(&ncomms, 1, MPI_INT, i, i, gComm);
163  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Sent ncomms=%05d to %d with TAG %d\n", ncomms, i, i));
164  }
165  else {
166  /* it's my self */
167  color = ncomms;
168  }
169  ncomms++;
170  ntasks += allSizes[i];
171  }
172  }
173  }
174  /* Recv the num of communicator from global 0 =>used for the suffix */
175  if ((lRank == 0) && (gRank != 0)) {
176  MPI_Recv(&color, 1, MPI_INT, 0, gRank, gComm, &status);
177  DPRINTFP((32, "sion_paropen_comms_mpi", gRank, "Received ncomms=%05d from %d with TAG %d\n", color, status.MPI_SOURCE, status.MPI_TAG));
178 
179  }
180 
181  MPI_Bcast(&ncomms, 1, MPI_INT, 0, gComm);
182  MPI_Bcast(&ntasks, 1, MPI_INT, 0, gComm);
183 
184 
185  DPRINTFP((32, "_sion_get_info_from_splitted_comm_mpi", gRank, "#Comms=%05d #Tasks=%05d #TotalTasks=%05d\n", ncomms, ntasks, gSize));
186 
187  if (lComm != MPI_COMM_NULL) {
188  MPI_Bcast(&color, 1, MPI_INT, 0, lComm);
189  }
190 
191  if (gRank == 0) {
192  if(allRanks) free(allRanks);
193  if(allSizes) free(allSizes);
194  }
195  /* return parameters */
196  *numComm = ncomms;
197  *CommNumber = color;
198  *lrank = lRank;
199  *lsize = lSize;
200 
201  return (rc);
202 }
203 
204 int _sion_get_numfiles_from_file_mpi(char *fname) {
205 
206  int rank, sid;
207  FILE *fileptr;
208  int numfiles = 0;
209  sion_int64 chunksize;
210  sion_int32 fsblksize;
211 
212  rank = 0;
213  sid = _sion_open_rank(fname, "br", &chunksize, &fsblksize, &rank, &fileptr);
214 
215  numfiles = sion_get_number_of_files(sid);
216 
217  _sion_close_sid(sid);
218 
219  DPRINTFP((1, "_sion_get_numfiles_from_file_mpi", _SION_DEFAULT_RANK, "sion file %s has %d files\n", fname, numfiles));
220 
221  return (numfiles);
222 }
223 
224 int _sion_get_filenumber_from_files_mpi(char *fname, MPI_Comm gComm, int *numfiles, int *filenumber, int *lRank) {
225 
226  int sid, gSize, gRank, ntasks, nfiles;
227  int rc = SION_SUCCESS;
228  FILE *fileptr;
229  sion_int64 *chunksizes;
230  sion_int32 fsblksize;
231  int *globalranks;
232  int mapping_size = 0;
233  sion_int32 *mapping = NULL;
234  sion_int32 lpos[2];
235 
236  MPI_Comm_size(gComm, &gSize);
237  MPI_Comm_rank(gComm, &gRank);
238 
239  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "before open\n"));
240  if(gRank == 0) {
241  /* open and get mapping of sion file */
242  chunksizes=NULL;globalranks=NULL;
243  sid=_sion_open_read(fname,_SION_FMODE_READ|_SION_FMODE_ANSI,_SION_READ_MASTER_ONLY_OF_MULTI_FILES,
244  &ntasks,&nfiles,&chunksizes,&fsblksize,&globalranks,&fileptr);
245  /* sid = sion_open(fname, "br", &ntasks, &nfiles, &chunksizes, &fsblksize, &globalranks, &fileptr); */
246  if(sid>=0) {
247  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "after open\n"));
248  rc=sion_get_mapping(sid,&mapping_size,&mapping,numfiles);
249  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "sion file %d files rc=%d\n", *numfiles, rc));
250  } else {
251  *numfiles=-1;
252  }
253  }
254 
255  /* each task has to know if more than file was used in sion file */
256  MPI_Bcast(numfiles, 1, MPI_INT, 0, gComm);
257 
258  if((gRank == 0) && (*numfiles>1)) {
259  if(mapping_size!=gSize) {
260  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_filenumber_from_files_mpi: Incorrect sum of ntasks of files %d <> %d\n", mapping_size, gSize));
261  }
262  }
263 
264  if(*numfiles<0) {
265  return(_sion_errorprint_mpi(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_get_filenumber_from_files_mpi: could not get numfiles from sion file\n"));
266  }
267 
268  if(*numfiles>1) {
269  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "before scatter\n"));
270  MPI_Scatter(mapping, 2, SION_MPI_INT32, lpos, 2, SION_MPI_INT32, 0, gComm);
271  *filenumber=lpos[0];
272  *lRank =lpos[1];
273  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "after scatter\n"));
274  } else {
275  *filenumber=0;
276  *lRank =gRank;
277  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "only one file -> filenumber=%d lRank=%d\n",*filenumber,*lRank));
278  }
279 
280  if(gRank == 0) {
281  /* frees also mapping vector */
282  if (sid>=0) _sion_close_sid(sid);
283  }
284 
285  DPRINTFP((1, "_sion_get_filenumber_from_files_mpi", gRank, "close rc=%d\n",rc));
286  return(rc);
287 }
288 
289 /* end of ifdef MPI */
290 #endif
int sion_get_mapping(int sid, int *mapping_size, sion_int32 **mapping, int *numfiles)
Returns pointers to the internal field mapping.
Definition: sion_common.c:221
int _sion_gen_info_from_gcomm_mpi(int numFiles, MPI_Comm gComm, int *filenumber, int *lrank, int *lsize)
Computes how the tasks of a communicator are split into numFiles groups (one per file): the file number, local rank and local size of the calling task.
Sion Time Stamp Header.
int _sion_open_read(const char *fname, sion_int64 file_mode_flags, int read_all, int *ntasks, int *nfiles, sion_int64 **chunksizes, sion_int32 *fsblksize, int **globalranks, FILE **fileptr)
internal sion serial open function for reading on one or more files