SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
sion_generic_collective_merge.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <stdarg.h>
17 #include <string.h>
18 #include <time.h>
19 
20 #include <sys/time.h>
21 
22 #include <sys/types.h>
23 #include <fcntl.h>
24 
25 #include <unistd.h>
26 
27 #include "sion.h"
28 #include "sion_debug.h"
29 #include "sion_error_handler.h"
30 #include "sion_internal.h"
31 #include "sion_fd.h"
32 #include "sion_filedesc.h"
33 #include "sion_printts.h"
34 #include "sion_generic_apidesc.h"
36 
37 /* callback functions */
38 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid );
39 
40 
41 /* collective I/O */
42 #define DFUNCTION "_sion_coll_fwrite_merge"
43 size_t _sion_coll_fwrite_merge(const void *data, size_t size, size_t nitems, int sid) {
44  _sion_filedesc *sion_filedesc;
45  _sion_generic_gendata *sion_gendata;
46  _sion_generic_apidesc *sion_apidesc;
47  sion_int64 bwrote=0, spec[2];
48  int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
49  int collector, firstsender, lastsender;
50 
51  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
52  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: invalid sion_filedesc %d", sid));
53  }
54  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
55  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles));
56 
57  sion_gendata=sion_filedesc->dataptr;
58  sion_apidesc=sion_gendata->apidesc;
59 
60  /* needed for avoiding subsequent non-collective calls */
61  sion_filedesc->collcmdused=1;
62 
63  /* check collsize */
64  if (sion_filedesc->collsize<=0) {
65  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (int) sion_filedesc->collsize));
66  }
67 
68  /* parameter of callback function */
69  collector=(int) sion_filedesc->collector;
70  firstsender=collector+1;
71  lastsender=sion_filedesc->rank+sion_filedesc->collsize-1;
72  if(lastsender>sion_filedesc->ntasks) lastsender=sion_filedesc->ntasks-1;
73 
74  spec[0]=-2; /* signaling that data should be written in chunk of collector */
75  spec[1]=size*nitems;
76 
77  /* write own part */
78  if(sion_filedesc->rank == sion_filedesc->collector) {
79  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "COLLECTOR write data of size %ld at current position\n", (long) spec[1]));
80  rc_own=_sion_generic_collective_process_write_merge(data,spec,sid);
81  } else {
82  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "SENDER send data of size %ld at current position\n", (long) spec[1]));
83  }
84 
85 
86  /* collect and write parts of sender tasks via callback function */
87  if(!sion_apidesc->gather_execute_cb ) {
88  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
89  }
90  rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->fsblksize,
91  sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
92  _sion_generic_collective_process_write_merge);
93 
94  /* return code */
95  if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
96  bwrote=size*nitems;
97  } else {
98  bwrote=0;
99  }
100 
101  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
102  sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks,sion_filedesc->nfiles,bwrote));
103 
104  return(bwrote);
105 }
106 #undef DFUNCTION
107 
108 
109 #define DFUNCTION "_sion_generic_collective_process_write_merge"
110 int _sion_generic_collective_process_write_merge( const void *data, sion_int64 *spec, int sid ) {
111  _sion_filedesc *sion_filedesc;
112  int rc=SION_STD_SUCCESS;
113  sion_int64 bwrote=0, bytestowrite;
114  if ( (sid<0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
115  return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
116  }
117  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "enter spec[0]=%d spec[1]=%d sid=%d\n", (int) spec[0],(int) spec[1],sid));
118 
119  /* move file pointer */
120  bytestowrite=spec[1];
121 
122  /* ensure free space for this block */
123  if(sion_ensure_free_space(sid,bytestowrite) != SION_SUCCESS) {
124  _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,"could not ensure free space for this block, returning %d ...\n", sid);
125  }
126 
127  /* get and write data */
128  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector start to write data of size %lld at position %lld\n",
129  (long long) bytestowrite, (long long) sion_filedesc->currentpos));
130  bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
131  if(bwrote != bytestowrite) {
132  return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,"_sion_generic_collective_process_write: problems writing data ...\n"));
133  }
134 
135  /* update internal data */
136  sion_filedesc->currentpos+=bytestowrite;
137  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
138  bwrote, (long) bytestowrite, (long long) sion_filedesc->currentpos, (long long) _sion_file_get_position(sion_filedesc->fileptr)));
139 
140 
141  DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, "leave spec[0]=%d spec[1]=%d sid=%d rc=%d\n", (int) spec[0],(int) spec[1],sid,rc));
142  return(rc);
143 
144 }
145 #undef DFUNCTION
146 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:148
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Definition: sion_file.c:272
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
Definition: sion_common.c:1014
int _sion_vcdtype(int sid)
Definition: sion_fd.c:56
void * _sion_vcdtovcon(int sid)
Definition: sion_fd.c:51
#define SION_FILEDESCRIPTOR
Definition: sion_fd.h:17
Sion Time Stamp Header.
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80