/*
 * sion_coll_fwrite: collective write of nitems elements of `size` bytes each
 * from `data` to the sion file identified by `sid`.
 *
 * NOTE(review): only part of the function body is visible in this listing;
 * the conditions guarding several statements below cannot be seen here.
 *
 * Visible behaviour:
 *  - error returns through _sion_errorprint whose messages name an invalid
 *    sid, collsize <= 0, and a missing gather_execute_cb;
 *  - one path delegates to _sion_coll_fwrite_merge();
 *  - data is written directly via _sion_generic_collective_process_write()
 *    (result in rc_own), and the same function is handed to
 *    gather_execute_cb as callback (result in rc_cb);
 *  - one path additionally calls _sion_coll_fwrite_buddy() (result in rc_buddy);
 *  - the item count is derived as bwrote / size, or 0 when size is 0.
 */
22 #include <sys/types.h>    29 #include "sion_error_handler.h"    35 #include "sion_generic_buddy.h"    39 #define DFUNCTION "sion_coll_fwrite"    40 size_t sion_coll_fwrite(
const void *data, 
size_t size, 
size_t nitems, 
int sid) {
    44   sion_int64             bwrote=0, spec[2], ownnewposition, items_wrote;
    45   int                    rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS,rc_buddy=SION_STD_SUCCESS;
    46   int  collector, firstsender, lastsender;
    49     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: invalid sion_filedesc %d", sid));
    51   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n", 
    /* Fetch the generic data and API descriptor attached to the file descriptor. */
    54   sion_gendata=sion_filedesc->
dataptr;
    55   sion_apidesc=sion_gendata->apidesc;
    64     return(_sion_coll_fwrite_merge(data,size,nitems,sid));
    73     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (
int) sion_filedesc->
collsize));
    /* firstsender/lastsender are passed to gather_execute_cb below together
       with collector. */
    78   firstsender=collector+1;
    79   lastsender=collector+sion_filedesc->
collsize-1;
    /* NOTE(review): the test uses '>' but clamps to ntasks-1, so a value of
       lastsender equal to ntasks is left unchanged. If valid ranks are
       0..ntasks-1 this looks like an off-by-one ('>=' expected) - confirm. */
    80   if(lastsender>sion_filedesc->
ntasks) lastsender=sion_filedesc->
ntasks-1;
    81   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"collector=%d collsize=%d firstsender=%d lastsender=%d\n",
    82         collector, sion_filedesc->
collsize, firstsender, lastsender));
    /* NOTE(review): unlike the other error sites visible here, the result of
       _sion_errorprint is not returned on this line; whether a return
       follows is not visible - confirm. */
    86     _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"could not ensure free space for this block, returning %d ...\n", sid);
    95     rc_own=_sion_generic_collective_process_write(data,spec,sid);
    /* The gather callback is required; bail out if the API did not register one.
       NOTE(review): the message text below misspells "initialized". */
   100   if(!sion_apidesc->gather_execute_cb ) {
   101     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
   103   rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->
fsblksize,
   104                        sion_gendata->comm_data_local,collector,firstsender,lastsender,sid, 
   105                        _sion_generic_collective_process_write);
   123     rc_buddy=_sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
    /* Success path requires the own write, the callback and the buddy write
       to all report SION_STD_SUCCESS. */
   127   if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS)&& (rc_buddy == SION_STD_SUCCESS) ) {
    /* Convert bytes to items; size == 0 yields 0 instead of dividing by zero. */
   133   items_wrote = size ? bwrote / size : 0;
   135   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n", 
/*
 * sion_coll_fread: collective read of nitems elements of `size` bytes each
 * into `data` from the sion file identified by `sid`.
 *
 * NOTE(review): only part of the function body is visible in this listing;
 * the conditions guarding several statements below cannot be seen here.
 *
 * Visible behaviour:
 *  - error returns through _sion_errorprint whose messages name an invalid
 *    sid, collsize <= 0, and a missing execute_scatter_cb;
 *  - one path delegates to _sion_coll_fread_buddy();
 *  - data is read directly via _sion_generic_collective_process_read()
 *    (result in rc_own), and the same function is handed to
 *    execute_scatter_cb as callback (result in rc_cb);
 *  - the item count is derived as bread / size, or 0 when size is 0.
 */
   142 #define DFUNCTION "sion_coll_fread"   143 size_t sion_coll_fread( 
void *data, 
size_t size, 
size_t nitems, 
int sid) {
   /* NOTE(review): bread starts at -1 here, whereas sion_coll_fwrite starts
      bwrote at 0; the assignment that updates bread is not visible - confirm
      that -1 can never reach the bread / size division below. */
   147   sion_int64             bread=-1, spec[2], ownnewposition, items_read;
   148   int                    rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
   149   int                    collector, firstsender, lastsender;
   152     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fread: invalid sion_filedesc %d", sid));
   154   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n", 
   /* Fetch the generic data and API descriptor attached to the file descriptor. */
   157   sion_gendata=sion_filedesc->
dataptr;
   158   sion_apidesc=sion_gendata->apidesc;
   167     return( _sion_coll_fread_buddy(data, size, nitems, sid));
   175     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fread: collsize=%d <= 0, returning ...\n", 
   /* collector, firstsender and lastsender are passed to execute_scatter_cb below. */
   180   collector=(int) sion_filedesc->
collector;
   181   firstsender=collector+1;
   /* NOTE(review): lastsender is based on rank here, while sion_coll_fwrite
      bases it on collector - confirm the difference is intended. */
   182   lastsender=sion_filedesc->
rank+sion_filedesc->
collsize-1;
   /* NOTE(review): the test uses '>' but clamps to ntasks-1, so a value of
      lastsender equal to ntasks is left unchanged. If valid ranks are
      0..ntasks-1 this looks like an off-by-one ('>=' expected) - confirm. */
   183   if(lastsender>sion_filedesc->
ntasks) lastsender=sion_filedesc->
ntasks-1;
   /* NOTE(review): the result of _sion_errorprint is not returned on this
      line; whether a return follows is not visible - confirm. */
   188       _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"early eof found for this block, returning %d ...\n", sid);
   202     rc_own=_sion_generic_collective_process_read(data,spec,sid);
   /* The scatter callback is required; bail out if the API did not register one.
      NOTE(review): the message text below misspells "initialized". */
   208   if(!sion_apidesc->execute_scatter_cb ) {
   209     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
   210                 "sion_coll_fread: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
   212   rc_cb=sion_apidesc->execute_scatter_cb(data,spec,2, sion_filedesc->
fsblksize,
   213                         sion_gendata->comm_data_local,collector,firstsender,lastsender,sid, 
   214                         _sion_generic_collective_process_read);
   /* Success path requires both the own read and the callback to report
      SION_STD_SUCCESS. */
   230   if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
   /* Convert bytes to items; size == 0 yields 0 instead of dividing by zero. */
   236   items_read = size ? bread / size : 0;
   238   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n", 
/*
 * _sion_generic_collective_process_write: writes one block described by
 * `spec` from `data` to the sion file `sid`. It is called directly by
 * sion_coll_fwrite and is also passed to gather_execute_cb as callback.
 *
 * spec[1] is the number of bytes to write. spec[0] is only visible in the
 * debug trace here - presumably the destination position (destpos); the
 * assignment is not visible in this listing, TODO confirm.
 *
 * Returns through _sion_errorprint on an invalid sid or when the number of
 * bytes written differs from the number requested; rc is initialised to
 * SION_STD_SUCCESS (the final return is not visible in this listing).
 */
   247 #define DFUNCTION "_sion_generic_collective_process_write"   248 int _sion_generic_collective_process_write( 
const void *data, sion_int64 *spec, 
int sid ) {
   250   int                    rc=SION_STD_SUCCESS;
   251   sion_int64             bwrote=0, destpos, bytestowrite;
   253     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
   /* Debug trace only; the (int) casts narrow the sion_int64 values to fit %d. */
   255   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int) spec[0],(
int) spec[1],sid));
   259   bytestowrite=spec[1];
   /* Debug trace only; positions are narrowed to int here, while the trace
      further down prints the same kind of values as long long. */
   260   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"currentpos=%d destpos=%d\n", (
int) sion_filedesc->
currentpos,(
int) destpos));
   267   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to write data of size %lld at position %lld\n", 
   268         (
long long) bytestowrite, (
long long) destpos));
   /* A short write is treated as an error. */
   270   if(bwrote != bytestowrite) {
   271     return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: problems writing data ...\n"));
   /* NOTE(review): the arguments of this trace are not visible; verify they
      are cast to match %d / %ld / %lld, since the locals are sion_int64. */
   276   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n", 
/*
 * _sion_generic_collective_process_read: reads one block described by
 * `spec` into `data` from the sion file `sid`. It is called directly by
 * sion_coll_fread and is also passed to execute_scatter_cb as callback.
 *
 * spec[0] and spec[1] are only visible in the debug trace here - presumably
 * the source position (destpos) and the byte count (bytestoread); the
 * assignments are not visible in this listing, TODO confirm.
 *
 * Returns through _sion_errorprint on an invalid sid or when the number of
 * bytes read differs from the number requested; rc is initialised to
 * SION_STD_SUCCESS (the final return is not visible in this listing).
 */
   285 #define DFUNCTION "_sion_generic_collective_process_read"   286 int _sion_generic_collective_process_read( 
void *data, sion_int64 *spec, 
int sid ) {
   288   int                    rc=SION_STD_SUCCESS;
   289   sion_int64             bread=0, destpos, bytestoread;
   292     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid));
   /* Debug trace only; the (int) casts narrow the sion_int64 values to fit %d. */
   294   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int) spec[0],(
int) spec[1],sid));
   /* Work guarded by this test only happens when a file pointer is present. */
   300     if(sion_filedesc->
fileptr!=NULL) {
   308   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to read data of size %lld at position %lld\n", 
   309         (
long long) bytestoread, (
long long) destpos));
   /* A short read is treated as an error. */
   311   if(bread != bytestoread) {
   312     return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_generic_collective_process_read: problems reading data ...\n"));
   /* Debug-only peek at the first and last byte that was read.
      NOTE(review): the label prints index bytestoread while the element read
      is p[bytestoread-1]; with bytestoread == 0 that index would be -1. The
      guard around these lines is not visible - confirm it excludes 0. */
   316     ONLY_DEBUG(
char *p=data;)
   317     DPRINTFP((128, 
DFUNCTION, _SION_DEFAULT_RANK, 
"data[0]=%c data[%d]=%c\n",(
char) p[0], (
int) bytestoread, (
char) p[bytestoread-1]));
   /* NOTE(review): the arguments of this trace are not visible; verify they
      are cast to match %d / %ld / %lld, since the locals are sion_int64. */
   322   DPRINTFP((4, 
DFUNCTION, _SION_DEFAULT_RANK, 
"collector read data block (bread=%d) of size %ld new pos %lld, %lld\n", 
/*
 * _sion_coll_check_env (name taken from the DFUNCTION define; the function
 * header itself is not visible in this listing).
 *
 * Visible behaviour: prints collective-I/O statistics (SION_COLLSIZE,
 * SION_COLLNUM, collsize) to stderr, caps numcoll at ntasks, and emits a
 * debug trace of usecoll, collsize, ntasks, nfiles and colldebug.
 * NOTE(review): the conditions under which the statistics are printed and
 * the way numcoll is obtained are not visible here.
 */
   341 #define DFUNCTION "_sion_coll_check_env"   346   int       rc = SION_SUCCESS, numcoll;
   360       fprintf(stderr, 
"collective statistics:            SION_COLLSIZE=%11d\n",sion_filedesc->
collsize);
   /* Never use more collectors than there are tasks. */
   365       if(numcoll>sion_filedesc->
ntasks) numcoll=sion_filedesc->
ntasks;
   370     fprintf(stderr, 
"collective statistics:             SION_COLLNUM=%11d\n",numcoll);
   371     fprintf(stderr, 
"collective statistics:                 collsize=%11d\n",sion_filedesc->
collsize);
   /* Final debug trace of the collective settings stored in the file descriptor. */
   384   DPRINTFP((2, 
DFUNCTION, _SION_DEFAULT_RANK, 
"usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n", sion_filedesc->
usecoll, sion_filedesc->
collsize,sion_filedesc->
ntasks,sion_filedesc->
nfiles,sion_filedesc->
colldebug));
 int sion_feof(int sid)
Function that indicates whether the end of file is reached for this task.
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Sion File Descriptor Structure.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Function to ensure that enough space is available for writing.
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
int _sion_vcdtype(int sid)
sion_int32 fileptr_exported
char * _sion_getenv(const char *name)
size_t sion_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to sion file.
void * _sion_vcdtovcon(int sid)
#define SION_FILEDESCRIPTOR
#define DFUNCTION
Checks if environment variables are set to use collective I/O. SION_COLLSIZE = -1 -> number of collect...
size_t sion_fread(void *data, size_t size, size_t nitems, int sid)
Read data from sion file.
int _sion_file_flush(_sion_fileptr *sion_fileptr)
Flush data to file.