SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_keyvalue_inline.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <string.h>
17 #include <time.h>
18 #include <assert.h>
19 
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_filedesc.h"
29 #include "sion_tools.h"
30 #include "sion_fd.h"
31 #include "sion_file.h"
32 #include "sion_metadata.h"
33 #include "sion_internal.h"
34 #include "sion_internal_seek.h"
35 #include "sion_printts.h"
36 #include "sion_keyvalue.h"
37 #include "sion_keyvalue_inline.h"
38 #include "sion_keyvalue_keymngr.h"
39 
40 #define TABLE_SIZE 127
41 
42 #define SEARCH_TO_KEY 0
43 #define SEARCH_TO_NEXT 1
44 #define SEARCH_TO_END 2
45 
46 /* Utility functions */
47 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc);
48 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos);
49 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode);
50 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc);
51 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write);
52 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
53 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
54 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read);
55 
56 
57 /* ***************** */
58 /* Write functions */
59 /* ***************** */
60 
61 
62 
63 #define DFUNCTION "_sion_store_and_write_key_and_len_inline"
64 size_t _sion_store_and_write_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len) {
65  sion_int64 bytes_to_write;
66  sion_int64 buffer[2];
67  size_t rc=SION_SUCCESS, frc;
68 
69 
70  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
71 
72  /* TODO: need to check if current_pos is set correctly (in case of sion_seeks in between, currently not allowed) ... */
73 
74  /* assemble data */
75  buffer[0]=(sion_int64) key;
76  buffer[1]=(sion_int64) len;
77  bytes_to_write=2*sizeof(sion_int64);
78 
79  frc=_sion_write_data_to_chunks_inline(sion_filedesc, (const void *) buffer, (sion_int64) bytes_to_write);
80 
81  /* check return code */
82  if(frc != bytes_to_write) {
83  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
84  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
85  }
86 
87  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
88  return(rc);
89 }
90 #undef DFUNCTION
91 
92 #define DFUNCTION "_sion_write_value_inline"
93 size_t _sion_write_value_inline(_sion_filedesc *sion_filedesc, const void *data, uint64_t key, size_t len) {
94  size_t rc=0, frc;
95  sion_int64 bytes_to_write;
96  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
97 
98 
99  bytes_to_write=len;
100  frc=_sion_write_data_to_chunks_inline(sion_filedesc, data, (sion_int64) bytes_to_write);
101  if(frc != bytes_to_write) {
102  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
103  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
104  }
105 
106  rc=(size_t) bytes_to_write;
107 
108  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
109  return (rc);
110 }
111 #undef DFUNCTION
112 
113 
114 /* ***************** */
115 /* Read functions */
116 /* ***************** */
117 
118 #define DFUNCTION "_sion_find_and_read_key_and_len_inline"
119 int _sion_find_and_read_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len, size_t *datalen) {
120  _sion_keyvalue_keymngr *keymngr;
121  int rc = SION_SUCCESS;
122  int position_found=0;
123  sion_table_key_t mkey;
124  size_t key_current_pos = (size_t) -1, key_bytes_left = (size_t) -1;
125 
126  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
127 
128  /* get or init internal data structure if first call */
129  keymngr=_sion_get_or_init_key_info(sion_filedesc);
130 
131  /* map key to table type */
132  mkey = key;
133 
134  *datalen=0;
135 
136  /* indicator if data could read, key_current_pos, key_bytes_left are set correctly */
137  position_found=0;
138 
139  /* check if info about key is available */
140  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
141  if(rc!=SION_SUCCESS) {
142  DPRINTFP((2, DFUNCTION, -1, "key %ld is not found in keymngr, scan forward\n", (long)mkey));
143 
144  /* scan forward until key is found */
145  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_KEY);
146 
147  /* check again if info about key is now available */
148  if(rc==SION_SUCCESS) {
149  DPRINTFP((2, DFUNCTION, -1, "scan forward found key %ld, get info \n", (long)mkey));
150  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
151  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(2): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
152  }
153  } else {
154  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(1): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
155  }
156  if(rc==SION_SUCCESS) {
157  /* check if enough data is available in block */
158  position_found=1;
159  if(len<=key_bytes_left) {
160  *datalen=len;
161  } else {
162  /* only a part of requested data can be read */
163  *datalen=key_bytes_left;
164  }
165  } else {
166  /* not more data in file for key */
167  rc=SION_NOT_SUCCESS;
168  DPRINTFP((2, DFUNCTION, -1, "no more data for key %ld\n", (long)mkey ));
169  }
170 
171  /* check current position and move if necessary */
172  DPRINTFP((2, DFUNCTION, -1, "check position: current_pos %ld, key_current_pos=%ld len=%ld key_bytes_left=%ld\n",
173  (long) sion_filedesc->currentpos, (long) key_current_pos, (long) len, (long) key_bytes_left ));
174  if(position_found) {
175  if(key_current_pos!=sion_filedesc->currentpos) {
176  rc=_sion_move_to_pos(sion_filedesc, key_current_pos);
177  }
178  /* ready to read data */
179  }
180 
181  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n", rc));
182  return rc;
183 }
184 #undef DFUNCTION
185 
186 #define DFUNCTION "_sion_read_value_inline"
187 size_t _sion_read_value_inline(_sion_filedesc *sion_filedesc, void *data, uint64_t key, size_t len) {
188  size_t rc=0;
189  _sion_keyvalue_keymngr *keymngr;
190  sion_table_key_t mkey;
191  sion_int64 bread;
192  sion_int64 bytes_to_read;
193  char *out;
194 
195  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
196 
197  /* get or init internal data structure if first call */
198  keymngr=_sion_get_or_init_key_info(sion_filedesc);
199  mkey = (sion_table_key_t) key;
200 
201  /* there are enough bytes available, checked during read of key,len */
202 
203  out = (char*) data;
204 
205  /* read data */
206  bytes_to_read=len;
207 
208  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) out, (sion_int64) bytes_to_read);
209  if (bread != bytes_to_read) {
210  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
211  "could not read data (%d bytes) from file ...", (int) bytes_to_read));
212  }
213 
214  DPRINTFP((2, DFUNCTION, -1, "first value = %3hhu | %c\n", out[0], out[0]));
215 
216  /* update meta data of current block */
217  _sion_keyvalue_keymngr_update_read_pos(keymngr, mkey, (size_t) bread, (sion_int64) sion_filedesc->currentpos);
218 
219  rc = (size_t) bread;
220 
221  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",(int) rc));
222  return (rc);
223 }
224 #undef DFUNCTION
225 
226 
227 #define DFUNCTION "_sion_key_full_scan_inline"
228 int _sion_key_full_scan_inline(_sion_filedesc *sion_filedesc) {
229  int rc=SION_NOT_SUCCESS;
230  _sion_keyvalue_keymngr* keymngr;
231  sion_table_key_t key=0;
232 
233  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
234 
235  /* get or init internal data structure if first call */
236  keymngr=_sion_get_or_init_key_info(sion_filedesc);
237 
238  if(!_sion_keyvalue_keymngr_is_scan_done(keymngr)) {
239  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_END);
240  } else {
241  /* scan already done */
242  rc=SION_SUCCESS;
243  }
244 
245  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
246  return rc;
247 }
248 #undef DFUNCTION
249 
250 
251 #define DFUNCTION "_sion_iterator_reset_inline"
252 int _sion_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
253  int rc=SION_SUCCESS;
254  _sion_keyvalue_keymngr* keymngr;
255 
256  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
257 
258  /* get or init internal data structure if first call */
259  keymngr=_sion_get_or_init_key_info(sion_filedesc);
260 
261  rc=_sion_keyvalue_keymngr_iterator_reset(keymngr);
262 
263  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
264  return rc;
265 }
266 #undef DFUNCTION
267 
268 #define DFUNCTION "_sion_iterator_next_inline"
269 int _sion_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr, size_t *sizeptr) {
270  int rc=SION_NOT_SUCCESS;
271  _sion_keyvalue_keymngr* keymngr;
272  sion_table_key_t key=0;
273  size_t current_pos, offset, len;
274 
275  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
276 
277  /* get or init internal data structure if first call */
278  keymngr=_sion_get_or_init_key_info(sion_filedesc);
279 
280  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
281  /* next block found */
282  rc=SION_SUCCESS;
283  } else {
284  /* scan forward if not at end of file to get more meta data */
285  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
286  /* get info about block */
287  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
288  rc=SION_SUCCESS;
289  } else {
290  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN, sion_filedesc->rank,
291  "internal error: block could not be find at end of block list ..."));
292  }
293  } else {
294  /* end of file reached and all blocks are already iterated */
295  rc=SION_NOT_SUCCESS;
296  }
297  }
298 
299  /* set output parameter */
300  if(rc==SION_SUCCESS) {
301  *keyptr = (uint64_t) key;
302  *sizeptr = (size_t) len;
303  }
304 
305  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
306  return rc;
307 }
308 #undef DFUNCTION
309 
310 #define DFUNCTION "_sion_seek_key_inline"
311 int _sion_seek_key_inline(_sion_filedesc *sion_filedesc, uint64_t key, int blocknum, sion_int64 posinblock) {
312  int rc=SION_NOT_SUCCESS;
313  _sion_keyvalue_keymngr* keymngr;
314  size_t offset, len;
315 
316  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
317 
318  /* get or init internal data structure if first call */
319  keymngr=_sion_get_or_init_key_info(sion_filedesc);
320 
321  /* lookup if a key entry at that position is already in keymngr */
322  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
323  DPRINTFP((2, DFUNCTION, -1, "after first lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
324 
325  while( (rc!=SION_SUCCESS) && !_sion_keyvalue_keymngr_is_scan_done(keymngr) ) {
326 
327  /* scan forward to next entry with that key */
328  DPRINTFP((2, DFUNCTION, -1, "scan forward to find key %d\n",(int) key));
329  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
330 
331  /* lookup again if a key entry at that position is already in keymngr */
332  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
333  DPRINTFP((2, DFUNCTION, -1, "in loop lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
334  } else {
335  rc=SION_NOT_SUCCESS;
336  }
337  }
338 
339  /* move to scanpos if necessary */
340  if( rc==SION_SUCCESS ) {
341  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) offset));
342  rc=_sion_move_to_pos(sion_filedesc, offset);
343  }
344 
345  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
346  return rc;
347 }
348 #undef DFUNCTION
349 
350 #define DFUNCTION "_sion_key_list_iterator_reset_inline"
351 int _sion_key_list_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
352  int rc=SION_SUCCESS;
353  _sion_keyvalue_keymngr* keymngr;
354 
355  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
356 
357  /* get or init internal data structure if first call */
358  keymngr=_sion_get_or_init_key_info(sion_filedesc);
359 
360  rc=_sion_keyvalue_keymngr_key_list_iterator_reset(keymngr);
361 
362  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
363  return rc;
364 }
365 #undef DFUNCTION
366 
367 #define DFUNCTION "_sion_iterator_next_inline"
368 int _sion_key_list_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr) {
369  int rc=SION_NOT_SUCCESS;
370  _sion_keyvalue_keymngr* keymngr;
371  sion_table_key_t key=0;
372 
373  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
374 
375  /* get or init internal data structure if first call */
376  keymngr=_sion_get_or_init_key_info(sion_filedesc);
377 
378  if(_sion_keyvalue_keymngr_key_list_iterator_next(keymngr, &key)==SION_SUCCESS) {
379  *keyptr = (uint64_t) key;
380  rc=SION_SUCCESS;
381  } else {
382  *keyptr = 0;
383  rc=SION_NOT_SUCCESS;
384  }
385 
386  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
387  return rc;
388 }
389 #undef DFUNCTION
390 
391 /* fills a stat-data structure containing information about the key */
392 #define DFUNCTION "_sion_key_get_stat_inline"
393 int _sion_key_get_stat_inline(_sion_filedesc *sion_filedesc, uint64_t searchkey, sion_key_stat_t *keystat) {
394  int rc=SION_NOT_SUCCESS;
395  _sion_keyvalue_keymngr* keymngr;
396  sion_table_key_t key=0;
397 
398  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
399 
400  /* get or init internal data structure if first call */
401  keymngr=_sion_get_or_init_key_info(sion_filedesc);
402 
403  key= (sion_table_key_t) searchkey;
404 
405  if(_sion_keyvalue_keymngr_key_get_stat(keymngr, key, keystat)==SION_SUCCESS) {
406  rc=SION_SUCCESS;
407  } else {
408  rc=SION_NOT_SUCCESS;
409  }
410 
411  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
412  return rc;
413 }
414 #undef DFUNCTION
415 
416 /* ***************** */
417 /* Utility functions */
418 /* ***************** */
419 
420 #define DFUNCTION "_sion_scan_forward_to_key"
421 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode) {
422  int rc = SION_SUCCESS;
423  _sion_keyvalue_keymngr *keymngr;
424  sion_table_key_t mkey;
425 
426  size_t scanpos;
427  int key_found;
428  size_t bytes_left, lastposinblock, lastposinfile;
429  uint64_t key_and_len[2];
430 
431  size_t bread, bskip, len;
432 
433  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
434 
435  /* get keymngr */
436  keymngr = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
437 
438  key_found=0;
439 
440  /* get position from where scan has to be started */
441  if(_sion_keyvalue_keymngr_get_next_scan_pos(keymngr, &scanpos) != SION_SUCCESS) {
442  /* can only be at start time, start then from current position in file */
443  scanpos=(size_t) sion_filedesc->currentpos;
444  DPRINTFP((2, DFUNCTION, -1, "set scanpos to first pos in file pos=%ld\n",(long) scanpos));
445  }
446  /* check if file is already completely scanned */
447  lastposinfile=sion_filedesc->startpos + sion_filedesc->lastchunknr * sion_filedesc->globalskip
448  + sion_filedesc->blocksizes[sion_filedesc->lastchunknr];
449 
450  DPRINTFP((2, DFUNCTION, -1, "check if not already scanned to endoffile scanpos=%ld lastposinfile=%ld lastchunknr=%d\n",(long) scanpos,(long) lastposinfile, (int) sion_filedesc->lastchunknr ));
451  if(scanpos>=lastposinfile) {
452  DPRINTFP((2, DFUNCTION, -1, "scan is already at end of file (%ld>=%ld) \n",(long) scanpos, (long) lastposinfile));
453  _sion_keyvalue_keymngr_set_scan_done(keymngr);
454  rc=SION_NOT_SUCCESS;
455  }
456 
457  /* move to scanpos if necessary */
458  if( (sion_filedesc->currentpos != scanpos) && (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) ) {
459  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) scanpos));
460  rc=_sion_move_to_pos(sion_filedesc, scanpos);
461  }
462 
463  /* search forward */
464  while ( (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) && (!key_found) ) {
465 
466  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
467  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
468  bytes_left = lastposinblock - sion_filedesc->currentpos;
469  DPRINTFP((2, DFUNCTION, -1, "search loop: lastposinblock=%ld bytes_left=%ld currentpos=%ld\n",(long) lastposinblock,(long) bytes_left,(long) sion_filedesc->currentpos));
470 
471 
472  /* is there data in current chunk to start reading next block? */
473  if ( bytes_left > 0 ) {
474  /* read next block from current and following chunks */
475  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) key_and_len, (sion_int64) sizeof(key_and_len));
476  if (bread != sizeof(key_and_len)) {
477  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
478  "could not read data (%d bytes) from file ...", (int) sizeof(key_and_len)));
479  }
480  sion_swap(key_and_len, key_and_len, sizeof(sion_int64), 2, sion_filedesc->swapbytes);
481 
482 
483  DPRINTFP((2, DFUNCTION, -1, "search loop: found next key,len (%ld,%ld)\n",(long) key_and_len[0],(long) key_and_len[1]));
484  bytes_left-=bread;
485 
486  DPRINTFP((2, DFUNCTION, -1, "search loop: position is now %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
487 
488  /* check if filepointer has to moved to next block to be in front of data */
489  if(bytes_left==0) {
490  DPRINTFP((2, DFUNCTION, -1, "search loop: data starts in next chunk, moving forward\n"));
491  if(! _sion_move_to_next_chunk(sion_filedesc)) {
492  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
493  "could not move to data section in next block ..."));
494  }
495  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
496  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
497  bytes_left = lastposinblock - sion_filedesc->currentpos;
498  DPRINTFP((2, DFUNCTION, -1, "search loop: position is moved to %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
499  }
500 
501  /* store new block info */
502  mkey=key_and_len[0];
503  len =key_and_len[1];
504  rc=_sion_keyvalue_keymngr_add_block(keymngr, mkey, (size_t) sion_filedesc->currentpos, len);
505  DPRINTFP((2, DFUNCTION, -1, "search loop: stored new block key=%ld pos=%ld len=%ld\n",(long) mkey, (long) sion_filedesc->currentpos, (long) len));
506 
507  /* check key */
508  if (
509  ( ( (search_mode==SEARCH_TO_KEY) && (key_and_len[0] == key) ) || (search_mode==SEARCH_TO_NEXT) )
510  && ( !(search_mode==SEARCH_TO_END) )
511  ) {
512  DPRINTFP((2, DFUNCTION, -1, "search loop: found searched key=%ld, leaving\n",(long) key));
513  rc=SION_SUCCESS;
514  key_found=1; /* end of loop */
515  scanpos=_sion_compute_next_position_inline(sion_filedesc, len); /* next search behind data of that block */
516  } else {
517  DPRINTFP((2, DFUNCTION, -1, "search loop: found another key=%ld <=> %ld, continue\n",(long) key_and_len[0], (long) key));
518 
519  /* move forward behind data of block in the same or next chunk */
520  DPRINTFP((2, DFUNCTION, -1, "search loop: found move position forward by %ld bytes\n",(long) len));
521  bskip=_sion_skip_data_from_chunks_inline(sion_filedesc, (sion_int64) len);
522 
523  if ( bskip == len ) {
524 
525  /* recompute bytes_left in current block */
526  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
527  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
528  bytes_left = lastposinblock - sion_filedesc->currentpos;
529  scanpos=sion_filedesc->currentpos; /* next search behind data */
530 
531  } else {
532  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
533  "could not skip data section of one block (%d bytes) from file ...", (int) len));
534  }
535 
536  /* check if filepointer has to moved to next block */
537  if(bytes_left==0) {
538  DPRINTFP((2, DFUNCTION, -1, "search loop: chunk empty, move to next chunk\n"));
539  if(! _sion_move_to_next_chunk(sion_filedesc)) {
540  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
541  if(search_mode!=SEARCH_TO_END) {
542  rc=SION_NOT_SUCCESS;
543  } else {
544  rc=SION_SUCCESS; /* end of data reached */
545  }
546  }
547  }
548  }
549 
550  } else {
551  /* not enough byte in block to read next key+len */
552  if(bytes_left==0) {
553  if(! _sion_move_to_next_chunk(sion_filedesc)) {
554  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
555  rc=SION_NOT_SUCCESS;
556  }
557  } else {
558  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
559  "unknown data at end of chunk (%d bytes) ...", (int) bytes_left));
560  }
561  }
562 
563  } /* while */
564 
565  /* update scanpos */
566  if(_sion_keyvalue_keymngr_set_next_scan_pos(keymngr, scanpos) != SION_SUCCESS) {
567  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
568  "internal error set seekpos ..."));
569  }
570 
571  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
572  return (rc);
573 }
574 #undef DFUNCTION
575 
576 
577 #define DFUNCTION "_sion_get_or_init_key_info"
578 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc) {
579 
580  _sion_keyvalue_keymngr* keymngr;
581 
582  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
583 
584  if (sion_filedesc->keyvalptr == NULL) {
585  DPRINTFP((2, DFUNCTION, -1, "sion_filedesc->keyvalptr == NULL\n"));
586  keymngr =_sion_keyvalue_keymngr_init(TABLE_SIZE);
587  if (keymngr == NULL) {
588  _sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"could not allocate keymngr ...");
589  }
590  sion_filedesc->keyvalptr=keymngr;
591  DPRINTFP((2, DFUNCTION, -1, "alloc now KEYVALPTR = %x\n",sion_filedesc->keyvalptr));
592  }
593 
594  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
595  return sion_filedesc->keyvalptr;
596 }
597 #undef DFUNCTION
598 
599 #define DFUNCTION "_sion_move_to_next_chunk"
600 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc) {
601  int rc = SION_SUCCESS;
602 
603  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
604 
605  /* is another chink available */
606  if (sion_filedesc->currentblocknr < sion_filedesc->lastchunknr) {
607  /* move to next chunk */
608  sion_filedesc->currentblocknr++;
609  sion_filedesc->currentpos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
610  _sion_file_purge(sion_filedesc->fileptr);
611  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
612  } else {
613  rc=SION_NOT_SUCCESS;
614  }
615 
616  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
617  return rc;
618 }
619 #undef DFUNCTION
620 
621 #define DFUNCTION "_sion_move_to_pos"
622 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos) {
623  int rc = SION_SUCCESS;
624  size_t block_min_pos, block_max_pos;
625  int c, pos_found=0;
626 
627  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
628 
629  block_min_pos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
630  block_max_pos = block_min_pos+sion_filedesc->blocksizes[sion_filedesc->currentblocknr]; /* position of first byte behind chunk */
631 
632  DPRINTFP((2, DFUNCTION, -1, "current_chunk: %ld to %ld (pos=%ld)\n", (long) block_min_pos, (long) block_max_pos, (long) pos));
633 
634  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
635  /* stay in the same chunk */
636  pos_found=1;
637  sion_filedesc->currentpos=pos;
638  _sion_file_purge(sion_filedesc->fileptr);
639  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
640  DPRINTFP((2, DFUNCTION, -1, "stay in current chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
641 
642  } else {
643  /* scan from beginning over all chunks */
644 
645  for(c=0;c<=sion_filedesc->lastchunknr;c++) {
646  block_min_pos = sion_filedesc->startpos + c * sion_filedesc->globalskip;
647  block_max_pos = block_min_pos+sion_filedesc->blocksizes[c]; /* position of first byte behind chunk */
648 
649  DPRINTFP((2, DFUNCTION, -1, "check chunk%2d: %ld to %ld (pos=%ld)\n", c, (long) block_min_pos, (long) block_max_pos, (long) pos));
650 
651  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
652 
653  /* stay in this chunk */
654  pos_found=1;
655  sion_filedesc->currentblocknr=c;
656  sion_filedesc->currentpos=pos;
657  _sion_file_purge(sion_filedesc->fileptr);
658  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
659  DPRINTFP((2, DFUNCTION, -1, "stay in this chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
660  break;
661 
662  }
663  }
664  }
665 
666  if(!pos_found) {
667  rc=SION_NOT_SUCCESS;
668  }
669 
670  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
671  return rc;
672 }
673 #undef DFUNCTION
674 
675 /* write data block to current and following chunks, data could be distributed over several chunks */
676 #define DFUNCTION "_sion_write_data_to_chunks_inline"
677 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write) {
678  sion_int64 btowr, byteswritten, offset;
679  size_t rc=0, frc;
680 
681 
682  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
683 
684  /* loop to write data in current and following chunks */
685  offset=0;
686  while(bytes_to_write>0) {
687 
688  _sion_flush_block(sion_filedesc);
689 
690  /* determine size of data which could written into current chunk */
691  byteswritten = sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
692  btowr=bytes_to_write;
693  if ((byteswritten + btowr) > sion_filedesc->chunksize) btowr = sion_filedesc->chunksize - byteswritten;
694  DPRINTFP((2, DFUNCTION, -1, " bytes_to_write=%ld btowr=%ld chunksize=%ld byteswritten=%ld currentblocknr=%d blsize[%d]=%ld\n",
695  (long) bytes_to_write, (long) btowr, (long) sion_filedesc->chunksize, (long) byteswritten, (int) sion_filedesc->currentblocknr,
696  sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
697 
698  /* write part or rest of data */
699  frc = _sion_file_write(((char *) data+offset), btowr, sion_filedesc->fileptr);
700 
701  /* check return code */
702  if(frc != btowr) {
703  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
704  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) btowr, (int) frc, sion_filedesc->sid));
705  }
706 
707  /* increase current position and update other counters */
708  sion_filedesc->currentpos+=btowr;
709  byteswritten +=btowr;
710  bytes_to_write -=btowr;
711  offset +=btowr;
712 
713  /* check if all data has already be processed */
714  if(bytes_to_write>0) {
715 
716  /* create new block for writing next data */
717  _sion_flush_block(sion_filedesc);
718  _sion_create_new_block(sion_filedesc);
719  }
720  }
721  rc=offset;
722  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
723  return(rc);
724 }
725 #undef DFUNCTION
726 
727 /* skip data block from current and following chunks, data could be distributed over several chunks */
728 #define DFUNCTION "_sion_compute_next_position_inline"
729 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
730  sion_int64 btord, bytesread, offset;
731  int blocknr;
732 
733 
734  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
735 
736  /* check data in current block */
737  blocknr = sion_filedesc->currentblocknr;
738  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
739 
740  DPRINTFP((2, DFUNCTION, -1, " currentpos=%ld bytesread=%ld blocknr=%d bytes_to_read=%ld\n",
741  (long) sion_filedesc->currentpos, (long) bytesread, (int) blocknr, (long) bytes_to_read ));
742 
743  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n",
744  (int) sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
745 
746  /* data is all in current block */
747  if ((bytesread + bytes_to_read) <= sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
748  offset=sion_filedesc->currentpos+bytes_to_read;
749  } else {
750  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread; /* size of data in current chunk */
751  bytes_to_read-=btord; /* rest of data in next chunks */
752 
753  DPRINTFP((2, DFUNCTION, -1, " skip to next block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
754 
755  /* loop to behind data in current and following chunks */
756  while(bytes_to_read>0) {
757 
758  /* next block */
759  if (blocknr < sion_filedesc->lastchunknr) { blocknr++; } else {
760  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"internal error next block not available, but should ..."));
761  }
762 
763  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
764  if ( bytes_to_read > sion_filedesc->blocksizes[blocknr]) {
765  btord = sion_filedesc->blocksizes[blocknr];
766  bytes_to_read-=btord; /* rest of data in next chunks */
767  DPRINTFP((2, DFUNCTION, -1, "whole block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
768  } else {
769  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + bytes_to_read;
770  bytes_to_read=0;
771  }
772  }
773  }
774 
775  /* check if behind end of actual block */
776  if(offset == (sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + sion_filedesc->blocksizes[blocknr])) {
777  /* move next block if available, if last blovk of file, stay behind current block */
778  if (blocknr < sion_filedesc->lastchunknr) {
779  blocknr++;
780  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip;
781  DPRINTFP((2, DFUNCTION, -1, " behind end of block, move to next block, blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
782  }
783  }
784 
785 
786  DPRINTFP((2, DFUNCTION, -1, "leave offset=%ld\n",(long) offset));
787  return(offset);
788 }
789 #undef DFUNCTION
790 
791 /* skip data block from current and following chunks, data could be distributed over several chunks */
792 #define DFUNCTION "_sion_skip_data_from_chunks_inline"
793 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
794  sion_int64 btord, bytesread, offset;
795  size_t rc=0;
796 
797 
798  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
799 
800  /* loop to read data in current and following chunks */
801  offset=0;
802  while(bytes_to_read>0) {
803 
804  /* determine size of data which could read from current chunk */
805  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
806 
807  btord=bytes_to_read;
808  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr])
809  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
810  DPRINTFP((2, DFUNCTION, -1, " bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
811 
812  /* file pointer will not here adjusted, only at end of loop */
813 
814  /* increase current position and update other counters */
815  sion_filedesc->currentpos+=btord;
816  bytesread +=btord;
817  bytes_to_read -=btord;
818  offset +=btord;
819 
820  /* check if all data has already be processed, otherwise forward to next chunk */
821  if(bytes_to_read>0) {
822 
823  if(! _sion_move_to_next_chunk(sion_filedesc)) {
824  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
825  "could not read data (%d bytes) to file (end of file reached sid=%d) ...", (int) btord, sion_filedesc->sid));
826  }
827 
828  }
829  }
830 
831  /* move to new position */
832  _sion_file_purge(sion_filedesc->fileptr);
833  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
834 
835  rc=offset;
836  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
837  return(rc);
838 }
839 #undef DFUNCTION
840 
841 /* read data block from current and following chunks, data could be distributed over several chunks */
842 #define DFUNCTION "_sion_read_data_from_chunks_inline"
843 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read) {
844  sion_int64 btord, bytesread, offset;
845  size_t rc=0, frc;
846 
847 
848  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
849 
850  /* loop to read data in current and following chunks */
851  offset=0;
852  while(bytes_to_read>0) {
853 
854  /* determine size of data which could read from current chunk */
855  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
856  DPRINTFP((2, DFUNCTION, -1, "bytesread=%ld, curpos=%ld,startpos=%ld,curblock=%ld,gskip=%ld\n",
857  (long) bytesread, (long) sion_filedesc->currentpos, (long) sion_filedesc->startpos,
858  (long) sion_filedesc->currentblocknr, (long) sion_filedesc->globalskip ));
859 
860  btord=bytes_to_read;
861  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
862  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
863  DPRINTFP((2, DFUNCTION, -1, "block is split to multiple chunks, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
864  } else {
865  DPRINTFP((2, DFUNCTION, -1, "block in one chunk, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
866  }
867 
868  /* read part or rest of data */
869  frc = _sion_file_read(((char *)data+offset), btord, sion_filedesc->fileptr);
870 
871  /* check return code */
872  if(frc != btord) {
873  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
874  "could not read data (%d bytes) to file (frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
875  }
876 
877  /* increase current position and update other counters */
878  sion_filedesc->currentpos+=btord;
879  bytesread +=btord;
880  bytes_to_read -=btord;
881  offset +=btord;
882 
883  /* check if all data has already be processed, otherwise forward to next chunk */
884  if(bytes_to_read>0) {
885 
886  if(! _sion_move_to_next_chunk(sion_filedesc)) {
887  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
888  "could not read data (%d bytes) to file (end of file reached, frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
889  }
890 
891  }
892  }
893 
894  rc=offset;
895  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
896  return(rc);
897 }
898 #undef DFUNCTION
899 
900 /* dup internal data structures from first to secon sion_filedesc */
901 #define DFUNCTION "_sion_keyval_dup_dataptr_inline"
902 int _sion_keyval_dup_dataptr_inline(_sion_filedesc *sion_filedesc, _sion_filedesc *new_filedesc) {
903  int rc=0;
904  _sion_keyvalue_keymngr* keymngr_orig;
905  _sion_keyvalue_keymngr* keymngr;
906 
907  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
908 
909  if(sion_filedesc->keyvalptr!=NULL) {
910 
911  /* get orig keymngr */
912  keymngr_orig = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
913 
914  keymngr =_sion_keyvalue_keymngr_dup(keymngr_orig,new_filedesc->dup_mode, new_filedesc->dup_sel_key);
915  if (keymngr == NULL) {
916  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"dup: could not duplicate keymngr ..."));
917  }
918  new_filedesc->keyvalptr=keymngr;
919 
920  } else {
921 
922  new_filedesc->keyvalptr=NULL;
923 
924  }
925 
926  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
927  return (rc);
928 }
929 #undef DFUNCTION
930 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:147
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:238
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:322
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:168
sion_int64 * blocksizes
Definition: sion_filedesc.h:98
sion_int32 currentblocknr
Definition: sion_filedesc.h:95
int _sion_create_new_block(_sion_filedesc *sion_filedesc)
Create a new block for the internal data structure.
Sion Time Stamp Header.
void sion_swap(void *target, void *source, int size, int n, int aflag)
Definition: sion_convert.c:36
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80