SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_generic_collective_merge.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <stdarg.h>
19 #include <string.h>
20 #include <time.h>
21 
22 #include <sys/time.h>
23 
24 #include <sys/types.h>
25 #include <fcntl.h>
26 
27 #include <unistd.h>
28 
29 #include "sion.h"
30 #include "sion_debug.h"
31 #include "sion_error_handler.h"
32 #include "sion_internal.h"
33 #include "sion_fd.h"
34 #include "sion_filedesc.h"
35 #include "sion_printts.h"
36 #include "sion_generic_apidesc.h"
38 
39 /* callback function: called directly by _sion_coll_fwrite_merge for the collector's own data and also handed to the API's gather_execute_cb; defined further down in this file */
40 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid );
41 
42 
43 /* collective I/O */
44 #define DFUNCTION "_sion_coll_fwrite_merge"
45 size_t _sion_coll_fwrite_merge(const void *data, size_t size, size_t nitems, int sid) {
46  _sion_filedesc *sion_filedesc;
47  _sion_generic_gendata *sion_gendata;
48  _sion_generic_apidesc *sion_apidesc;
49  sion_int64 bwrote=0, spec[2];
50  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
51  int collector, firstsender, lastsender;
52 
53  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
54  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
55  }
56  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
57  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
58 
59  sion_gendata=sion_filedesc->dataptr;
60  sion_apidesc=sion_gendata->apidesc;
61 
62  /* needed for avoiding subsequent non-collective calls */
63  sion_filedesc->collcmdused=1;
64 
65  /* check collsize */
66  if (sion_filedesc->collsize<=0) {
67  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
68  }
69 
70  /* parameter of callback function */
71  collector=(int) sion_filedesc->collector;
72  firstsender=collector+1;
73  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
74  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
75 
76  spec[0]=-2; /* signaling that data should be written in chunk of collector */
77  spec[1]=size*nitems;
78 
79  /* write own part */
80  if(sion_filedesc->rank == sion_filedesc->collector) {
81  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "COLLECTOR write data of size %ld at current position\n", (long) spec[1]));
82  rc_own=_sion_generic_collective_process_write_merge(data,spec,sid);
83  } else {
84  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "SENDER send data of size %ld at current position\n", (long) spec[1]));
85  }
86 
87 
88  /* collect and write parts of sender tasks via callback function */
89  if(!sion_apidesc->gather_execute_cb ) {
90  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
91  }
92  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
93  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
94  _sion_generic_collective_process_write_merge);
95 
96  /* return code */
97  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
98  bwrote=size*nitems;
99  } else {
100  bwrote=0;
101  }
102 
103  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
104  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,bwrote));
105 
106  return(bwrote);
107 }
108 #undef DFUNCTION
109 
110 
111 #define DFUNCTION "_sion_generic_collective_process_write_merge"
112 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid ) {
113  _sion_filedesc *sion_filedesc;
114  int rc=SION_STD_SUCCESS;
115  sion_int64 bwrote=0, bytestowrite;
116  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
117  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
118  }
119  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
120 
121  /* move file pointer */
122  bytestowrite=spec[1];
123 
124  /* ensure free space for this block */
125  if(sion_ensure_free_space(sid,bytestowrite) != SION_SUCCESS) {
126  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
127  }
128 
129  /* get and write data */
130  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
131  (long long) bytestowrite, (long long) sion_filedesc->currentpos));
132  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
133  if(bwrote != bytestowrite) {
134  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
135  }
136 
137  /* update internal data */
138  sion_filedesc->currentpos+=bytestowrite;
139  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
140  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
141 
142 
143  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave spec[0]=%d spec[1]=%d sid=%d rc=%d\n", (int) spec[0],(int) spec[1],sid,rc));
144  return(rc);
145 
146 }
147 #undef DFUNCTION
148 
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1053
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:53
int _sion_vcdtype(int sid)
Definition: sion_fd.c:58
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:401
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:221
Sion Time Stamp Header.
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82