SIONlib  1.7.2
Scalable I/O library for parallel access to task-local files
sion_keyvalue_inline.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2018 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <string.h>
19 #include <time.h>
20 #include <assert.h>
21 
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <fcntl.h>
25 #include <unistd.h>
26 
27 
28 #include "sion.h"
29 #include "sion_debug.h"
30 #include "sion_error_handler.h"
31 #include "sion_filedesc.h"
32 #include "sion_tools.h"
33 #include "sion_fd.h"
34 #include "sion_file.h"
35 #include "sion_metadata.h"
36 #include "sion_internal.h"
37 #include "sion_internal_seek.h"
38 #include "sion_printts.h"
39 #include "sion_keyvalue.h"
40 #include "sion_keyvalue_inline.h"
41 #include "sion_keyvalue_keymngr.h"
42 
43 #define TABLE_SIZE 127
44 
45 #define SEARCH_TO_KEY 0
46 #define SEARCH_TO_NEXT 1
47 #define SEARCH_TO_END 2
48 
49 /* Utility functions */
50 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc);
51 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos);
52 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode);
53 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc);
54 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write);
55 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
56 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
57 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read);
58 
59 
60 /* ***************** */
61 /* Write functions */
62 /* ***************** */
63 
64 
65 
66 #define DFUNCTION "_sion_store_and_write_key_and_len_inline"
67 size_t _sion_store_and_write_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len) {
68  sion_int64 bytes_to_write;
69  sion_int64 buffer[2];
70  size_t rc=SION_SUCCESS, frc;
71 
72 
73  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
74 
75  /* TODO: need to check if current_pos is set correctly (in case of sion_seeks in between, currently not allowed) ... */
76 
77  /* assemble data */
78  buffer[0]=(sion_int64) key;
79  buffer[1]=(sion_int64) len;
80  bytes_to_write=2*sizeof(sion_int64);
81 
82  frc=_sion_write_data_to_chunks_inline(sion_filedesc, (const void *) buffer, (sion_int64) bytes_to_write);
83 
84  /* check return code */
85  if(frc != bytes_to_write) {
86  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
87  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
88  }
89 
90  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
91  return(rc);
92 }
93 #undef DFUNCTION
94 
95 #define DFUNCTION "_sion_write_value_inline"
96 size_t _sion_write_value_inline(_sion_filedesc *sion_filedesc, const void *data, uint64_t key, size_t len) {
97  size_t rc=0, frc;
98  sion_int64 bytes_to_write;
99  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
100 
101 
102  bytes_to_write=len;
103  frc=_sion_write_data_to_chunks_inline(sion_filedesc, data, (sion_int64) bytes_to_write);
104  if(frc != bytes_to_write) {
105  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
106  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
107  }
108 
109  rc=(size_t) bytes_to_write;
110 
111  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
112  return (rc);
113 }
114 #undef DFUNCTION
115 
116 
117 /* ***************** */
118 /* Read functions */
119 /* ***************** */
120 
121 #define DFUNCTION "_sion_find_and_read_key_and_len_inline"
122 int _sion_find_and_read_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len, size_t *datalen) {
123  _sion_keyvalue_keymngr *keymngr;
124  int rc = 0;
125  int position_found=0;
126  sion_table_key_t mkey;
127  size_t key_current_pos = (size_t) -1, key_bytes_left = (size_t) -1;
128 
129  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
130 
131  /* get or init internal data structure if first call */
132  keymngr=_sion_get_or_init_key_info(sion_filedesc);
133 
134  /* map key to table type */
135  mkey = key;
136 
137  *datalen=0;
138 
139  /* indicator if data could read, key_current_pos, key_bytes_left are set correctly */
140  position_found=0;
141 
142  /* check if info about key is available */
143  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
144  if(rc!=SION_SUCCESS) {
145  DPRINTFP((2, DFUNCTION, -1, "key %ld is not found in keymngr, scan forward\n", (long)mkey));
146 
147  /* scan forward until key is found */
148  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_KEY);
149 
150  /* check again if info about key is now available */
151  if(rc==SION_SUCCESS) {
152  DPRINTFP((2, DFUNCTION, -1, "scan forward found key %ld, get info \n", (long)mkey));
153  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
154  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(2): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
155  }
156  } else {
157  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(1): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
158  }
159  if(rc==SION_SUCCESS) {
160  /* check if enough data is available in block */
161  position_found=1;
162  if(len<=key_bytes_left) {
163  *datalen=len;
164  } else {
165  /* only a part of requested data can be read */
166  *datalen=key_bytes_left;
167  }
168  } else {
169  /* not more data in file for key */
170  rc=SION_NOT_SUCCESS;
171  DPRINTFP((2, DFUNCTION, -1, "no more data for key %ld\n", (long)mkey ));
172  }
173 
174  /* check current position and move if necessary */
175  DPRINTFP((2, DFUNCTION, -1, "check position: current_pos %ld, key_current_pos=%ld len=%ld key_bytes_left=%ld\n",
176  (long) sion_filedesc->currentpos, (long) key_current_pos, (long) len, (long) key_bytes_left ));
177  if(position_found) {
178  if(key_current_pos!=sion_filedesc->currentpos) {
179  rc=_sion_move_to_pos(sion_filedesc, key_current_pos);
180  }
181  /* ready to read data */
182  }
183 
184  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n", rc));
185  return rc;
186 }
187 #undef DFUNCTION
188 
189 #define DFUNCTION "_sion_read_value_inline"
190 size_t _sion_read_value_inline(_sion_filedesc *sion_filedesc, void *data, uint64_t key, size_t len) {
191  size_t rc=0;
192  _sion_keyvalue_keymngr *keymngr;
193  sion_table_key_t mkey;
194  sion_int64 bread;
195  sion_int64 bytes_to_read;
196  char *out;
197 
198  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
199 
200  /* get or init internal data structure if first call */
201  keymngr=_sion_get_or_init_key_info(sion_filedesc);
202  mkey = (sion_table_key_t) key;
203 
204  /* there are enough bytes available, checked during read of key,len */
205 
206  out = (char*) data;
207 
208  /* read data */
209  bytes_to_read=len;
210 
211  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) out, (sion_int64) bytes_to_read);
212  if (bread != bytes_to_read) {
213  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
214  "could not read data (%d bytes) from file ...", (int) bytes_to_read));
215  }
216 
217  DPRINTFP((2, DFUNCTION, -1, "first value = %3hhu | %c\n", out[0], out[0]));
218 
219  /* update meta data of current block */
220  _sion_keyvalue_keymngr_update_read_pos(keymngr, mkey, (size_t) bread, (sion_int64) sion_filedesc->currentpos);
221 
222  rc = (size_t) bread;
223 
224  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",(int) rc));
225  return (rc);
226 }
227 #undef DFUNCTION
228 
229 
230 #define DFUNCTION "_sion_key_full_scan_inline"
231 int _sion_key_full_scan_inline(_sion_filedesc *sion_filedesc) {
232  int rc=SION_NOT_SUCCESS;
233  _sion_keyvalue_keymngr* keymngr;
234  sion_table_key_t key=0;
235 
236  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
237 
238  /* get or init internal data structure if first call */
239  keymngr=_sion_get_or_init_key_info(sion_filedesc);
240 
241  if(!_sion_keyvalue_keymngr_is_scan_done(keymngr)) {
242  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_END);
243  } else {
244  /* scan already done */
245  rc=SION_SUCCESS;
246  }
247 
248  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
249  return rc;
250 }
251 #undef DFUNCTION
252 
253 
254 #define DFUNCTION "_sion_iterator_reset_inline"
255 int _sion_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
256  int rc=0;
257  _sion_keyvalue_keymngr* keymngr;
258 
259  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
260 
261  /* get or init internal data structure if first call */
262  keymngr=_sion_get_or_init_key_info(sion_filedesc);
263 
264  rc=_sion_keyvalue_keymngr_iterator_reset(keymngr);
265 
266  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
267  return rc;
268 }
269 #undef DFUNCTION
270 
271 #define DFUNCTION "_sion_iterator_next_inline"
272 int _sion_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr, size_t *sizeptr) {
273  int rc=SION_NOT_SUCCESS;
274  _sion_keyvalue_keymngr* keymngr;
275  sion_table_key_t key=0;
276  size_t current_pos, offset, len;
277 
278  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
279 
280  /* get or init internal data structure if first call */
281  keymngr=_sion_get_or_init_key_info(sion_filedesc);
282 
283  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
284  /* next block found */
285  rc=SION_SUCCESS;
286  } else {
287  /* scan forward if not at end of file to get more meta data */
288  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
289  /* get info about block */
290  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
291  rc=SION_SUCCESS;
292  } else {
293  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN, sion_filedesc->rank,
294  "internal error: block could not be find at end of block list ..."));
295  }
296  } else {
297  /* end of file reached and all blocks are already iterated */
298  rc=SION_NOT_SUCCESS;
299  }
300  }
301 
302  /* set output parameter */
303  if(rc==SION_SUCCESS) {
304  *keyptr = (uint64_t) key;
305  *sizeptr = (size_t) len;
306  }
307 
308  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
309  return rc;
310 }
311 #undef DFUNCTION
312 
313 #define DFUNCTION "_sion_seek_key_inline"
314 int _sion_seek_key_inline(_sion_filedesc *sion_filedesc, uint64_t key, int blocknum, sion_int64 posinblock) {
315  int rc=SION_NOT_SUCCESS;
316  _sion_keyvalue_keymngr* keymngr;
317  size_t offset, len;
318 
319  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
320 
321  /* get or init internal data structure if first call */
322  keymngr=_sion_get_or_init_key_info(sion_filedesc);
323 
324  /* lookup if a key entry at that position is already in keymngr */
325  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
326  DPRINTFP((2, DFUNCTION, -1, "after first lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
327 
328  while( (rc!=SION_SUCCESS) && !_sion_keyvalue_keymngr_is_scan_done(keymngr) ) {
329 
330  /* scan forward to next entry with that key */
331  DPRINTFP((2, DFUNCTION, -1, "scan forward to find key %d\n",(int) key));
332  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
333 
334  /* lookup again if a key entry at that position is already in keymngr */
335  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
336  DPRINTFP((2, DFUNCTION, -1, "in loop lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
337  } else {
338  rc=SION_NOT_SUCCESS;
339  }
340  }
341 
342  /* move to scanpos if necessary */
343  if( rc==SION_SUCCESS ) {
344  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) offset));
345  rc=_sion_move_to_pos(sion_filedesc, offset);
346  }
347 
348  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
349  return rc;
350 }
351 #undef DFUNCTION
352 
353 #define DFUNCTION "_sion_key_list_iterator_reset_inline"
354 int _sion_key_list_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
355  int rc=0;
356  _sion_keyvalue_keymngr* keymngr;
357 
358  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
359 
360  /* get or init internal data structure if first call */
361  keymngr=_sion_get_or_init_key_info(sion_filedesc);
362 
363  rc=_sion_keyvalue_keymngr_key_list_iterator_reset(keymngr);
364 
365  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
366  return rc;
367 }
368 #undef DFUNCTION
369 
370 #define DFUNCTION "_sion_iterator_next_inline"
371 int _sion_key_list_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr) {
372  int rc=SION_NOT_SUCCESS;
373  _sion_keyvalue_keymngr* keymngr;
374  sion_table_key_t key=0;
375 
376  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
377 
378  /* get or init internal data structure if first call */
379  keymngr=_sion_get_or_init_key_info(sion_filedesc);
380 
381  if(_sion_keyvalue_keymngr_key_list_iterator_next(keymngr, &key)==SION_SUCCESS) {
382  *keyptr = (uint64_t) key;
383  rc=SION_SUCCESS;
384  } else {
385  *keyptr = 0;
386  rc=SION_NOT_SUCCESS;
387  }
388 
389  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
390  return rc;
391 }
392 #undef DFUNCTION
393 
394 /* fills a stat-data structure containing information about the key */
395 #define DFUNCTION "_sion_key_get_stat_inline"
396 int _sion_key_get_stat_inline(_sion_filedesc *sion_filedesc, uint64_t searchkey, sion_key_stat_t *keystat) {
397  int rc=SION_NOT_SUCCESS;
398  _sion_keyvalue_keymngr* keymngr;
399  sion_table_key_t key=0;
400 
401  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
402 
403  /* get or init internal data structure if first call */
404  keymngr=_sion_get_or_init_key_info(sion_filedesc);
405 
406  key= (sion_table_key_t) searchkey;
407 
408  if(_sion_keyvalue_keymngr_key_get_stat(keymngr, key, keystat)==SION_SUCCESS) {
409  rc=SION_SUCCESS;
410  } else {
411  rc=SION_NOT_SUCCESS;
412  }
413 
414  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
415  return rc;
416 }
417 #undef DFUNCTION
418 
419 /* ***************** */
420 /* Utility functions */
421 /* ***************** */
422 
423 #define DFUNCTION "_sion_scan_forward_to_key"
424 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode) {
425  int rc = SION_SUCCESS;
426  _sion_keyvalue_keymngr *keymngr;
427  sion_table_key_t mkey;
428 
429  size_t scanpos;
430  int key_found;
431  size_t bytes_left, lastposinblock, lastposinfile;
432  uint64_t key_and_len[2];
433 
434  size_t bread, bskip, len;
435 
436  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
437 
438  /* get keymngr */
439  keymngr = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
440 
441  key_found=0;
442 
443  /* get position from where scan has to be started */
444  if(_sion_keyvalue_keymngr_get_next_scan_pos(keymngr, &scanpos) != SION_SUCCESS) {
445  /* can only be at start time, start then from current position in file */
446  scanpos=(size_t) sion_filedesc->currentpos;
447  DPRINTFP((2, DFUNCTION, -1, "set scanpos to first pos in file pos=%ld\n",(long) scanpos));
448  }
449  /* check if file is already completely scanned */
450  lastposinfile=sion_filedesc->startpos + sion_filedesc->lastchunknr * sion_filedesc->globalskip
451  + sion_filedesc->blocksizes[sion_filedesc->lastchunknr];
452 
453  DPRINTFP((2, DFUNCTION, -1, "check if not already scanned to endoffile scanpos=%ld lastposinfile=%ld lastchunknr=%d\n",(long) scanpos,(long) lastposinfile, (int) sion_filedesc->lastchunknr ));
454  if(scanpos>=lastposinfile) {
455  DPRINTFP((2, DFUNCTION, -1, "scan is already at end of file (%ld>=%ld) \n",(long) scanpos, (long) lastposinfile));
456  _sion_keyvalue_keymngr_set_scan_done(keymngr);
457  rc=SION_NOT_SUCCESS;
458  }
459 
460  /* move to scanpos if necessary */
461  if( (sion_filedesc->currentpos != scanpos) && (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) ) {
462  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) scanpos));
463  rc=_sion_move_to_pos(sion_filedesc, scanpos);
464  }
465 
466  /* search forward */
467  while ( (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) && (!key_found) ) {
468 
469  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
470  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
471  bytes_left = lastposinblock - sion_filedesc->currentpos;
472  DPRINTFP((2, DFUNCTION, -1, "search loop: lastposinblock=%ld bytes_left=%ld currentpos=%ld\n",(long) lastposinblock,(long) bytes_left,(long) sion_filedesc->currentpos));
473 
474 
475  /* is there data in current chunk to start reading next block? */
476  if ( bytes_left > 0 ) {
477  /* read next block from current and following chunks */
478  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) key_and_len, (sion_int64) sizeof(key_and_len));
479  if (bread != sizeof(key_and_len)) {
480  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
481  "could not read data (%d bytes) from file ...", (int) sizeof(key_and_len)));
482  }
483  sion_swap(key_and_len, key_and_len, sizeof(sion_int64), 2, sion_filedesc->swapbytes);
484 
485 
486  DPRINTFP((2, DFUNCTION, -1, "search loop: found next key,len (%ld,%ld)\n",(long) key_and_len[0],(long) key_and_len[1]));
487  bytes_left-=bread;
488 
489  DPRINTFP((2, DFUNCTION, -1, "search loop: position is now %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
490 
491  /* check if filepointer has to moved to next block to be in front of data */
492  if(bytes_left==0) {
493  DPRINTFP((2, DFUNCTION, -1, "search loop: data starts in next chunk, moving forward\n"));
494  if(! _sion_move_to_next_chunk(sion_filedesc)) {
495  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
496  "could not move to data section in next block ..."));
497  }
498  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
499  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
500  bytes_left = lastposinblock - sion_filedesc->currentpos;
501  DPRINTFP((2, DFUNCTION, -1, "search loop: position is moved to %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
502  }
503 
504  /* store new block info */
505  mkey=key_and_len[0];
506  len =key_and_len[1];
507  rc=_sion_keyvalue_keymngr_add_block(keymngr, mkey, (size_t) sion_filedesc->currentpos, len);
508  DPRINTFP((2, DFUNCTION, -1, "search loop: stored new block key=%ld pos=%ld len=%ld\n",(long) mkey, (long) sion_filedesc->currentpos, (long) len));
509 
510  /* check key */
511  if (
512  ( ( (search_mode==SEARCH_TO_KEY) && (key_and_len[0] == key) ) || (search_mode==SEARCH_TO_NEXT) )
513  && ( !(search_mode==SEARCH_TO_END) )
514  ) {
515  DPRINTFP((2, DFUNCTION, -1, "search loop: found searched key=%ld, leaving\n",(long) key));
516  rc=SION_SUCCESS;
517  key_found=1; /* end of loop */
518  scanpos=_sion_compute_next_position_inline(sion_filedesc, len); /* next search behind data of that block */
519  } else {
520  DPRINTFP((2, DFUNCTION, -1, "search loop: found another key=%ld <=> %ld, continue\n",(long) key_and_len[0], (long) key));
521 
522  /* move forward behind data of block in the same or next chunk */
523  DPRINTFP((2, DFUNCTION, -1, "search loop: found move position forward by %ld bytes\n",(long) len));
524  bskip=_sion_skip_data_from_chunks_inline(sion_filedesc, (sion_int64) len);
525 
526  if ( bskip == len ) {
527 
528  /* recompute bytes_left in current block */
529  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
530  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
531  bytes_left = lastposinblock - sion_filedesc->currentpos;
532  scanpos=sion_filedesc->currentpos; /* next search behind data */
533 
534  } else {
535  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
536  "could not skip data section of one block (%d bytes) from file ...", (int) len));
537  }
538 
539  /* check if filepointer has to moved to next block */
540  if(bytes_left==0) {
541  DPRINTFP((2, DFUNCTION, -1, "search loop: chunk empty, move to next chunk\n"));
542  if(! _sion_move_to_next_chunk(sion_filedesc)) {
543  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
544  if(search_mode!=SEARCH_TO_END) {
545  rc=SION_NOT_SUCCESS;
546  } else {
547  rc=SION_SUCCESS; /* end of data reached */
548  }
549  }
550  }
551  }
552 
553  } else {
554  /* not enough byte in block to read next key+len */
555  if(bytes_left==0) {
556  if(! _sion_move_to_next_chunk(sion_filedesc)) {
557  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
558  rc=SION_NOT_SUCCESS;
559  }
560  } else {
561  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
562  "unknown data at end of chunk (%d bytes) ...", (int) bytes_left));
563  }
564  }
565 
566  } /* while */
567 
568  /* update scanpos */
569  if(_sion_keyvalue_keymngr_set_next_scan_pos(keymngr, scanpos) != SION_SUCCESS) {
570  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
571  "internal error set seekpos ..."));
572  }
573 
574  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
575  return (rc);
576 }
577 #undef DFUNCTION
578 
579 
580 #define DFUNCTION "_sion_get_or_init_key_info"
581 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc) {
582 
583  _sion_keyvalue_keymngr* keymngr;
584 
585  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
586 
587  if (sion_filedesc->keyvalptr == NULL) {
588  DPRINTFP((2, DFUNCTION, -1, "sion_filedesc->keyvalptr == NULL\n"));
589  keymngr =_sion_keyvalue_keymngr_init(TABLE_SIZE);
590  if (keymngr == NULL) {
591  _sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"could not allocate keymngr ...");
592  }
593  sion_filedesc->keyvalptr=keymngr;
594  DPRINTFP((2, DFUNCTION, -1, "alloc now KEYVALPTR = %x\n",sion_filedesc->keyvalptr));
595  }
596 
597  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
598  return sion_filedesc->keyvalptr;
599 }
600 #undef DFUNCTION
601 
602 #define DFUNCTION "_sion_move_to_next_chunk"
603 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc) {
604  int rc = SION_SUCCESS;
605 
606  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
607 
608  /* is another chink available */
609  if (sion_filedesc->currentblocknr < sion_filedesc->lastchunknr) {
610  /* move to next chunk */
611  sion_filedesc->currentblocknr++;
612  sion_filedesc->currentpos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
613  _sion_file_purge(sion_filedesc->fileptr);
614  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
615  } else {
616  rc=SION_NOT_SUCCESS;
617  }
618 
619  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
620  return rc;
621 }
622 #undef DFUNCTION
623 
624 #define DFUNCTION "_sion_move_to_pos"
625 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos) {
626  int rc = SION_SUCCESS;
627  size_t block_min_pos, block_max_pos;
628  int c, pos_found=0;
629 
630  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
631 
632  block_min_pos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
633  block_max_pos = block_min_pos+sion_filedesc->blocksizes[sion_filedesc->currentblocknr]; /* position of first byte behind chunk */
634 
635  DPRINTFP((2, DFUNCTION, -1, "current_chunk: %ld to %ld (pos=%ld)\n", (long) block_min_pos, (long) block_max_pos, (long) pos));
636 
637  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
638  /* stay in the same chunk */
639  pos_found=1;
640  sion_filedesc->currentpos=pos;
641  _sion_file_purge(sion_filedesc->fileptr);
642  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
643  DPRINTFP((2, DFUNCTION, -1, "stay in current chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
644 
645  } else {
646  /* scan from beginning over all chunks */
647 
648  for(c=0;c<=sion_filedesc->lastchunknr;c++) {
649  block_min_pos = sion_filedesc->startpos + c * sion_filedesc->globalskip;
650  block_max_pos = block_min_pos+sion_filedesc->blocksizes[c]; /* position of first byte behind chunk */
651 
652  DPRINTFP((2, DFUNCTION, -1, "check chunk%2d: %ld to %ld (pos=%ld)\n", c, (long) block_min_pos, (long) block_max_pos, (long) pos));
653 
654  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
655 
656  /* stay in this chunk */
657  pos_found=1;
658  sion_filedesc->currentblocknr=c;
659  sion_filedesc->currentpos=pos;
660  _sion_file_purge(sion_filedesc->fileptr);
661  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
662  DPRINTFP((2, DFUNCTION, -1, "stay in this chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
663  break;
664 
665  }
666  }
667  }
668 
669  if(!pos_found) {
670  rc=SION_NOT_SUCCESS;
671  }
672 
673  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
674  return rc;
675 }
676 #undef DFUNCTION
677 
678 /* write data block to current and following chunks, data could be distributed over several chunks */
679 #define DFUNCTION "_sion_write_data_to_chunks_inline"
680 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write) {
681  sion_int64 btowr, byteswritten, offset;
682  size_t rc=0, frc;
683 
684 
685  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
686 
687  /* loop to write data in current and following chunks */
688  offset=0;
689  while(bytes_to_write>0) {
690 
691  _sion_flush_block(sion_filedesc);
692 
693  /* determine size of data which could written into current chunk */
694  byteswritten = sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
695  btowr=bytes_to_write;
696  if ((byteswritten + btowr) > sion_filedesc->chunksize) btowr = sion_filedesc->chunksize - byteswritten;
697  DPRINTFP((2, DFUNCTION, -1, " bytes_to_write=%ld btowr=%ld chunksize=%ld byteswritten=%ld currentblocknr=%d blsize[%d]=%ld\n",
698  (long) bytes_to_write, (long) btowr, (long) sion_filedesc->chunksize, (long) byteswritten, (int) sion_filedesc->currentblocknr,
699  sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
700 
701  /* write part or rest of data */
702  frc = _sion_file_write(((char *) data+offset), btowr, sion_filedesc->fileptr);
703 
704  /* check return code */
705  if(frc != btowr) {
706  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
707  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) btowr, (int) frc, sion_filedesc->sid));
708  }
709 
710  /* increase current position and update other counters */
711  sion_filedesc->currentpos+=btowr;
712  bytes_to_write -=btowr;
713  offset +=btowr;
714 
715  /* check if all data has already be processed */
716  if(bytes_to_write>0) {
717 
718  /* create new block for writing next data */
719  _sion_flush_block(sion_filedesc);
720  _sion_create_new_block(sion_filedesc);
721  }
722  }
723  rc=offset;
724  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
725  return(rc);
726 }
727 #undef DFUNCTION
728 
729 /* skip data block from current and following chunks, data could be distributed over several chunks */
730 #define DFUNCTION "_sion_compute_next_position_inline"
731 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
732  sion_int64 btord = 0, bytesread = 0, offset = 0;
733  int blocknr = 0;
734 
735 
736  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
737 
738  /* check data in current block */
739  blocknr = sion_filedesc->currentblocknr;
740  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
741 
742  DPRINTFP((2, DFUNCTION, -1, " currentpos=%ld bytesread=%ld blocknr=%d bytes_to_read=%ld\n",
743  (long) sion_filedesc->currentpos, (long) bytesread, (int) blocknr, (long) bytes_to_read ));
744 
745  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n",
746  (int) sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
747 
748  /* data is all in current block */
749  if ((bytesread + bytes_to_read) <= sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
750  offset=sion_filedesc->currentpos+bytes_to_read;
751  } else {
752  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread; /* size of data in current chunk */
753  bytes_to_read-=btord; /* rest of data in next chunks */
754 
755  DPRINTFP((2, DFUNCTION, -1, " skip to next block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
756 
757  /* loop to behind data in current and following chunks */
758  while(bytes_to_read>0) {
759 
760  /* next block */
761  if (blocknr < sion_filedesc->lastchunknr) { blocknr++; } else {
762  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"internal error next block not available, but should ..."));
763  }
764 
765  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
766  if ( bytes_to_read > sion_filedesc->blocksizes[blocknr]) {
767  btord = sion_filedesc->blocksizes[blocknr];
768  bytes_to_read-=btord; /* rest of data in next chunks */
769  DPRINTFP((2, DFUNCTION, -1, "whole block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
770  } else {
771  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + bytes_to_read;
772  bytes_to_read=0;
773  }
774  }
775  }
776 
777  /* check if behind end of actual block */
778  if(offset == (sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + sion_filedesc->blocksizes[blocknr])) {
779  /* move next block if available, if last blovk of file, stay behind current block */
780  if (blocknr < sion_filedesc->lastchunknr) {
781  blocknr++;
782  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip;
783  DPRINTFP((2, DFUNCTION, -1, " behind end of block, move to next block, blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
784  }
785  }
786 
787 
788  DPRINTFP((2, DFUNCTION, -1, "leave offset=%ld\n",(long) offset));
789  return(offset);
790 }
791 #undef DFUNCTION
792 
793 /* skip data block from current and following chunks, data could be distributed over several chunks */
794 #define DFUNCTION "_sion_skip_data_from_chunks_inline"
795 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
796  sion_int64 btord, bytesread, offset;
797  size_t rc=0;
798 
799 
800  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
801 
802  /* loop to read data in current and following chunks */
803  offset=0;
804  while(bytes_to_read>0) {
805 
806  /* determine size of data which could read from current chunk */
807  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
808 
809  btord=bytes_to_read;
810  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr])
811  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
812  DPRINTFP((2, DFUNCTION, -1, " bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
813 
814  /* file pointer will not here adjusted, only at end of loop */
815 
816  /* increase current position and update other counters */
817  sion_filedesc->currentpos+=btord;
818  bytes_to_read -=btord;
819  offset +=btord;
820 
821  /* check if all data has already be processed, otherwise forward to next chunk */
822  if(bytes_to_read>0) {
823 
824  if(! _sion_move_to_next_chunk(sion_filedesc)) {
825  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
826  "could not read data (%d bytes) to file (end of file reached sid=%d) ...", (int) btord, sion_filedesc->sid));
827  }
828 
829  }
830  }
831 
832  /* move to new position */
833  _sion_file_purge(sion_filedesc->fileptr);
834  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
835 
836  rc=offset;
837  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
838  return(rc);
839 }
840 #undef DFUNCTION
841 
842 /* read data block from current and following chunks, data could be distributed over several chunks */
843 #define DFUNCTION "_sion_read_data_from_chunks_inline"
844 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read) {
845  sion_int64 btord, bytesread, offset;
846  size_t rc=0, frc;
847 
848 
849  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
850 
851  /* loop to read data in current and following chunks */
852  offset=0;
853  while(bytes_to_read>0) {
854 
855  /* determine size of data which could read from current chunk */
856  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
857  DPRINTFP((2, DFUNCTION, -1, "bytesread=%ld, curpos=%ld,startpos=%ld,curblock=%ld,gskip=%ld\n",
858  (long) bytesread, (long) sion_filedesc->currentpos, (long) sion_filedesc->startpos,
859  (long) sion_filedesc->currentblocknr, (long) sion_filedesc->globalskip ));
860 
861  btord=bytes_to_read;
862  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
863  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
864  DPRINTFP((2, DFUNCTION, -1, "block is split to multiple chunks, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
865  } else {
866  DPRINTFP((2, DFUNCTION, -1, "block in one chunk, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
867  }
868 
869  /* read part or rest of data */
870  frc = _sion_file_read(((char *)data+offset), btord, sion_filedesc->fileptr);
871 
872  /* check return code */
873  if(frc != btord) {
874  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
875  "could not read data (%d bytes) to file (frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
876  }
877 
878  /* increase current position and update other counters */
879  sion_filedesc->currentpos+=btord;
880  bytes_to_read -=btord;
881  offset +=btord;
882 
883  /* check if all data has already be processed, otherwise forward to next chunk */
884  if(bytes_to_read>0) {
885 
886  if(! _sion_move_to_next_chunk(sion_filedesc)) {
887  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
888  "could not read data (%d bytes) to file (end of file reached, frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
889  }
890 
891  }
892  }
893 
894  rc=offset;
895  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
896  return(rc);
897 }
898 #undef DFUNCTION
899 
900 /* dup internal data structures from first to secon sion_filedesc */
901 #define DFUNCTION "_sion_keyval_dup_dataptr_inline"
902 int _sion_keyval_dup_dataptr_inline(_sion_filedesc *sion_filedesc, _sion_filedesc *new_filedesc) {
903  int rc=0;
904  _sion_keyvalue_keymngr* keymngr_orig;
905  _sion_keyvalue_keymngr* keymngr;
906 
907  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
908 
909  if(sion_filedesc->keyvalptr!=NULL) {
910 
911  /* get orig keymngr */
912  keymngr_orig = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
913 
914  keymngr =_sion_keyvalue_keymngr_dup(keymngr_orig,new_filedesc->dup_mode, new_filedesc->dup_sel_key);
915  if (keymngr == NULL) {
916  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"dup: could not duplicate keymngr ..."));
917  }
918  new_filedesc->keyvalptr=keymngr;
919 
920  } else {
921 
922  new_filedesc->keyvalptr=NULL;
923 
924  }
925 
926  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
927  return (rc);
928 }
929 #undef DFUNCTION
930 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:141
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:219
Sion File Descriptor Structure.
Definition: sion_filedesc.h:79
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:285
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:166
void sion_swap(void *target, void *source, int size, int n, int do_swap)
Definition: sion_convert.c:44
sion_int64 * blocksizes
sion_int32 currentblocknr
Definition: sion_filedesc.h:97
int _sion_create_new_block(_sion_filedesc *sion_filedesc)
Create a new block for the internal data structure.
Sion Time Stamp Header.
_sion_fileptr * fileptr
Definition: sion_filedesc.h:82