14 #define _XOPEN_SOURCE 700
24 #include <sys/types.h>
30 #include <cuda_runtime.h>
35 #include "sion_error_handler.h"
41 #include "sion_generic_buddy.h"
/*!
 * Collective write entry point: takes a buffer (data), an element size
 * (size), an element count (nitems) and a SION file id (sid), and returns
 * a size_t.
 *
 * NOTE(review): this listing is elided (the embedded source line numbers
 * jump), so several conditions, assignments and closing braces are not
 * visible. Only the statements that are visible are described below.
 *
 * Visible behaviour:
 *  - an error is reported through _sion_errorprint for an invalid
 *    descriptor and for collsize <= 0
 *  - one path hands the whole request to _sion_coll_fwrite_merge()
 *  - the task's own data goes through
 *    _sion_generic_collective_process_write(), and the same routine is
 *    handed to the API's gather_execute_cb callback
 *  - _sion_coll_fwrite_buddy() is called and its status kept in rc_buddy
 *  - items_wrote is bwrote / size (0 when size is 0); presumably this is
 *    the value returned -- the return statement itself is not visible
 */
45 #define DFUNCTION "sion_coll_fwrite"
46 size_t sion_coll_fwrite(
const void *data,
size_t size,
size_t nitems,
int sid) {
/* bwrote is the byte count later divided by size; spec is passed on to the write callbacks. */
50 sion_int64 bwrote=0, spec[2], ownnewposition, items_wrote;
/* Three separate status codes; the success branch further down requires all of them to be SION_STD_SUCCESS. */
51 int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS,rc_buddy=SION_STD_SUCCESS;
52 int collector, firstsender, lastsender;
/* Error path for an invalid sid (the guarding condition is not visible in this listing). */
55 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: invalid sion_filedesc %d", sid));
57 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
/* Data attached to the file descriptor, and from it the API descriptor that holds the callbacks used below. */
60 sion_gendata=sion_filedesc->
dataptr;
61 sion_apidesc=sion_gendata->apidesc;
/* Delegates the complete request to the merge variant (the selecting condition is elided). */
70 return(_sion_coll_fwrite_merge(data,size,nitems,sid));
/* Error path for a non-positive collsize, as stated by the message text. */
79 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (
int) sion_filedesc->
collsize));
/* Sender range handed to the gather callback: starts directly after the collector and covers collsize-1 entries. */
84 firstsender=collector+1;
85 lastsender=collector+sion_filedesc->
collsize-1;
/* NOTE(review): the clamp uses '>' so lastsender == ntasks is left unchanged; if valid task
   indices are 0..ntasks-1 (presumably -- confirm), this should probably be '>='. */
86 if(lastsender>sion_filedesc->
ntasks) lastsender=sion_filedesc->
ntasks-1;
87 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"collector=%d collsize=%d firstsender=%d lastsender=%d\n",
88 collector, sion_filedesc->
collsize, firstsender, lastsender));
/* Reports that free space could not be ensured; on this visible line the result is not returned. */
92 _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"could not ensure free space for this block, returning %d ...\n", sid);
/* The task's own data is written with the same routine that is passed to gather_execute_cb below. */
101 rc_own=_sion_generic_collective_process_write(data,spec,sid);
/* The API descriptor must provide gather_execute_cb, otherwise the call fails.
   NOTE(review): the message text spells "initalized"; it is a runtime string and is left untouched here. */
106 if(!sion_apidesc->gather_execute_cb ) {
107 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
/* Gather step: the API callback receives data, spec and the literal 2 (presumably the length of spec),
   fsblksize, the local communicator data, the collector, the sender range, sid and the write routine. */
109 rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->
fsblksize,
110 sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
111 _sion_generic_collective_process_write);
/* Additional buddy write; its status is tracked separately in rc_buddy. */
129 rc_buddy=_sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
/* Success requires the own write, the gather callback and the buddy write to all have succeeded. */
133 if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS)&& (rc_buddy == SION_STD_SUCCESS) ) {
/* Convert bytes to items; the ternary avoids a division by zero when size is 0. */
139 items_wrote = size ? bwrote / size : 0;
141 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
/*!
 * Collective read entry point: takes a destination buffer (data), an
 * element size (size), an element count (nitems) and a SION file id
 * (sid), and returns a size_t.
 *
 * NOTE(review): this listing is elided (the embedded source line numbers
 * jump), so several conditions, assignments and closing braces are not
 * visible. Only the statements that are visible are described below.
 *
 * Visible behaviour:
 *  - an error is reported through _sion_errorprint for an invalid
 *    descriptor and for collsize <= 0
 *  - one path hands the whole request to _sion_coll_fread_buddy()
 *  - the task's own data goes through
 *    _sion_generic_collective_process_read(), and the same routine is
 *    handed to the API's execute_scatter_cb callback
 *  - items_read is bread / size (0 when size is 0); presumably this is
 *    the value returned -- the return statement itself is not visible
 */
148 #define DFUNCTION "sion_coll_fread"
149 size_t sion_coll_fread(
void *data,
size_t size,
size_t nitems,
int sid) {
/* NOTE(review): bread starts at -1 here (the write counterpart starts its counter at 0); where it is
   reassigned is not visible in this listing. */
153 sion_int64 bread=-1, spec[2], ownnewposition, items_read;
154 int rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
155 int collector, firstsender, lastsender;
/* Error path for an invalid sid (the guarding condition is not visible in this listing). */
158 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fread: invalid sion_filedesc %d", sid));
160 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
/* Data attached to the file descriptor, and from it the API descriptor that holds the callbacks used below. */
163 sion_gendata=sion_filedesc->
dataptr;
164 sion_apidesc=sion_gendata->apidesc;
/* Delegates the complete request to the buddy read (the selecting condition is elided). */
173 return( _sion_coll_fread_buddy(data, size, nitems, sid));
/* Error path for a non-positive collsize, as stated by the message text. */
181 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fread: collsize=%d <= 0, returning ...\n",
/* Parameters for the scatter callback. */
186 collector=(int) sion_filedesc->
collector;
187 firstsender=collector+1;
/* NOTE(review): lastsender is derived from rank here, whereas sion_coll_fwrite derives it from
   collector. The two agree only when rank == collector -- confirm this is intended. */
188 lastsender=sion_filedesc->
rank+sion_filedesc->
collsize-1;
/* NOTE(review): the clamp uses '>' so lastsender == ntasks is left unchanged; if valid task
   indices are 0..ntasks-1 (presumably -- confirm), this should probably be '>='. */
189 if(lastsender>sion_filedesc->
ntasks) lastsender=sion_filedesc->
ntasks-1;
/* Reports an early end of file; on this visible line the result is not returned. */
194 _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"early eof found for this block, returning %d ...\n", sid);
/* The task's own data is read with the same routine that is passed to execute_scatter_cb below. */
208 rc_own=_sion_generic_collective_process_read(data,spec,sid);
/* The API descriptor must provide execute_scatter_cb, otherwise the call fails.
   NOTE(review): the message text spells "initalized"; it is a runtime string and is left untouched here. */
214 if(!sion_apidesc->execute_scatter_cb ) {
215 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
216 "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
/* Scatter step: the API callback receives data, spec and the literal 2 (presumably the length of spec),
   fsblksize, the local communicator data, the collector, the sender range, sid and the read routine. */
218 rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->
fsblksize,
219 sion_gendata->comm_data_local,collector,firstsender,lastsender,sid,
220 _sion_generic_collective_process_read);
/* Success requires both the own read and the scatter callback to have succeeded. */
236 if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
/* Convert bytes to items; the ternary avoids a division by zero when size is 0. */
242 items_read = size ? bread / size : 0;
244 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
/*!
 * Write routine used for a task's own data and handed to the API's
 * gather callback by sion_coll_fwrite. It takes the buffer (data), a
 * sion_int64 array (spec) and the SION file id (sid), and returns an int
 * status.
 *
 * NOTE(review): this listing is elided (the embedded source line numbers
 * jump); the actual file write call, the host-memory branch and the
 * cleanup of the staging buffer are not visible.
 *
 * Visible behaviour:
 *  - spec[1] is the number of bytes to write
 *  - cudaPointerGetAttributes() is used to classify data; when the query
 *    succeeds and _sion_cuda_ptr_is_device() accepts the attributes, the
 *    data is copied device-to-host in chunks of at most fsblksize bytes
 *    through a malloc'ed staging buffer
 *  - a mismatch between bwrote and bytestowrite is reported with
 *    SION_STD_NOT_SUCCESS
 */
253 #define DFUNCTION "_sion_generic_collective_process_write"
254 int _sion_generic_collective_process_write(
const void *data, sion_int64 *spec,
int sid ) {
256 int rc=SION_STD_SUCCESS;
257 sion_int64 bwrote=0, destpos, bytestowrite;
/* Error path for an invalid sid (the guarding condition is not visible in this listing). */
259 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
/* NOTE(review): the sion_int64 values are cast to int for %d, so large values are truncated in the debug output only. */
261 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int) spec[0],(
int) spec[1],sid));
/* Second spec entry is the byte count; the assignment of destpos is on an elided line. */
265 bytestowrite=spec[1];
266 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"currentpos=%d destpos=%d\n", (
int) sion_filedesc->
currentpos,(
int) destpos));
273 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"collector start to write data of size %lld at position %lld\n",
274 (
long long) bytestowrite, (
long long) destpos));
/* Ask the CUDA runtime what kind of memory data points to. If the query itself fails, the
   device branch below is skipped. */
276 struct cudaPointerAttributes attrs;
277 cudaError_t err = cudaPointerGetAttributes(&attrs, data);
278 if ((err == cudaSuccess) && _sion_cuda_ptr_is_device(attrs) ) {
/* Host staging buffer of fsblksize bytes.
   NOTE(review): no NULL check on the malloc result is visible, and the matching free is not
   visible either -- confirm both exist on the elided lines. */
279 char* buffer = malloc(sion_filedesc->
fsblksize);
280 const char* data_ = data;
281 while (bwrote < bytestowrite) {
/* Chunk size: the remaining bytes, capped at fsblksize (the size of the staging buffer). */
282 sion_int64 to_write = (bytestowrite - bwrote) > sion_filedesc->
fsblksize ? sion_filedesc->
fsblksize : (bytestowrite - bwrote);
/* NOTE(review): the cudaError_t returned by cudaMemcpy is discarded; a failed copy would not be detected here. */
283 cudaMemcpy(buffer, data_, to_write, cudaMemcpyDeviceToHost);
/* bwrote_ is set on an elided line (presumably the byte count of the file write); a short write ends the loop. */
285 if (bwrote_ != to_write)
break;
/* Any shortfall against the requested byte count is reported as a failure. */
296 if(bwrote != bytestowrite) {
297 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: problems writing data ...\n"));
302 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n",
/*!
 * Read routine used for a task's own data and handed to the API's
 * scatter callback by sion_coll_fread. It takes the destination buffer
 * (data), a sion_int64 array (spec) and the SION file id (sid), and
 * returns an int status.
 *
 * NOTE(review): this listing is elided (the embedded source line numbers
 * jump); the assignments of destpos and bytestoread, the actual file read
 * call, the host-memory branch and the cleanup of the staging buffer are
 * not visible.
 *
 * Visible behaviour:
 *  - work is done inside a branch guarded by fileptr != NULL
 *  - cudaPointerGetAttributes() is used to classify data; when the query
 *    succeeds and _sion_cuda_ptr_is_device() accepts the attributes, the
 *    data is read in chunks of at most fsblksize bytes into a malloc'ed
 *    staging buffer and copied host-to-device
 *  - a mismatch between bread and bytestoread is reported with
 *    SION_STD_NOT_SUCCESS
 */
311 #define DFUNCTION "_sion_generic_collective_process_read"
312 int _sion_generic_collective_process_read(
void *data, sion_int64 *spec,
int sid ) {
314 int rc=SION_STD_SUCCESS;
315 sion_int64 bread=0, destpos, bytestoread;
/* Error path for an invalid sid (the guarding condition is not visible in this listing). */
318 return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
/* NOTE(review): the sion_int64 values are cast to int for %d, so large values are truncated in the debug output only. */
320 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int) spec[0],(
int) spec[1],sid));
/* Only proceeds when the descriptor has a file pointer. */
326 if(sion_filedesc->
fileptr!=NULL) {
334 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"collector start to read data of size %lld at position %lld\n",
335 (
long long) bytestoread, (
long long) destpos));
/* Ask the CUDA runtime what kind of memory data points to. If the query itself fails, the
   device branch below is skipped. */
337 struct cudaPointerAttributes attrs;
338 cudaError_t err = cudaPointerGetAttributes(&attrs, data);
339 if ((err == cudaSuccess) && _sion_cuda_ptr_is_device(attrs) ) {
/* Host staging buffer of fsblksize bytes.
   NOTE(review): no NULL check on the malloc result is visible, and the matching free is not
   visible either -- confirm both exist on the elided lines. */
340 char* buffer = malloc(sion_filedesc->
fsblksize);
342 while (bread < bytestoread) {
/* Chunk size: the remaining bytes, capped at fsblksize (the size of the staging buffer). */
343 sion_int64 to_read = (bytestoread - bread) > sion_filedesc->
fsblksize ? sion_filedesc->
fsblksize : (bytestoread - bread);
/* bread_ is set on an elided line (presumably the byte count of the file read); a short read ends
   the loop before anything is copied to the device. */
345 if (bread_ != to_read)
break;
/* NOTE(review): the cudaError_t returned by cudaMemcpy is discarded; a failed copy would not be detected here. */
346 cudaMemcpy(data_, buffer, bread_, cudaMemcpyHostToDevice);
/* Any shortfall against the requested byte count is reported as a failure. */
357 if(bread != bytestoread) {
358 return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_generic_collective_process_read: problems reading data ...\n"));
/* NOTE(review): p aliases data and is dereferenced on the host in the debug print below. If data is a
   device pointer (the case handled above) that access is invalid unless guarded on an elided line, and
   p[bytestoread-1] indexes before the buffer when bytestoread is 0 -- confirm. */
362 ONLY_DEBUG(
char *p=data;)
363 DPRINTFP((128,
DFUNCTION, _SION_DEFAULT_RANK,
"data[0]=%c data[%d]=%c\n",(
char) p[0], (
int) bytestoread, (
char) p[bytestoread-1]));
368 DPRINTFP((4,
DFUNCTION, _SION_DEFAULT_RANK,
"collector read data block (bread=%d) of size %ld new pos %lld, %lld\n",
/*
 * Fragment of the routine named by DFUNCTION below.
 *
 * NOTE(review): the function signature and most of the body are elided in
 * this listing (the embedded source line numbers jump), so parameters and
 * return value cannot be documented from here. The visible statements
 * print collective-I/O settings to stderr and limit numcoll to ntasks.
 */
387 #define DFUNCTION "_sion_coll_check_env"
/* NOTE(review): rc is initialised with SION_SUCCESS, whereas the other routines in this file use
   SION_STD_SUCCESS for their status codes -- confirm which constant the callers compare against. */
392 int rc = SION_SUCCESS, numcoll;
/* Diagnostic output of the current collsize under the label SION_COLLSIZE (the guarding condition is elided). */
406 fprintf(stderr,
"collective statistics: SION_COLLSIZE=%11d\n",sion_filedesc->
collsize);
/* numcoll is capped at the number of tasks. */
411 if(numcoll>sion_filedesc->
ntasks) numcoll=sion_filedesc->
ntasks;
/* Diagnostic output of numcoll and of the current collsize. */
416 fprintf(stderr,
"collective statistics: SION_COLLNUM=%11d\n",numcoll);
417 fprintf(stderr,
"collective statistics: collsize=%11d\n",sion_filedesc->
collsize);
/* Debug trace of the collective-related descriptor fields. */
430 DPRINTFP((2,
DFUNCTION, _SION_DEFAULT_RANK,
"usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->
usecoll, sion_filedesc->
collsize,sion_filedesc->
ntasks,sion_filedesc->
nfiles,sion_filedesc->
colldebug));
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
void * _sion_vcdtovcon(int sid)
int _sion_vcdtype(int sid)
#define SION_FILEDESCRIPTOR
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
#define DFUNCTION
Checks whether environment variables are set to use collective I/O. SION_COLLSIZE = -1 -> number of collect...
char * _sion_getenv(const char *name)
Sion File Descriptor Structure.
sion_int32 fileptr_exported