10 #define _XOPEN_SOURCE 700
18 #include "sion_common.h"
19 #include "sion_const.h"
20 #include "sion_debug.h"
21 #include "sion_error_handler.h"
23 #include "sion_file.h"
24 #include "sion_filedesc.h"
25 #include "sion_generic_apidesc.h"
26 #include "sion_generic_buddy.h"
27 #include "sion_generic_collective.h"
28 #include "sion_internal.h"
29 #include "sion_internal_positioning.h"
32 #define DFUNCTION "sion_coll_write"
35 _sion_filedesc *sion_filedesc;
36 _sion_generic_gendata *sion_gendata;
37 _sion_generic_apidesc *sion_apidesc;
39 if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
40 _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
"sion_coll_write: invalid sion_filedesc %d", sid);
43 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
44 sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
46 sion_gendata = sion_filedesc->dataptr;
47 sion_apidesc = sion_gendata->apidesc;
50 if (!sion_filedesc->usecoll) {
55 if (sion_filedesc->collmergemode) {
56 return _sion_coll_fwrite_merge(data, size, nitems, sid);
60 sion_filedesc->collcmdused = 1;
63 if (sion_filedesc->collsize <= 0) {
65 SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
"sion_coll_write: collsize=%d <= 0, returning ...\n", (
int)sion_filedesc->collsize);
70 if (!sion_apidesc->gather_execute_cb) {
71 _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
72 "sion_coll_write: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
77 int collector = (int)sion_filedesc->collector;
78 int firstsender = collector + 1;
79 int lastsender = collector + sion_filedesc->collsize - 1;
80 if (lastsender > sion_filedesc->ntasks) {
81 lastsender = sion_filedesc->ntasks - 1;
83 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
84 sion_filedesc->collsize, firstsender, lastsender));
86 int64_t bytes_written;
87 if (sion_filedesc->rank == sion_filedesc->collector) {
89 bytes_written = _sion_write_multi_chunk(sion_filedesc, data, nitems * size);
90 int64_t ownnewposition = sion_filedesc->currentpos;
93 sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
94 lastsender, sid, _sion_generic_collective_process_write, NULL, _sion_generic_collective_next_write_spec);
97 _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
98 sion_filedesc->currentpos = ownnewposition;
100 _sion_partitioned_range_iterator ranges =
101 _sion_partitioned_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
103 bytes_written = sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
104 firstsender, lastsender, sid, _sion_generic_collective_process_write, &ranges, _sion_generic_collective_next_write_spec);
107 _sion_write_dry_run(sion_filedesc, nitems * size);
109 int64_t items_written = size ? bytes_written / size : 0;
112 if (sion_filedesc->usebuddy) {
114 _sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
117 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
118 sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
121 return items_written;
130 #define DFUNCTION "sion_coll_read"
133 _sion_filedesc *sion_filedesc;
134 _sion_generic_gendata *sion_gendata;
135 _sion_generic_apidesc *sion_apidesc;
137 if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
138 _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
"sion_coll_read: invalid sion_filedesc %d", sid);
141 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
142 sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
144 sion_gendata = sion_filedesc->dataptr;
145 sion_apidesc = sion_gendata->apidesc;
148 if (!sion_filedesc->usecoll) {
149 return sion_read(data, size, nitems, sid);
153 if (sion_filedesc->usebuddy) {
154 return _sion_coll_fread_buddy(data, size, nitems, sid);
158 sion_filedesc->collcmdused = 1;
161 if (sion_filedesc->collsize <= 0) {
163 SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
"sion_coll_read: collsize=%d <= 0, returning ...\n", (
int)sion_filedesc->collsize);
168 if (!sion_apidesc->execute_scatter_cb) {
169 _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
170 "sion_coll_read: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
175 int collector = (int)sion_filedesc->collector;
176 int firstsender = collector + 1;
177 int lastsender = sion_filedesc->rank + sion_filedesc->collsize - 1;
178 if (lastsender > sion_filedesc->ntasks) {
179 lastsender = sion_filedesc->ntasks - 1;
181 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
182 sion_filedesc->collsize, firstsender, lastsender));
185 if (sion_filedesc->rank == sion_filedesc->collector) {
187 bytes_read = _sion_read_multi_chunk(sion_filedesc, data, nitems * size);
188 int64_t ownnewposition = sion_filedesc->currentpos;
192 sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
193 lastsender, sid, _sion_generic_collective_process_read, NULL, _sion_generic_collective_next_read_spec);
196 _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
197 sion_filedesc->currentpos = ownnewposition;
199 _sion_partitioned_limited_range_iterator ranges =
200 _sion_partitioned_limited_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
203 bytes_read = sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
204 firstsender, lastsender, sid, _sion_generic_collective_process_read, &ranges, _sion_generic_collective_next_read_spec);
205 bytes_read = nitems * size;
208 _sion_read_dry_run(sion_filedesc, nitems * size);
210 int64_t items_read = size ? bytes_read / size : 0;
212 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
213 sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
225 #define DFUNCTION "_sion_generic_collective_process_write"
226 int64_t _sion_generic_collective_process_write(
const void *data, int64_t *spec,
int sid)
228 _sion_filedesc *sion_filedesc;
229 if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
230 return _sion_errorprint(
231 SION_SIZE_NOT_VALID, _SION_ERROR_RETURN,
"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid);
233 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int)spec[0], (
int)spec[1], sid));
236 int64_t destpos = spec[0], bytestowrite = spec[1];
237 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"currentpos=%d destpos=%d\n", (
int)sion_filedesc->currentpos, (
int)destpos));
238 if (sion_filedesc->currentpos != destpos) {
239 _sion_file_set_position(sion_filedesc->fileptr, destpos);
240 sion_filedesc->currentpos = destpos;
244 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to write data of size %lld at position %lld\n",
245 (
long long)bytestowrite, (
long long)destpos));
246 int64_t bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
247 if (bwrote != bytestowrite) {
248 return _sion_errorprint(bwrote, _SION_ERROR_RETURN,
"_sion_generic_collective_process_write: problems writing data ...\n");
252 sion_filedesc->currentpos += bytestowrite;
253 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n", bwrote,
254 (
long)bytestowrite, (
long long)sion_filedesc->currentpos, (
long long)_sion_file_get_position(sion_filedesc->fileptr)));
260 bool _sion_generic_collective_next_write_spec(
void *iterator, int64_t *spec)
263 if (_sion_partitioned_range_iterator_next((_sion_partitioned_range_iterator *)iterator, &range)) {
264 _sion_range_into_spec(range, spec);
271 #define DFUNCTION "_sion_generic_collective_process_read"
272 int64_t _sion_generic_collective_process_read(
void *data, int64_t *spec,
int sid)
274 _sion_filedesc *sion_filedesc;
276 if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
277 return _sion_errorprint(
278 SION_SIZE_NOT_VALID, _SION_ERROR_RETURN,
"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid);
280 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"enter spec[0]=%" PRId64
" spec[1]=%" PRId64
" sid=%d\n", spec[0], spec[1], sid));
283 int64_t destpos = spec[0], bytestoread = spec[1];
284 if (sion_filedesc->currentpos != destpos) {
285 if (sion_filedesc->fileptr != NULL) {
286 _sion_file_set_position(sion_filedesc->fileptr, destpos);
288 sion_filedesc->currentpos = destpos;
292 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector start to read data of size %lld at position %lld\n",
293 (
long long)bytestoread, (
long long)destpos));
294 int64_t bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
295 if (bread != bytestoread) {
296 return _sion_errorprint(bread, _SION_ERROR_RETURN,
"_sion_generic_collective_process_read: problems reading data ...\n");
300 ONLY_DEBUG(
char *p = data;)
302 (128, DFUNCTION, _SION_DEFAULT_RANK,
"data[0]=%c data[%d]=%c\n", (
char)p[0], (
int)bytestoread, (
char)p[bytestoread - 1]));
306 sion_filedesc->currentpos += bytestoread;
307 DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK,
"collector read data block (bread=%d) of size %ld new pos %lld, %lld\n", bread,
308 (
long)bytestoread, (
long long)sion_filedesc->currentpos, (
long long)_sion_file_get_position(sion_filedesc->fileptr)));
314 bool _sion_generic_collective_next_read_spec(
void *iterator, int64_t *spec)
317 if (_sion_partitioned_limited_range_iterator_next((_sion_partitioned_limited_range_iterator *)iterator, &range)) {
318 _sion_range_into_spec(range, spec);
325 #define DFUNCTION "_sion_coll_check_env"
326 void _sion_coll_check_env(_sion_filedesc *sion_filedesc)
328 const char *cd = _sion_getenv(
"SION_COLLDEBUG");
330 sion_filedesc->colldebug = atoi(cd);
333 const char *cs = _sion_getenv(
"SION_COLLSIZE");
334 const char *cn = _sion_getenv(
"SION_COLLNUM");
336 sion_filedesc->collsize = atoi(cs);
337 if (sion_filedesc->collsize > sion_filedesc->ntasks) {
338 sion_filedesc->collsize = sion_filedesc->ntasks;
340 if (sion_filedesc->colldebug >= 1) {
341 fprintf(stderr,
"collective statistics: SION_COLLSIZE=%11d\n", sion_filedesc->collsize);
344 int numcoll = atoi(cn);
346 if (numcoll > sion_filedesc->ntasks) {
347 numcoll = sion_filedesc->ntasks;
349 sion_filedesc->collsize = sion_filedesc->ntasks / numcoll;
350 if (sion_filedesc->ntasks % numcoll > 0) {
351 sion_filedesc->collsize++;
354 if (sion_filedesc->colldebug >= 1) {
355 fprintf(stderr,
"collective statistics: SION_COLLNUM=%11d\n", numcoll);
356 fprintf(stderr,
"collective statistics: collsize=%11d\n", sion_filedesc->collsize);
363 if (sion_filedesc->collsize > 0) {
364 sion_filedesc->usecoll = 1;
366 if (sion_filedesc->collsize < 0) {
367 sion_filedesc->usecoll = 1;
369 if (sion_filedesc->collsize == 0) {
370 sion_filedesc->usecoll = 0;
374 DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK,
"usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n",
375 sion_filedesc->usecoll, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles, sion_filedesc->colldebug));
size_t sion_coll_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.
size_t sion_coll_read(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file using collective I/O.
size_t sion_coll_fread(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file using collective I/O.
size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.
size_t sion_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file.
size_t sion_read(void *data, size_t size, size_t nitems, int sid)
Read data from SIONlib file.