SIONlib  1.7.4
Scalable I/O library for parallel access to task-local files
sion_keyvalue_inline.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <string.h>
19 #include <time.h>
20 #include <assert.h>
21 
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 
27 #ifdef _SION_CUDA
28 #include <cuda_runtime.h>
29 #endif
30 
31 #include "sion.h"
32 #include "sion_debug.h"
33 #include "sion_error_handler.h"
34 #include "sion_filedesc.h"
35 #include "sion_tools.h"
36 #include "sion_fd.h"
37 #include "sion_file.h"
38 #include "sion_metadata.h"
39 #include "sion_internal.h"
40 #include "sion_internal_seek.h"
41 #include "sion_printts.h"
42 #include "sion_keyvalue.h"
43 #include "sion_keyvalue_inline.h"
44 #include "sion_keyvalue_keymngr.h"
45 
46 #define TABLE_SIZE 127
47 
48 #define SEARCH_TO_KEY 0
49 #define SEARCH_TO_NEXT 1
50 #define SEARCH_TO_END 2
51 
52 /* Utility functions */
53 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc);
54 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos);
55 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode);
56 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc);
57 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write);
58 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
59 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
60 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read);
61 
62 
63 /* ***************** */
64 /* Write functions */
65 /* ***************** */
66 
67 
68 
69 #define DFUNCTION "_sion_store_and_write_key_and_len_inline"
70 size_t _sion_store_and_write_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len) {
71  sion_int64 bytes_to_write;
72  sion_int64 buffer[2];
73  size_t rc=SION_SUCCESS, frc;
74 
75 
76  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
77 
78  /* TODO: need to check if current_pos is set correctly (in case of sion_seeks in between, currently not allowed) ... */
79 
80  /* assemble data */
81  buffer[0]=(sion_int64) key;
82  buffer[1]=(sion_int64) len;
83  bytes_to_write=2*sizeof(sion_int64);
84 
85  frc=_sion_write_data_to_chunks_inline(sion_filedesc, (const void *) buffer, (sion_int64) bytes_to_write);
86 
87  /* check return code */
88  if(frc != bytes_to_write) {
89  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
90  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
91  }
92 
93  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
94  return(rc);
95 }
96 #undef DFUNCTION
97 
98 #define DFUNCTION "_sion_write_value_inline"
99 size_t _sion_write_value_inline(_sion_filedesc *sion_filedesc, const void *data, uint64_t key, size_t len) {
100  size_t rc=0, frc;
101  sion_int64 bytes_to_write;
102  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
103 
104 
105  bytes_to_write=len;
106  frc=_sion_write_data_to_chunks_inline(sion_filedesc, data, (sion_int64) bytes_to_write);
107  if(frc != bytes_to_write) {
108  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
109  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
110  }
111 
112  rc=(size_t) bytes_to_write;
113 
114  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
115  return (rc);
116 }
117 #undef DFUNCTION
118 
119 
120 /* ***************** */
121 /* Read functions */
122 /* ***************** */
123 
124 #define DFUNCTION "_sion_find_and_read_key_and_len_inline"
125 int _sion_find_and_read_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len, size_t *datalen) {
126  _sion_keyvalue_keymngr *keymngr;
127  int rc = 0;
128  int position_found=0;
129  sion_table_key_t mkey;
130  size_t key_current_pos = (size_t) -1, key_bytes_left = (size_t) -1;
131 
132  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
133 
134  /* get or init internal data structure if first call */
135  keymngr=_sion_get_or_init_key_info(sion_filedesc);
136 
137  /* map key to table type */
138  mkey = key;
139 
140  *datalen=0;
141 
142  /* indicator if data could read, key_current_pos, key_bytes_left are set correctly */
143  position_found=0;
144 
145  /* check if info about key is available */
146  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
147  if(rc!=SION_SUCCESS) {
148  DPRINTFP((2, DFUNCTION, -1, "key %ld is not found in keymngr, scan forward\n", (long)mkey));
149 
150  /* scan forward until key is found */
151  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_KEY);
152 
153  /* check again if info about key is now available */
154  if(rc==SION_SUCCESS) {
155  DPRINTFP((2, DFUNCTION, -1, "scan forward found key %ld, get info \n", (long)mkey));
156  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
157  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(2): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
158  }
159  } else {
160  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(1): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
161  }
162  if(rc==SION_SUCCESS) {
163  /* check if enough data is available in block */
164  position_found=1;
165  if(len<=key_bytes_left) {
166  *datalen=len;
167  } else {
168  /* only a part of requested data can be read */
169  *datalen=key_bytes_left;
170  }
171  } else {
172  /* not more data in file for key */
173  rc=SION_NOT_SUCCESS;
174  DPRINTFP((2, DFUNCTION, -1, "no more data for key %ld\n", (long)mkey ));
175  }
176 
177  /* check current position and move if necessary */
178  DPRINTFP((2, DFUNCTION, -1, "check position: current_pos %ld, key_current_pos=%ld len=%ld key_bytes_left=%ld\n",
179  (long) sion_filedesc->currentpos, (long) key_current_pos, (long) len, (long) key_bytes_left ));
180  if(position_found) {
181  if(key_current_pos!=sion_filedesc->currentpos) {
182  rc=_sion_move_to_pos(sion_filedesc, key_current_pos);
183  }
184  /* ready to read data */
185  }
186 
187  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n", rc));
188  return rc;
189 }
190 #undef DFUNCTION
191 
192 #define DFUNCTION "_sion_read_value_inline"
193 size_t _sion_read_value_inline(_sion_filedesc *sion_filedesc, void *data, uint64_t key, size_t len) {
194  size_t rc=0;
195  _sion_keyvalue_keymngr *keymngr;
196  sion_table_key_t mkey;
197  sion_int64 bread;
198  sion_int64 bytes_to_read;
199  char *out;
200 
201  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
202 
203  /* get or init internal data structure if first call */
204  keymngr=_sion_get_or_init_key_info(sion_filedesc);
205  mkey = (sion_table_key_t) key;
206 
207  /* there are enough bytes available, checked during read of key,len */
208 
209  out = (char*) data;
210 
211  /* read data */
212  bytes_to_read=len;
213 
214  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) out, (sion_int64) bytes_to_read);
215  if (bread != bytes_to_read) {
216  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
217  "could not read data (%d bytes) from file ...", (int) bytes_to_read));
218  }
219 
220  DPRINTFP((2, DFUNCTION, -1, "first value = %3hhu | %c\n", out[0], out[0]));
221 
222  /* update meta data of current block */
223  _sion_keyvalue_keymngr_update_read_pos(keymngr, mkey, (size_t) bread, (sion_int64) sion_filedesc->currentpos);
224 
225  rc = (size_t) bread;
226 
227  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",(int) rc));
228  return (rc);
229 }
230 #undef DFUNCTION
231 
232 
233 #define DFUNCTION "_sion_key_full_scan_inline"
234 int _sion_key_full_scan_inline(_sion_filedesc *sion_filedesc) {
235  int rc=SION_NOT_SUCCESS;
236  _sion_keyvalue_keymngr* keymngr;
237  sion_table_key_t key=0;
238 
239  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
240 
241  /* get or init internal data structure if first call */
242  keymngr=_sion_get_or_init_key_info(sion_filedesc);
243 
244  if(!_sion_keyvalue_keymngr_is_scan_done(keymngr)) {
245  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_END);
246  } else {
247  /* scan already done */
248  rc=SION_SUCCESS;
249  }
250 
251  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
252  return rc;
253 }
254 #undef DFUNCTION
255 
256 
257 #define DFUNCTION "_sion_iterator_reset_inline"
258 int _sion_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
259  int rc=0;
260  _sion_keyvalue_keymngr* keymngr;
261 
262  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
263 
264  /* get or init internal data structure if first call */
265  keymngr=_sion_get_or_init_key_info(sion_filedesc);
266 
267  rc=_sion_keyvalue_keymngr_iterator_reset(keymngr);
268 
269  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
270  return rc;
271 }
272 #undef DFUNCTION
273 
274 #define DFUNCTION "_sion_iterator_next_inline"
275 int _sion_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr, size_t *sizeptr) {
276  int rc=SION_NOT_SUCCESS;
277  _sion_keyvalue_keymngr* keymngr;
278  sion_table_key_t key=0;
279  size_t current_pos, offset, len;
280 
281  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
282 
283  /* get or init internal data structure if first call */
284  keymngr=_sion_get_or_init_key_info(sion_filedesc);
285 
286  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
287  /* next block found */
288  rc=SION_SUCCESS;
289  } else {
290  /* scan forward if not at end of file to get more meta data */
291  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
292  /* get info about block */
293  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
294  rc=SION_SUCCESS;
295  } else {
296  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN, sion_filedesc->rank,
297  "internal error: block could not be find at end of block list ..."));
298  }
299  } else {
300  /* end of file reached and all blocks are already iterated */
301  rc=SION_NOT_SUCCESS;
302  }
303  }
304 
305  /* set output parameter */
306  if(rc==SION_SUCCESS) {
307  *keyptr = (uint64_t) key;
308  *sizeptr = (size_t) len;
309  }
310 
311  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
312  return rc;
313 }
314 #undef DFUNCTION
315 
316 #define DFUNCTION "_sion_seek_key_inline"
317 int _sion_seek_key_inline(_sion_filedesc *sion_filedesc, uint64_t key, int blocknum, sion_int64 posinblock) {
318  int rc=SION_NOT_SUCCESS;
319  _sion_keyvalue_keymngr* keymngr;
320  size_t offset, len;
321 
322  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
323 
324  /* get or init internal data structure if first call */
325  keymngr=_sion_get_or_init_key_info(sion_filedesc);
326 
327  /* lookup if a key entry at that position is already in keymngr */
328  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
329  DPRINTFP((2, DFUNCTION, -1, "after first lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
330 
331  while( (rc!=SION_SUCCESS) && !_sion_keyvalue_keymngr_is_scan_done(keymngr) ) {
332 
333  /* scan forward to next entry with that key */
334  DPRINTFP((2, DFUNCTION, -1, "scan forward to find key %d\n",(int) key));
335  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
336 
337  /* lookup again if a key entry at that position is already in keymngr */
338  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
339  DPRINTFP((2, DFUNCTION, -1, "in loop lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
340  } else {
341  rc=SION_NOT_SUCCESS;
342  }
343  }
344 
345  /* move to scanpos if necessary */
346  if( rc==SION_SUCCESS ) {
347  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) offset));
348  rc=_sion_move_to_pos(sion_filedesc, offset);
349  }
350 
351  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
352  return rc;
353 }
354 #undef DFUNCTION
355 
356 #define DFUNCTION "_sion_key_list_iterator_reset_inline"
357 int _sion_key_list_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
358  int rc=0;
359  _sion_keyvalue_keymngr* keymngr;
360 
361  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
362 
363  /* get or init internal data structure if first call */
364  keymngr=_sion_get_or_init_key_info(sion_filedesc);
365 
366  rc=_sion_keyvalue_keymngr_key_list_iterator_reset(keymngr);
367 
368  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
369  return rc;
370 }
371 #undef DFUNCTION
372 
373 #define DFUNCTION "_sion_iterator_next_inline"
374 int _sion_key_list_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr) {
375  int rc=SION_NOT_SUCCESS;
376  _sion_keyvalue_keymngr* keymngr;
377  sion_table_key_t key=0;
378 
379  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
380 
381  /* get or init internal data structure if first call */
382  keymngr=_sion_get_or_init_key_info(sion_filedesc);
383 
384  if(_sion_keyvalue_keymngr_key_list_iterator_next(keymngr, &key)==SION_SUCCESS) {
385  *keyptr = (uint64_t) key;
386  rc=SION_SUCCESS;
387  } else {
388  *keyptr = 0;
389  rc=SION_NOT_SUCCESS;
390  }
391 
392  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
393  return rc;
394 }
395 #undef DFUNCTION
396 
397 /* fills a stat-data structure containing information about the key */
398 #define DFUNCTION "_sion_key_get_stat_inline"
399 int _sion_key_get_stat_inline(_sion_filedesc *sion_filedesc, uint64_t searchkey, sion_key_stat_t *keystat) {
400  int rc=SION_NOT_SUCCESS;
401  _sion_keyvalue_keymngr* keymngr;
402  sion_table_key_t key=0;
403 
404  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
405 
406  /* get or init internal data structure if first call */
407  keymngr=_sion_get_or_init_key_info(sion_filedesc);
408 
409  key= (sion_table_key_t) searchkey;
410 
411  if(_sion_keyvalue_keymngr_key_get_stat(keymngr, key, keystat)==SION_SUCCESS) {
412  rc=SION_SUCCESS;
413  } else {
414  rc=SION_NOT_SUCCESS;
415  }
416 
417  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
418  return rc;
419 }
420 #undef DFUNCTION
421 
422 /* ***************** */
423 /* Utility functions */
424 /* ***************** */
425 
426 #define DFUNCTION "_sion_scan_forward_to_key"
427 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode) {
428  int rc = SION_SUCCESS;
429  _sion_keyvalue_keymngr *keymngr;
430  sion_table_key_t mkey;
431 
432  size_t scanpos;
433  int key_found;
434  size_t bytes_left, lastposinblock, lastposinfile;
435  uint64_t key_and_len[2];
436 
437  size_t bread, bskip, len;
438 
439  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
440 
441  /* get keymngr */
442  keymngr = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
443 
444  key_found=0;
445 
446  /* get position from where scan has to be started */
447  if(_sion_keyvalue_keymngr_get_next_scan_pos(keymngr, &scanpos) != SION_SUCCESS) {
448  /* can only be at start time, start then from current position in file */
449  scanpos=(size_t) sion_filedesc->currentpos;
450  DPRINTFP((2, DFUNCTION, -1, "set scanpos to first pos in file pos=%ld\n",(long) scanpos));
451  }
452  /* check if file is already completely scanned */
453  lastposinfile=sion_filedesc->startpos + sion_filedesc->lastchunknr * sion_filedesc->globalskip
454  + sion_filedesc->blocksizes[sion_filedesc->lastchunknr];
455 
456  DPRINTFP((2, DFUNCTION, -1, "check if not already scanned to endoffile scanpos=%ld lastposinfile=%ld lastchunknr=%d\n",(long) scanpos,(long) lastposinfile, (int) sion_filedesc->lastchunknr ));
457  if(scanpos>=lastposinfile) {
458  DPRINTFP((2, DFUNCTION, -1, "scan is already at end of file (%ld>=%ld) \n",(long) scanpos, (long) lastposinfile));
459  _sion_keyvalue_keymngr_set_scan_done(keymngr);
460  rc=SION_NOT_SUCCESS;
461  }
462 
463  /* move to scanpos if necessary */
464  if( (sion_filedesc->currentpos != scanpos) && (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) ) {
465  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) scanpos));
466  rc=_sion_move_to_pos(sion_filedesc, scanpos);
467  }
468 
469  /* search forward */
470  while ( (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) && (!key_found) ) {
471 
472  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
473  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
474  bytes_left = lastposinblock - sion_filedesc->currentpos;
475  DPRINTFP((2, DFUNCTION, -1, "search loop: lastposinblock=%ld bytes_left=%ld currentpos=%ld\n",(long) lastposinblock,(long) bytes_left,(long) sion_filedesc->currentpos));
476 
477 
478  /* is there data in current chunk to start reading next block? */
479  if ( bytes_left > 0 ) {
480  /* read next block from current and following chunks */
481  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) key_and_len, (sion_int64) sizeof(key_and_len));
482  if (bread != sizeof(key_and_len)) {
483  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
484  "could not read data (%d bytes) from file ...", (int) sizeof(key_and_len)));
485  }
486  sion_swap(key_and_len, key_and_len, sizeof(sion_int64), 2, sion_filedesc->swapbytes);
487 
488 
489  DPRINTFP((2, DFUNCTION, -1, "search loop: found next key,len (%ld,%ld)\n",(long) key_and_len[0],(long) key_and_len[1]));
490  bytes_left-=bread;
491 
492  DPRINTFP((2, DFUNCTION, -1, "search loop: position is now %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
493 
494  /* check if filepointer has to moved to next block to be in front of data */
495  if(bytes_left==0) {
496  DPRINTFP((2, DFUNCTION, -1, "search loop: data starts in next chunk, moving forward\n"));
497  if(! _sion_move_to_next_chunk(sion_filedesc)) {
498  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
499  "could not move to data section in next block ..."));
500  }
501  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
502  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
503  bytes_left = lastposinblock - sion_filedesc->currentpos;
504  DPRINTFP((2, DFUNCTION, -1, "search loop: position is moved to %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
505  }
506 
507  /* store new block info */
508  mkey=key_and_len[0];
509  len =key_and_len[1];
510  rc=_sion_keyvalue_keymngr_add_block(keymngr, mkey, (size_t) sion_filedesc->currentpos, len);
511  DPRINTFP((2, DFUNCTION, -1, "search loop: stored new block key=%ld pos=%ld len=%ld\n",(long) mkey, (long) sion_filedesc->currentpos, (long) len));
512 
513  /* check key */
514  if (
515  ( ( (search_mode==SEARCH_TO_KEY) && (key_and_len[0] == key) ) || (search_mode==SEARCH_TO_NEXT) )
516  && ( !(search_mode==SEARCH_TO_END) )
517  ) {
518  DPRINTFP((2, DFUNCTION, -1, "search loop: found searched key=%ld, leaving\n",(long) key));
519  rc=SION_SUCCESS;
520  key_found=1; /* end of loop */
521  scanpos=_sion_compute_next_position_inline(sion_filedesc, len); /* next search behind data of that block */
522  } else {
523  DPRINTFP((2, DFUNCTION, -1, "search loop: found another key=%ld <=> %ld, continue\n",(long) key_and_len[0], (long) key));
524 
525  /* move forward behind data of block in the same or next chunk */
526  DPRINTFP((2, DFUNCTION, -1, "search loop: found move position forward by %ld bytes\n",(long) len));
527  bskip=_sion_skip_data_from_chunks_inline(sion_filedesc, (sion_int64) len);
528 
529  if ( bskip == len ) {
530 
531  /* recompute bytes_left in current block */
532  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
533  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
534  bytes_left = lastposinblock - sion_filedesc->currentpos;
535  scanpos=sion_filedesc->currentpos; /* next search behind data */
536 
537  } else {
538  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
539  "could not skip data section of one block (%d bytes) from file ...", (int) len));
540  }
541 
542  /* check if filepointer has to moved to next block */
543  if(bytes_left==0) {
544  DPRINTFP((2, DFUNCTION, -1, "search loop: chunk empty, move to next chunk\n"));
545  if(! _sion_move_to_next_chunk(sion_filedesc)) {
546  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
547  if(search_mode!=SEARCH_TO_END) {
548  rc=SION_NOT_SUCCESS;
549  } else {
550  rc=SION_SUCCESS; /* end of data reached */
551  }
552  }
553  }
554  }
555 
556  } else {
557  /* not enough byte in block to read next key+len */
558  if(bytes_left==0) {
559  if(! _sion_move_to_next_chunk(sion_filedesc)) {
560  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
561  rc=SION_NOT_SUCCESS;
562  }
563  } else {
564  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
565  "unknown data at end of chunk (%d bytes) ...", (int) bytes_left));
566  }
567  }
568 
569  } /* while */
570 
571  /* update scanpos */
572  if(_sion_keyvalue_keymngr_set_next_scan_pos(keymngr, scanpos) != SION_SUCCESS) {
573  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
574  "internal error set seekpos ..."));
575  }
576 
577  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
578  return (rc);
579 }
580 #undef DFUNCTION
581 
582 
583 #define DFUNCTION "_sion_get_or_init_key_info"
584 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc) {
585 
586  _sion_keyvalue_keymngr* keymngr;
587 
588  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
589 
590  if (sion_filedesc->keyvalptr == NULL) {
591  DPRINTFP((2, DFUNCTION, -1, "sion_filedesc->keyvalptr == NULL\n"));
592  keymngr =_sion_keyvalue_keymngr_init(TABLE_SIZE);
593  if (keymngr == NULL) {
594  _sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"could not allocate keymngr ...");
595  }
596  sion_filedesc->keyvalptr=keymngr;
597  DPRINTFP((2, DFUNCTION, -1, "alloc now KEYVALPTR = %x\n",sion_filedesc->keyvalptr));
598  }
599 
600  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
601  return sion_filedesc->keyvalptr;
602 }
603 #undef DFUNCTION
604 
605 #define DFUNCTION "_sion_move_to_next_chunk"
606 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc) {
607  int rc = SION_SUCCESS;
608 
609  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
610 
611  /* is another chink available */
612  if (sion_filedesc->currentblocknr < sion_filedesc->lastchunknr) {
613  /* move to next chunk */
614  sion_filedesc->currentblocknr++;
615  sion_filedesc->currentpos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
616  _sion_file_purge(sion_filedesc->fileptr);
617  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
618  } else {
619  rc=SION_NOT_SUCCESS;
620  }
621 
622  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
623  return rc;
624 }
625 #undef DFUNCTION
626 
627 #define DFUNCTION "_sion_move_to_pos"
628 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos) {
629  int rc = SION_SUCCESS;
630  size_t block_min_pos, block_max_pos;
631  int c, pos_found=0;
632 
633  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
634 
635  block_min_pos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
636  block_max_pos = block_min_pos+sion_filedesc->blocksizes[sion_filedesc->currentblocknr]; /* position of first byte behind chunk */
637 
638  DPRINTFP((2, DFUNCTION, -1, "current_chunk: %ld to %ld (pos=%ld)\n", (long) block_min_pos, (long) block_max_pos, (long) pos));
639 
640  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
641  /* stay in the same chunk */
642  pos_found=1;
643  sion_filedesc->currentpos=pos;
644  _sion_file_purge(sion_filedesc->fileptr);
645  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
646  DPRINTFP((2, DFUNCTION, -1, "stay in current chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
647 
648  } else {
649  /* scan from beginning over all chunks */
650 
651  for(c=0;c<=sion_filedesc->lastchunknr;c++) {
652  block_min_pos = sion_filedesc->startpos + c * sion_filedesc->globalskip;
653  block_max_pos = block_min_pos+sion_filedesc->blocksizes[c]; /* position of first byte behind chunk */
654 
655  DPRINTFP((2, DFUNCTION, -1, "check chunk%2d: %ld to %ld (pos=%ld)\n", c, (long) block_min_pos, (long) block_max_pos, (long) pos));
656 
657  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
658 
659  /* stay in this chunk */
660  pos_found=1;
661  sion_filedesc->currentblocknr=c;
662  sion_filedesc->currentpos=pos;
663  _sion_file_purge(sion_filedesc->fileptr);
664  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
665  DPRINTFP((2, DFUNCTION, -1, "stay in this chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
666  break;
667 
668  }
669  }
670  }
671 
672  if(!pos_found) {
673  rc=SION_NOT_SUCCESS;
674  }
675 
676  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
677  return rc;
678 }
679 #undef DFUNCTION
680 
681 /* write data block to current and following chunks, data could be distributed over several chunks */
682 #define DFUNCTION "_sion_write_data_to_chunks_inline"
683 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write) {
684  sion_int64 btowr, byteswritten, offset;
685  size_t rc=0, frc;
686 
687 
688  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
689 
690  /* loop to write data in current and following chunks */
691  offset=0;
692  while(bytes_to_write>0) {
693 
694  _sion_flush_block(sion_filedesc);
695 
696  /* determine size of data which could written into current chunk */
697  byteswritten = sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
698  btowr=bytes_to_write;
699  if ((byteswritten + btowr) > sion_filedesc->chunksize) btowr = sion_filedesc->chunksize - byteswritten;
700  DPRINTFP((2, DFUNCTION, -1, " bytes_to_write=%ld btowr=%ld chunksize=%ld byteswritten=%ld currentblocknr=%d blsize[%d]=%ld\n",
701  (long) bytes_to_write, (long) btowr, (long) sion_filedesc->chunksize, (long) byteswritten, (int) sion_filedesc->currentblocknr,
702  sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
703 
704  /* write part or rest of data */
705 #ifdef _SION_CUDA
706  struct cudaPointerAttributes attrs;
707  cudaError_t err = cudaPointerGetAttributes(&attrs, data);
708  if ((err == cudaSuccess) && (!attrs.isManaged && (attrs.memoryType == cudaMemoryTypeDevice)) ) {
709  char* buffer = malloc(sion_filedesc->fsblksize);
710  const char* data_ = (char *)data + offset;
711  while (frc < btowr) {
712  sion_int64 to_write = (btowr - frc) > sion_filedesc->fsblksize ? sion_filedesc->fsblksize : (btowr - frc);
713  cudaMemcpy(buffer, data_, to_write, cudaMemcpyDeviceToHost);
714  sion_int64 frc_ = _sion_file_write(buffer, to_write, sion_filedesc->fileptr);
715  if (frc_ != to_write) break;
716  frc += frc_;
717  data_ += frc_;
718  }
719  free(buffer);
720  } else {
721  frc = _sion_file_write((char *)data + offset, btowr, sion_filedesc->fileptr);
722  }
723 #else
724  frc = _sion_file_write(((char *) data+offset), btowr, sion_filedesc->fileptr);
725 #endif
726 
727  /* check return code */
728  if(frc != btowr) {
729  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
730  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) btowr, (int) frc, sion_filedesc->sid));
731  }
732 
733  /* increase current position and update other counters */
734  sion_filedesc->currentpos+=btowr;
735  bytes_to_write -=btowr;
736  offset +=btowr;
737 
738  /* check if all data has already be processed */
739  if(bytes_to_write>0) {
740 
741  /* create new block for writing next data */
742  _sion_flush_block(sion_filedesc);
743  _sion_create_new_block(sion_filedesc);
744  }
745  }
746  rc=offset;
747  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
748  return(rc);
749 }
750 #undef DFUNCTION
751 
752 /* skip data block from current and following chunks, data could be distributed over several chunks */
753 #define DFUNCTION "_sion_compute_next_position_inline"
754 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
755  sion_int64 btord = 0, bytesread = 0, offset = 0;
756  int blocknr = 0;
757 
758 
759  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
760 
761  /* check data in current block */
762  blocknr = sion_filedesc->currentblocknr;
763  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
764 
765  DPRINTFP((2, DFUNCTION, -1, " currentpos=%ld bytesread=%ld blocknr=%d bytes_to_read=%ld\n",
766  (long) sion_filedesc->currentpos, (long) bytesread, (int) blocknr, (long) bytes_to_read ));
767 
768  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n",
769  (int) sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
770 
771  /* data is all in current block */
772  if ((bytesread + bytes_to_read) <= sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
773  offset=sion_filedesc->currentpos+bytes_to_read;
774  } else {
775  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread; /* size of data in current chunk */
776  bytes_to_read-=btord; /* rest of data in next chunks */
777 
778  DPRINTFP((2, DFUNCTION, -1, " skip to next block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
779 
780  /* loop to behind data in current and following chunks */
781  while(bytes_to_read>0) {
782 
783  /* next block */
784  if (blocknr < sion_filedesc->lastchunknr) { blocknr++; } else {
785  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"internal error next block not available, but should ..."));
786  }
787 
788  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
789  if ( bytes_to_read > sion_filedesc->blocksizes[blocknr]) {
790  btord = sion_filedesc->blocksizes[blocknr];
791  bytes_to_read-=btord; /* rest of data in next chunks */
792  DPRINTFP((2, DFUNCTION, -1, "whole block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
793  } else {
794  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + bytes_to_read;
795  bytes_to_read=0;
796  }
797  }
798  }
799 
800  /* check if behind end of actual block */
801  if(offset == (sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + sion_filedesc->blocksizes[blocknr])) {
802  /* move next block if available, if last blovk of file, stay behind current block */
803  if (blocknr < sion_filedesc->lastchunknr) {
804  blocknr++;
805  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip;
806  DPRINTFP((2, DFUNCTION, -1, " behind end of block, move to next block, blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
807  }
808  }
809 
810 
811  DPRINTFP((2, DFUNCTION, -1, "leave offset=%ld\n",(long) offset));
812  return(offset);
813 }
814 #undef DFUNCTION
815 
816 /* skip data block from current and following chunks, data could be distributed over several chunks */
817 #define DFUNCTION "_sion_skip_data_from_chunks_inline"
818 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
819  sion_int64 btord, bytesread, offset;
820  size_t rc=0;
821 
822 
823  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
824 
825  /* loop to read data in current and following chunks */
826  offset=0;
827  while(bytes_to_read>0) {
828 
829  /* determine size of data which could read from current chunk */
830  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
831 
832  btord=bytes_to_read;
833  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr])
834  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
835  DPRINTFP((2, DFUNCTION, -1, " bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
836 
837  /* file pointer will not here adjusted, only at end of loop */
838 
839  /* increase current position and update other counters */
840  sion_filedesc->currentpos+=btord;
841  bytes_to_read -=btord;
842  offset +=btord;
843 
844  /* check if all data has already be processed, otherwise forward to next chunk */
845  if(bytes_to_read>0) {
846 
847  if(! _sion_move_to_next_chunk(sion_filedesc)) {
848  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
849  "could not read data (%d bytes) to file (end of file reached sid=%d) ...", (int) btord, sion_filedesc->sid));
850  }
851 
852  }
853  }
854 
855  /* move to new position */
856  _sion_file_purge(sion_filedesc->fileptr);
857  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
858 
859  rc=offset;
860  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
861  return(rc);
862 }
863 #undef DFUNCTION
864 
865 /* read data block from current and following chunks, data could be distributed over several chunks */
866 #define DFUNCTION "_sion_read_data_from_chunks_inline"
867 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read) {
868  sion_int64 btord, bytesread, offset;
869  size_t rc=0, frc;
870 
871 
872  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
873 
874  /* loop to read data in current and following chunks */
875  offset=0;
876  while(bytes_to_read>0) {
877 
878  /* determine size of data which could read from current chunk */
879  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
880  DPRINTFP((2, DFUNCTION, -1, "bytesread=%ld, curpos=%ld,startpos=%ld,curblock=%ld,gskip=%ld\n",
881  (long) bytesread, (long) sion_filedesc->currentpos, (long) sion_filedesc->startpos,
882  (long) sion_filedesc->currentblocknr, (long) sion_filedesc->globalskip ));
883 
884  btord=bytes_to_read;
885  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
886  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
887  DPRINTFP((2, DFUNCTION, -1, "block is split to multiple chunks, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
888  } else {
889  DPRINTFP((2, DFUNCTION, -1, "block in one chunk, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
890  }
891 
892  /* read part or rest of data */
893 #ifdef _SION_CUDA
894  struct cudaPointerAttributes attrs;
895  cudaError_t err = cudaPointerGetAttributes(&attrs, data);
896  if ((err == cudaSuccess) && (!attrs.isManaged && (attrs.memoryType == cudaMemoryTypeDevice)) ) {
897  char* buffer = malloc(sion_filedesc->fsblksize);
898  char* data_ = (char *)data + offset;
899  while (frc < btord) {
900  sion_int64 to_read = (btord - frc) > sion_filedesc->fsblksize ? sion_filedesc->fsblksize : (btord - frc);
901  sion_int64 frc_ = _sion_file_read(buffer, to_read, sion_filedesc->fileptr);
902  if (frc_ != to_read) break;
903  cudaMemcpy(data_, buffer, frc_, cudaMemcpyHostToDevice);
904  frc += frc_;
905  data_ += frc_;
906  }
907  free(buffer);
908  } else {
909  frc = _sion_file_read(((char *)data + offset), btord, sion_filedesc->fileptr);
910  }
911 #else
912  frc = _sion_file_read(((char *)data+offset), btord, sion_filedesc->fileptr);
913 #endif
914 
915  /* check return code */
916  if(frc != btord) {
917  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
918  "could not read data (%d bytes) to file (frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
919  }
920 
921  /* increase current position and update other counters */
922  sion_filedesc->currentpos+=btord;
923  bytes_to_read -=btord;
924  offset +=btord;
925 
926  /* check if all data has already be processed, otherwise forward to next chunk */
927  if(bytes_to_read>0) {
928 
929  if(! _sion_move_to_next_chunk(sion_filedesc)) {
930  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
931  "could not read data (%d bytes) to file (end of file reached, frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
932  }
933 
934  }
935  }
936 
937  rc=offset;
938  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
939  return(rc);
940 }
941 #undef DFUNCTION
942 
943 /* dup internal data structures from first to secon sion_filedesc */
944 #define DFUNCTION "_sion_keyval_dup_dataptr_inline"
945 int _sion_keyval_dup_dataptr_inline(_sion_filedesc *sion_filedesc, _sion_filedesc *new_filedesc) {
946  int rc=0;
947  _sion_keyvalue_keymngr* keymngr_orig;
948  _sion_keyvalue_keymngr* keymngr;
949 
950  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
951 
952  if(sion_filedesc->keyvalptr!=NULL) {
953 
954  /* get orig keymngr */
955  keymngr_orig = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
956 
957  keymngr =_sion_keyvalue_keymngr_dup(keymngr_orig,new_filedesc->dup_mode, new_filedesc->dup_sel_key);
958  if (keymngr == NULL) {
959  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"dup: could not duplicate keymngr ..."));
960  }
961  new_filedesc->keyvalptr=keymngr;
962 
963  } else {
964 
965  new_filedesc->keyvalptr=NULL;
966 
967  }
968 
969  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
970  return (rc);
971 }
972 #undef DFUNCTION
973 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:141
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:219
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:285
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:166
void sion_swap(void *target, void *source, int size, int n, int do_swap)
Definition: sion_convert.c:44
sion_int64 * blocksizes
sion_int32 currentblocknr
Definition: sion_filedesc.h:97
int _sion_create_new_block(_sion_filedesc *sion_filedesc)
Create a new block for the internal data structure.
Sion Time Stamp Header.
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82