10 #define _XOPEN_SOURCE 700 
   18 #include "sion_common.h" 
   19 #include "sion_const.h" 
   20 #include "sion_debug.h" 
   21 #include "sion_error_handler.h" 
   23 #include "sion_file.h" 
   24 #include "sion_filedesc.h" 
   25 #include "sion_generic_apidesc.h" 
   26 #include "sion_generic_buddy.h" 
   27 #include "sion_generic_collective.h" 
   28 #include "sion_internal.h" 
   29 #include "sion_internal_positioning.h" 
   /*
    * Collective write path (debug label "sion_coll_write").
    *
    * NOTE(review): this listing is lossy. The function signature, braces,
    * `else` keywords, comments and some statements are not visible, so the
    * notes below describe only the statements that can be seen. The code
    * reads `data`, `size`, `nitems` and `sid`, which are presumably the
    * function parameters -- confirm against the complete file.
    *
    * Visible flow:
    *   1. resolve `sid` to a file descriptor and fetch the API descriptor;
    *   2. delegate to _sion_coll_fwrite_merge() when collmergemode is set;
    *   3. report an error if collsize <= 0 or gather_execute_cb is unset;
    *   4. compute the sender range and run the gather callback;
    *   5. convert the byte count to an item count and return it.
    */
   32 #define DFUNCTION "sion_coll_write" 
   35   _sion_filedesc *sion_filedesc;
 
   36   _sion_generic_gendata *sion_gendata;
 
   37   _sion_generic_apidesc *sion_apidesc;
 
        /* Validate sid and look up the descriptor; the lookup result is
         * stored in sion_filedesc as a side effect of the condition.
         * NOTE(review): an invalid descriptor is reported with the code
         * SION_SIZE_NOT_VALID -- confirm this is the intended error code. */
   39   if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
 
   40     _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, 
"sion_coll_write: invalid sion_filedesc %d", sid);
 
   43   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
 
   44     sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
 
   46   sion_gendata = sion_filedesc->dataptr;
 
   47   sion_apidesc = sion_gendata->apidesc;
 
        /* Non-collective case. The statement executed here is not visible
         * in this listing; presumably it falls back to the plain write
         * call, as the read path does with sion_read() -- TODO confirm. */
   50   if (!sion_filedesc->usecoll) {
 
        /* Merge mode is handled entirely by _sion_coll_fwrite_merge(). */
   55   if (sion_filedesc->collmergemode) {
 
   56     return _sion_coll_fwrite_merge(data, size, nitems, sid);
 
        /* Record that a collective command was used on this descriptor. */
   60   sion_filedesc->collcmdused = 1;
 
        /* A non-positive collsize is reported as an error. */
   63   if (sion_filedesc->collsize <= 0) {
 
   65       SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, 
"sion_coll_write: collsize=%d <= 0, returning ...\n", (
int)sion_filedesc->collsize);
 
        /* The API descriptor must provide the gather callback. */
   70   if (!sion_apidesc->gather_execute_cb) {
 
   71     _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
 
   72       "sion_coll_write: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
 
        /* Sender range: the tasks following the collector, collsize - 1 of
         * them, limited by ntasks. */
   77   int collector = (int)sion_filedesc->collector;
 
   78   int firstsender = collector + 1;
 
   79   int lastsender = collector + sion_filedesc->collsize - 1;
 
        /* NOTE(review): the test uses `>` but the clamp value is
         * ntasks - 1, so lastsender == ntasks is left unclamped. If
         * lastsender is a 0-based rank this looks like an off-by-one
         * (`>=` expected) -- confirm. */
   80   if (lastsender > sion_filedesc->ntasks) {
 
   81     lastsender = sion_filedesc->ntasks - 1;
 
   83   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
 
   84     sion_filedesc->collsize, firstsender, lastsender));
 
   86   int64_t bytes_written;
 
        /* Collector branch: write this task's own data, remember the
         * resulting position, run the gather callback (passing NULL where
         * the other call below passes &ranges), then restore the file
         * position and currentpos to the remembered value. */
   87   if (sion_filedesc->rank == sion_filedesc->collector) {
 
   89     bytes_written = _sion_write_multi_chunk(sion_filedesc, data, nitems * size);
 
   90     int64_t ownnewposition = sion_filedesc->currentpos;
 
   93     sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
 
   94       lastsender, sid, _sion_generic_collective_process_write, NULL, _sion_generic_collective_next_write_spec);
 
   97     _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
 
   98     sion_filedesc->currentpos = ownnewposition;
 
        /* Presumably the non-collector branch (the `else` is not visible
         * in this listing): build a range iterator over size * nitems
         * bytes with fsblksize, run the gather callback with it and take
         * its return value as the byte count, then advance the local
         * position via _sion_write_dry_run(). */
  100     _sion_partitioned_range_iterator ranges =
 
  101       _sion_partitioned_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
 
  103     bytes_written = sion_apidesc->gather_execute_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
 
  104       firstsender, lastsender, sid, _sion_generic_collective_process_write, &ranges, _sion_generic_collective_next_write_spec);
 
  107     _sion_write_dry_run(sion_filedesc, nitems * size);
 
        /* Convert bytes to items; size == 0 yields 0 instead of dividing
         * by zero. */
  109   int64_t items_written = size ? bytes_written / size : 0;
 
        /* With buddy mode enabled the data is additionally passed to
         * _sion_coll_fwrite_buddy(); its return value is not used here. */
  112   if (sion_filedesc->usebuddy) {
 
  114     _sion_coll_fwrite_buddy(data, size, nitems, sid, sion_gendata);
 
        /* The final argument of this debug call (for rc=%d) is not visible
         * in this listing. */
  117   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
 
  118     sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
 
  121   return items_written;
 
  /*
   * Collective read path (debug label "sion_coll_read").
   *
   * NOTE(review): this listing is lossy. The function signature, braces,
   * `else` keywords, comments and some statements (including the
   * declaration of `bytes_read` and the final return) are not visible.
   * The code reads `data`, `size`, `nitems` and `sid`, which are
   * presumably the function parameters -- confirm against the complete
   * file.
   *
   * Visible flow:
   *   1. resolve `sid` to a file descriptor and fetch the API descriptor;
   *   2. fall back to sion_read() when usecoll is not set;
   *   3. delegate to _sion_coll_fread_buddy() when usebuddy is set;
   *   4. report an error if collsize <= 0 or execute_scatter_cb is unset;
   *   5. compute the sender range and run the scatter callback;
   *   6. convert the byte count to an item count.
   */
  130 #define DFUNCTION "sion_coll_read" 
  133   _sion_filedesc *sion_filedesc;
 
  134   _sion_generic_gendata *sion_gendata;
 
  135   _sion_generic_apidesc *sion_apidesc;
 
        /* Validate sid and look up the descriptor; the lookup result is
         * stored in sion_filedesc as a side effect of the condition.
         * NOTE(review): an invalid descriptor is reported with the code
         * SION_SIZE_NOT_VALID -- confirm this is the intended error code. */
  137   if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
 
  138     _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, 
"sion_coll_read: invalid sion_filedesc %d", sid);
 
  141   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter usecoll=%d collector=%d collsize=%d (%d tasks, %d files)\n",
 
  142     sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles));
 
  144   sion_gendata = sion_filedesc->dataptr;
 
  145   sion_apidesc = sion_gendata->apidesc;
 
        /* Non-collective case: use the plain read call. */
  148   if (!sion_filedesc->usecoll) {
 
  149     return sion_read(data, size, nitems, sid);
 
        /* Buddy mode is handled entirely by _sion_coll_fread_buddy(). */
  153   if (sion_filedesc->usebuddy) {
 
  154     return _sion_coll_fread_buddy(data, size, nitems, sid);
 
        /* Record that a collective command was used on this descriptor. */
  158   sion_filedesc->collcmdused = 1;
 
        /* A non-positive collsize is reported as an error. */
  161   if (sion_filedesc->collsize <= 0) {
 
  163       SION_SIZE_NOT_VALID, _SION_ERROR_ABORT, 
"sion_coll_read: collsize=%d <= 0, returning ...\n", (
int)sion_filedesc->collsize);
 
        /* The API descriptor must provide the scatter callback. */
  168   if (!sion_apidesc->execute_scatter_cb) {
 
  169     _sion_errorprint(SION_SIZE_NOT_VALID, _SION_ERROR_ABORT,
 
  170       "sion_coll_read: API %s not correctly initalized, collective I/O calls missing, aborting", sion_apidesc->name);
 
  175   int collector = (int)sion_filedesc->collector;
 
  176   int firstsender = collector + 1;
 
        /* NOTE(review): lastsender is derived from `rank` here, whereas
         * the write path derives it from `collector`. On the collector
         * both are equal; on other tasks the values differ -- confirm
         * this is intentional. */
  177   int lastsender = sion_filedesc->rank + sion_filedesc->collsize - 1;
 
        /* NOTE(review): the test uses `>` but the clamp value is
         * ntasks - 1, so lastsender == ntasks is left unclamped. If
         * lastsender is a 0-based rank this looks like an off-by-one
         * (`>=` expected) -- confirm. */
  178   if (lastsender > sion_filedesc->ntasks) {
 
  179     lastsender = sion_filedesc->ntasks - 1;
 
  181   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector=%d collsize=%d firstsender=%d lastsender=%d\n", collector,
 
  182     sion_filedesc->collsize, firstsender, lastsender));
 
        /* Collector branch: read this task's own data, remember the
         * resulting position, run the scatter callback (passing NULL where
         * the other call below passes &ranges), then restore the file
         * position and currentpos to the remembered value. */
  185   if (sion_filedesc->rank == sion_filedesc->collector) {
 
  187     bytes_read = _sion_read_multi_chunk(sion_filedesc, data, nitems * size);
 
  188     int64_t ownnewposition = sion_filedesc->currentpos;
 
  192     sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector, firstsender,
 
  193       lastsender, sid, _sion_generic_collective_process_read, NULL, _sion_generic_collective_next_read_spec);
 
  196     _sion_file_set_position(sion_filedesc->fileptr, ownnewposition);
 
  197     sion_filedesc->currentpos = ownnewposition;
 
        /* Presumably the non-collector branch (the `else` is not visible
         * in this listing): build a limited range iterator over
         * size * nitems bytes with fsblksize and run the scatter callback
         * with it. */
  199     _sion_partitioned_limited_range_iterator ranges =
 
  200       _sion_partitioned_limited_range_iterator_from_filedesc(sion_filedesc, size * nitems, sion_filedesc->fsblksize);
 
  203     bytes_read = sion_apidesc->execute_scatter_cb(data, sion_filedesc->fsblksize, sion_gendata->comm_data_local, collector,
 
  204       firstsender, lastsender, sid, _sion_generic_collective_process_read, &ranges, _sion_generic_collective_next_read_spec);
 
        /* NOTE(review): this overwrites the value just returned by
         * execute_scatter_cb, so a short or failed scatter would not be
         * reflected in the item count -- confirm this is intended. */
  205     bytes_read = nitems * size;
 
        /* Advance the local position via _sion_read_dry_run(). */
  208     _sion_read_dry_run(sion_filedesc, nitems * size);
 
        /* Convert bytes to items; size == 0 yields 0 instead of dividing
         * by zero. */
  210   int64_t items_read = size ? bytes_read / size : 0;
 
        /* The final argument of this debug call (for rc=%d) and the return
         * statement are not visible in this listing. */
  212   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"leave usecoll=%d collector=%d collsize=%d (%d tasks, %d files) rc=%d\n",
 
  213     sion_filedesc->usecoll, sion_filedesc->collector, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles,
 
  /*
   * Write one block described by `spec` to the file behind `sid`.
   * This function is passed as a callback to gather_execute_cb by the
   * collective write path.
   *
   * data  buffer holding the bytes to write
   * spec  spec[0] is the destination file position, spec[1] the number of
   *       bytes to write
   * sid   SIONlib descriptor id
   *
   * On an invalid sid or a short write the result of _sion_errorprint()
   * (called with _SION_ERROR_RETURN) is returned.
   * NOTE(review): braces and the final return statement are not visible in
   * this listing, so the success return value cannot be stated from here.
   */
  225 #define DFUNCTION "_sion_generic_collective_process_write" 
  226 int64_t _sion_generic_collective_process_write(
const void *data, int64_t *spec, 
int sid)
 
  228   _sion_filedesc *sion_filedesc;
 
        /* Validate sid and look up the descriptor; the lookup result is
         * stored in sion_filedesc as a side effect of the condition. */
  229   if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
 
  230     return _sion_errorprint(
 
  231       SION_SIZE_NOT_VALID, _SION_ERROR_RETURN, 
"_sion_generic_collective_process_write: invalid sion_filedesc %d", sid);
 
        /* NOTE(review): spec[] is int64_t but is cast to int for %d, so
         * large positions are truncated in the debug output; the read
         * counterpart prints the same values with PRId64. */
  233   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter spec[0]=%d spec[1]=%d sid=%d\n", (
int)spec[0], (
int)spec[1], sid));
 
  236   int64_t destpos = spec[0], bytestowrite = spec[1];
 
  237   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"currentpos=%d destpos=%d\n", (
int)sion_filedesc->currentpos, (
int)destpos));
 
        /* Seek only when the tracked position differs from the target. */
  238   if (sion_filedesc->currentpos != destpos) {
 
  239     _sion_file_set_position(sion_filedesc->fileptr, destpos);
 
  240     sion_filedesc->currentpos = destpos;
 
  244   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to write data of size %lld at position %lld\n",
 
  245     (
long long)bytestowrite, (
long long)destpos));
 
  246   int64_t bwrote = _sion_file_write(data, bytestowrite, sion_filedesc->fileptr);
 
        /* A short write is an error.
         * NOTE(review): the byte count `bwrote` is passed as the first
         * argument of _sion_errorprint(), where the other calls in this
         * file pass an error code -- confirm this is intended. */
  247   if (bwrote != bytestowrite) {
 
  248     return _sion_errorprint(bwrote, _SION_ERROR_RETURN, 
"_sion_generic_collective_process_write: problems writing data ...\n");
 
        /* Track the new position after the successful write. */
  252   sion_filedesc->currentpos += bytestowrite;
 
        /* NOTE(review): `bwrote` is int64_t but is paired with %d here. If
         * DPRINTFP forwards to a printf-style function this is a
         * format/argument mismatch (use PRId64 or cast to long long) --
         * confirm. */
  253   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector wrote data block (bwrote=%d) of size %ld new pos %lld, %lld\n", bwrote,
 
  254     (
long)bytestowrite, (
long long)sion_filedesc->currentpos, (
long long)_sion_file_get_position(sion_filedesc->fileptr)));
 
  /*
   * Produce the next write spec from a range iterator.
   *
   * iterator  treated as a _sion_partitioned_range_iterator *
   * spec      output array filled by _sion_range_into_spec()
   *
   * Advances the iterator; when _sion_partitioned_range_iterator_next()
   * reports success, the obtained range is converted into `spec`.
   * NOTE(review): the declaration of `range` and the return statements are
   * not visible in this listing; presumably the function returns whether a
   * range was produced -- confirm.
   */
  260 bool _sion_generic_collective_next_write_spec(
void *iterator, int64_t *spec)
 
  263   if (_sion_partitioned_range_iterator_next((_sion_partitioned_range_iterator *)iterator, &range)) {
 
  264     _sion_range_into_spec(range, spec);
 
  /*
   * Read one block described by `spec` from the file behind `sid`.
   * This function is passed as a callback to execute_scatter_cb by the
   * collective read path.
   *
   * data  buffer receiving the bytes
   * spec  spec[0] is the source file position, spec[1] the number of bytes
   *       to read
   * sid   SIONlib descriptor id
   *
   * On an invalid sid or a short read the result of _sion_errorprint()
   * (called with _SION_ERROR_RETURN) is returned.
   * NOTE(review): braces and the final return statement are not visible in
   * this listing, so the success return value cannot be stated from here.
   */
  271 #define DFUNCTION "_sion_generic_collective_process_read" 
  272 int64_t _sion_generic_collective_process_read(
void *data, int64_t *spec, 
int sid)
 
  274   _sion_filedesc *sion_filedesc;
 
        /* Validate sid and look up the descriptor; the lookup result is
         * stored in sion_filedesc as a side effect of the condition. */
  276   if ((sid < 0) || (_sion_vcdtype(sid) != SION_FILEDESCRIPTOR) || !(sion_filedesc = _sion_vcdtovcon(sid))) {
 
  277     return _sion_errorprint(
 
  278       SION_SIZE_NOT_VALID, _SION_ERROR_RETURN, 
"_sion_generic_collective_process_read: invalid sion_filedesc %d", sid);
 
  280   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"enter spec[0]=%" PRId64 
" spec[1]=%" PRId64 
" sid=%d\n", spec[0], spec[1], sid));
 
  283   int64_t destpos = spec[0], bytestoread = spec[1];
 
        /* Reposition only when the tracked position differs from the
         * target. The actual seek is skipped when fileptr is NULL, but
         * currentpos is still updated.
         * NOTE(review): fileptr is used without a visible NULL check by
         * the _sion_file_read() and _sion_file_get_position() calls below
         * -- confirm those helpers tolerate NULL or that a guard exists on
         * a line not shown here. */
  284   if (sion_filedesc->currentpos != destpos) {
 
  285     if (sion_filedesc->fileptr != NULL) {
 
  286       _sion_file_set_position(sion_filedesc->fileptr, destpos);
 
  288     sion_filedesc->currentpos = destpos;
 
  292   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector start to read data of size %lld at position %lld\n",
 
  293     (
long long)bytestoread, (
long long)destpos));
 
  294   int64_t bread = _sion_file_read(data, bytestoread, sion_filedesc->fileptr);
 
        /* A short read is an error.
         * NOTE(review): the byte count `bread` is passed as the first
         * argument of _sion_errorprint(), where the other calls in this
         * file pass an error code -- confirm this is intended. */
  295   if (bread != bytestoread) {
 
  296     return _sion_errorprint(bread, _SION_ERROR_RETURN, 
"_sion_generic_collective_process_read: problems reading data ...\n");
 
        /* Debug-only peek at the first and last byte that were read.
         * NOTE(review): p[bytestoread - 1] indexes before the buffer when
         * bytestoread is 0; a guard may exist on a line not visible in
         * this listing -- confirm. */
  300     ONLY_DEBUG(
char *p = data;)
 
  302       (128, DFUNCTION, _SION_DEFAULT_RANK, 
"data[0]=%c data[%d]=%c\n", (
char)p[0], (
int)bytestoread, (
char)p[bytestoread - 1]));
 
        /* Track the new position after the successful read. */
  306   sion_filedesc->currentpos += bytestoread;
 
        /* NOTE(review): `bread` is int64_t but is paired with %d here. If
         * DPRINTFP forwards to a printf-style function this is a
         * format/argument mismatch (use PRId64 or cast to long long) --
         * confirm. */
  307   DPRINTFP((4, DFUNCTION, _SION_DEFAULT_RANK, 
"collector read data block (bread=%d) of size %ld new pos %lld, %lld\n", bread,
 
  308     (
long)bytestoread, (
long long)sion_filedesc->currentpos, (
long long)_sion_file_get_position(sion_filedesc->fileptr)));
 
  /*
   * Produce the next read spec from a limited range iterator.
   *
   * iterator  treated as a _sion_partitioned_limited_range_iterator *
   * spec      output array filled by _sion_range_into_spec()
   *
   * Advances the iterator; when
   * _sion_partitioned_limited_range_iterator_next() reports success, the
   * obtained range is converted into `spec`.
   * NOTE(review): the declaration of `range` and the return statements are
   * not visible in this listing; presumably the function returns whether a
   * range was produced -- confirm.
   */
  314 bool _sion_generic_collective_next_read_spec(
void *iterator, int64_t *spec)
 
  317   if (_sion_partitioned_limited_range_iterator_next((_sion_partitioned_limited_range_iterator *)iterator, &range)) {
 
  318     _sion_range_into_spec(range, spec);
 
  /*
   * Configure collective I/O on `sion_filedesc` from environment variables.
   *
   * Variables read through _sion_getenv():
   *   SION_COLLDEBUG  parsed with atoi() into colldebug
   *   SION_COLLSIZE   parsed with atoi() into collsize, limited to ntasks
   *   SION_COLLNUM    number of collectors; collsize becomes
   *                   ntasks / numcoll, rounded up
   * Finally usecoll is derived from collsize.
   *
   * NOTE(review): the conditions guarding each variable (NULL checks,
   * if/else structure, any numcoll > 0 test) are not visible in this
   * listing, so the precedence between SION_COLLSIZE and SION_COLLNUM
   * cannot be stated from here.
   * NOTE(review): atoi() gives no error indication for malformed input;
   * strtol() with end-pointer checks would allow rejecting bad values.
   */
  325 #define DFUNCTION "_sion_coll_check_env" 
  326 void _sion_coll_check_env(_sion_filedesc *sion_filedesc)
 
  328   const char *cd = _sion_getenv(
"SION_COLLDEBUG");
 
        /* Presumably executed only when cd is non-NULL; the guard is not
         * visible here -- confirm. */
  330     sion_filedesc->colldebug = atoi(cd);
 
  333   const char *cs = _sion_getenv(
"SION_COLLSIZE");
 
  334   const char *cn = _sion_getenv(
"SION_COLLNUM");
 
        /* Explicit collector group size, limited to the number of tasks. */
  336     sion_filedesc->collsize = atoi(cs);
 
  337     if (sion_filedesc->collsize > sion_filedesc->ntasks) {
 
  338       sion_filedesc->collsize = sion_filedesc->ntasks;
 
  340     if (sion_filedesc->colldebug >= 1) {
 
  341       fprintf(stderr, 
"collective statistics:            SION_COLLSIZE=%11d\n", sion_filedesc->collsize);
 
        /* Number of collectors, limited to the number of tasks. */
  344     int numcoll = atoi(cn);
 
  346       if (numcoll > sion_filedesc->ntasks) {
 
  347         numcoll = sion_filedesc->ntasks;
 
        /* collsize = ceil(ntasks / numcoll).
         * NOTE(review): this divides by numcoll; a guard against
         * numcoll <= 0 is presumably on a line not visible here --
         * confirm. */
  349       sion_filedesc->collsize = sion_filedesc->ntasks / numcoll;
 
  350       if (sion_filedesc->ntasks % numcoll > 0) {
 
  351         sion_filedesc->collsize++;
 
  354       if (sion_filedesc->colldebug >= 1) {
 
  355         fprintf(stderr, 
"collective statistics:             SION_COLLNUM=%11d\n", numcoll);
 
  356         fprintf(stderr, 
"collective statistics:                 collsize=%11d\n", sion_filedesc->collsize);
 
        /* Derive usecoll from collsize: any non-zero value enables it, zero
         * disables it.
         * NOTE(review): a negative collsize also enables usecoll, although
         * the collective read/write paths report collsize <= 0 as an error
         * -- confirm the negative case is intended. */
  363     if (sion_filedesc->collsize > 0) {
 
  364       sion_filedesc->usecoll = 1;
 
  366     if (sion_filedesc->collsize < 0) {
 
  367       sion_filedesc->usecoll = 1;
 
  369     if (sion_filedesc->collsize == 0) {
 
  370       sion_filedesc->usecoll = 0;
 
  374   DPRINTFP((2, DFUNCTION, _SION_DEFAULT_RANK, 
"usecoll=%d collsize=%d (%d tasks, %d files) colldebug=%d\n",
 
  375     sion_filedesc->usecoll, sion_filedesc->collsize, sion_filedesc->ntasks, sion_filedesc->nfiles, sion_filedesc->colldebug));
 
size_t sion_coll_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.
size_t sion_coll_read(void *data, size_t size, size_t nitems, int sid)
Read data from a SIONlib file using collective I/O.
size_t sion_coll_fread(void *data, size_t size, size_t nitems, int sid)
Read data from a SIONlib file using collective I/O.
size_t sion_coll_fwrite(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file using collective I/O.
size_t sion_write(const void *data, size_t size, size_t nitems, int sid)
Write data to a SIONlib file.
size_t sion_read(void *data, size_t size, size_t nitems, int sid)
Read data from a SIONlib file.