22 #include <sys/types.h>    29 #include "sion_error_handler.h"    38 int _sion_generic_collective_process_write_merge( 
const void *data, sion_int64 *spec, 
int sid );
    42 #define DFUNCTION "_sion_coll_fwrite_merge"    43 size_t _sion_coll_fwrite_merge(
const void *data, 
size_t size, 
size_t nitems, 
int sid) {
    47   sion_int64             bwrote=0, spec[2];
    48   int                    rc_own=SION_STD_SUCCESS,rc_cb=SION_STD_SUCCESS;
    49   int                    collector, firstsender, lastsender;
    52     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: invalid sion_filedesc %d", sid));
    54   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n", 
    57   sion_gendata=sion_filedesc->
dataptr;
    58   sion_apidesc=sion_gendata->apidesc;
    65     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: collsize=%d <= 0, returning ...\n", (
int) sion_filedesc->
collsize));
    70   firstsender=collector+1;
    71   lastsender=sion_filedesc->
rank+sion_filedesc->
collsize-1;
    72   if(lastsender>sion_filedesc->
ntasks) lastsender=sion_filedesc->
ntasks-1;
    79     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"COLLECTOR write data of size %ld at current position\n", (
long) spec[1]));
    80     rc_own=_sion_generic_collective_process_write_merge(data,spec,sid);
    82     DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"SENDER send data of size %ld at current position\n", (
long) spec[1]));
    87   if(!sion_apidesc->gather_execute_cb ) {
    88     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"sion_coll_fwrite: API %s not correctly initalized, collective I/O calls missing, aborting",sion_apidesc->name));
    90   rc_cb=sion_apidesc->gather_execute_cb(data,spec,2, sion_filedesc->
fsblksize,
    91                     sion_gendata->comm_data_local,collector,firstsender,lastsender,sid, 
    92                     _sion_generic_collective_process_write_merge);
    95   if( (rc_own == SION_STD_SUCCESS) && (rc_cb == SION_STD_SUCCESS) ) {
   101   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n", 
   109 #define DFUNCTION "_sion_generic_collective_process_write_merge"   110 int _sion_generic_collective_process_write_merge( 
const void *data, sion_int64 *spec, 
int sid ) {
   112   int                    rc=SION_STD_SUCCESS;
   113   sion_int64             bwrote=0, bytestowrite;
   115     return(_sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid));
   117   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int) spec[0],(
int) spec[1],sid));
   120   bytestowrite=spec[1];
   124     _sion_errorprint(SION_SIZE_NOT_VALID,_SION_ERROR_RETURN,
"could not ensure free space for this block, returning %d ...\n", sid);
   128   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to write data of size %lld at position %lld\n", 
   129         (
long long) bytestowrite, (
long long) sion_filedesc->
currentpos));
   131   if(bwrote != bytestowrite) {
   132     return(_sion_errorprint(SION_STD_NOT_SUCCESS,_SION_ERROR_RETURN,
"_sion_generic_collective_process_write: problems writing data ...\n"));
   137   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n", 
   141   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave spec[0]=%d spec[1]=%d sid=%d rc=%d\n", (
int) spec[0],(
int) spec[1],sid,rc));
 sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
sion_int64 _sion_file_get_position(_sion_fileptr *sion_fileptr)
Get new position in file.
Sion File Descriptor Structure.
int sion_ensure_free_space(int sid, sion_int64 bytes)
Funtion to ensure that enough space is available for writing.
int _sion_vcdtype(int sid)
void * _sion_vcdtovcon(int sid)
#define SION_FILEDESCRIPTOR