SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
sion_keyvalue_inline.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <string.h>
17 #include <time.h>
18 #include <assert.h>
19 
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_error_handler.h"
29 #include "sion_filedesc.h"
30 #include "sion_tools.h"
31 #include "sion_fd.h"
32 #include "sion_file.h"
33 #include "sion_metadata.h"
34 #include "sion_internal.h"
35 #include "sion_internal_seek.h"
36 #include "sion_printts.h"
37 #include "sion_keyvalue.h"
38 #include "sion_keyvalue_inline.h"
39 #include "sion_keyvalue_keymngr.h"
40 
41 #define TABLE_SIZE 127
42 
43 #define SEARCH_TO_KEY 0
44 #define SEARCH_TO_NEXT 1
45 #define SEARCH_TO_END 2
46 
47 /* Utility functions */
48 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc);
49 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos);
50 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode);
51 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc);
52 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write);
53 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
54 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read);
55 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read);
56 
57 
58 /* ***************** */
59 /* Write functions */
60 /* ***************** */
61 
62 
63 
64 #define DFUNCTION "_sion_store_and_write_key_and_len_inline"
65 size_t _sion_store_and_write_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len) {
66  sion_int64 bytes_to_write;
67  sion_int64 buffer[2];
68  size_t rc=SION_SUCCESS, frc;
69 
70 
71  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
72 
73  /* TODO: need to check if current_pos is set correctly (in case of sion_seeks in between, currently not allowed) ... */
74 
75  /* assemble data */
76  buffer[0]=(sion_int64) key;
77  buffer[1]=(sion_int64) len;
78  bytes_to_write=2*sizeof(sion_int64);
79 
80  frc=_sion_write_data_to_chunks_inline(sion_filedesc, (const void *) buffer, (sion_int64) bytes_to_write);
81 
82  /* check return code */
83  if(frc != bytes_to_write) {
84  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
85  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
86  }
87 
88  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
89  return(rc);
90 }
91 #undef DFUNCTION
92 
93 #define DFUNCTION "_sion_write_value_inline"
94 size_t _sion_write_value_inline(_sion_filedesc *sion_filedesc, const void *data, uint64_t key, size_t len) {
95  size_t rc=0, frc;
96  sion_int64 bytes_to_write;
97  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
98 
99 
100  bytes_to_write=len;
101  frc=_sion_write_data_to_chunks_inline(sion_filedesc, data, (sion_int64) bytes_to_write);
102  if(frc != bytes_to_write) {
103  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
104  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) bytes_to_write, (int) frc, sion_filedesc->sid));
105  }
106 
107  rc=(size_t) bytes_to_write;
108 
109  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
110  return (rc);
111 }
112 #undef DFUNCTION
113 
114 
115 /* ***************** */
116 /* Read functions */
117 /* ***************** */
118 
119 #define DFUNCTION "_sion_find_and_read_key_and_len_inline"
120 int _sion_find_and_read_key_and_len_inline(_sion_filedesc *sion_filedesc, uint64_t key, size_t len, size_t *datalen) {
121  _sion_keyvalue_keymngr *keymngr;
122  int rc = 0;
123  int position_found=0;
124  sion_table_key_t mkey;
125  size_t key_current_pos = (size_t) -1, key_bytes_left = (size_t) -1;
126 
127  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
128 
129  /* get or init internal data structure if first call */
130  keymngr=_sion_get_or_init_key_info(sion_filedesc);
131 
132  /* map key to table type */
133  mkey = key;
134 
135  *datalen=0;
136 
137  /* indicator if data could read, key_current_pos, key_bytes_left are set correctly */
138  position_found=0;
139 
140  /* check if info about key is available */
141  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
142  if(rc!=SION_SUCCESS) {
143  DPRINTFP((2, DFUNCTION, -1, "key %ld is not found in keymngr, scan forward\n", (long)mkey));
144 
145  /* scan forward until key is found */
146  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_KEY);
147 
148  /* check again if info about key is now available */
149  if(rc==SION_SUCCESS) {
150  DPRINTFP((2, DFUNCTION, -1, "scan forward found key %ld, get info \n", (long)mkey));
151  rc=_sion_keyvalue_keymngr_lookup(keymngr, mkey, &key_current_pos, &key_bytes_left);
152  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(2): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
153  }
154  } else {
155  DPRINTFP((2, DFUNCTION, -1, "key %ld, get info(1): pos=%ld bytes_left=%ld\n", (long)mkey, (long) key_current_pos, (long) key_bytes_left ));
156  }
157  if(rc==SION_SUCCESS) {
158  /* check if enough data is available in block */
159  position_found=1;
160  if(len<=key_bytes_left) {
161  *datalen=len;
162  } else {
163  /* only a part of requested data can be read */
164  *datalen=key_bytes_left;
165  }
166  } else {
167  /* not more data in file for key */
168  rc=SION_NOT_SUCCESS;
169  DPRINTFP((2, DFUNCTION, -1, "no more data for key %ld\n", (long)mkey ));
170  }
171 
172  /* check current position and move if necessary */
173  DPRINTFP((2, DFUNCTION, -1, "check position: current_pos %ld, key_current_pos=%ld len=%ld key_bytes_left=%ld\n",
174  (long) sion_filedesc->currentpos, (long) key_current_pos, (long) len, (long) key_bytes_left ));
175  if(position_found) {
176  if(key_current_pos!=sion_filedesc->currentpos) {
177  rc=_sion_move_to_pos(sion_filedesc, key_current_pos);
178  }
179  /* ready to read data */
180  }
181 
182  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n", rc));
183  return rc;
184 }
185 #undef DFUNCTION
186 
187 #define DFUNCTION "_sion_read_value_inline"
188 size_t _sion_read_value_inline(_sion_filedesc *sion_filedesc, void *data, uint64_t key, size_t len) {
189  size_t rc=0;
190  _sion_keyvalue_keymngr *keymngr;
191  sion_table_key_t mkey;
192  sion_int64 bread;
193  sion_int64 bytes_to_read;
194  char *out;
195 
196  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
197 
198  /* get or init internal data structure if first call */
199  keymngr=_sion_get_or_init_key_info(sion_filedesc);
200  mkey = (sion_table_key_t) key;
201 
202  /* there are enough bytes available, checked during read of key,len */
203 
204  out = (char*) data;
205 
206  /* read data */
207  bytes_to_read=len;
208 
209  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) out, (sion_int64) bytes_to_read);
210  if (bread != bytes_to_read) {
211  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
212  "could not read data (%d bytes) from file ...", (int) bytes_to_read));
213  }
214 
215  DPRINTFP((2, DFUNCTION, -1, "first value = %3hhu | %c\n", out[0], out[0]));
216 
217  /* update meta data of current block */
218  _sion_keyvalue_keymngr_update_read_pos(keymngr, mkey, (size_t) bread, (sion_int64) sion_filedesc->currentpos);
219 
220  rc = (size_t) bread;
221 
222  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",(int) rc));
223  return (rc);
224 }
225 #undef DFUNCTION
226 
227 
228 #define DFUNCTION "_sion_key_full_scan_inline"
229 int _sion_key_full_scan_inline(_sion_filedesc *sion_filedesc) {
230  int rc=SION_NOT_SUCCESS;
231  _sion_keyvalue_keymngr* keymngr;
232  sion_table_key_t key=0;
233 
234  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
235 
236  /* get or init internal data structure if first call */
237  keymngr=_sion_get_or_init_key_info(sion_filedesc);
238 
239  if(!_sion_keyvalue_keymngr_is_scan_done(keymngr)) {
240  rc=_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_END);
241  } else {
242  /* scan already done */
243  rc=SION_SUCCESS;
244  }
245 
246  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
247  return rc;
248 }
249 #undef DFUNCTION
250 
251 
252 #define DFUNCTION "_sion_iterator_reset_inline"
253 int _sion_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
254  int rc=0;
255  _sion_keyvalue_keymngr* keymngr;
256 
257  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
258 
259  /* get or init internal data structure if first call */
260  keymngr=_sion_get_or_init_key_info(sion_filedesc);
261 
262  rc=_sion_keyvalue_keymngr_iterator_reset(keymngr);
263 
264  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
265  return rc;
266 }
267 #undef DFUNCTION
268 
269 #define DFUNCTION "_sion_iterator_next_inline"
270 int _sion_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr, size_t *sizeptr) {
271  int rc=SION_NOT_SUCCESS;
272  _sion_keyvalue_keymngr* keymngr;
273  sion_table_key_t key=0;
274  size_t current_pos, offset, len;
275 
276  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
277 
278  /* get or init internal data structure if first call */
279  keymngr=_sion_get_or_init_key_info(sion_filedesc);
280 
281  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
282  /* next block found */
283  rc=SION_SUCCESS;
284  } else {
285  /* scan forward if not at end of file to get more meta data */
286  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
287  /* get info about block */
288  if(_sion_keyvalue_keymngr_iterator_next(keymngr, &key, &current_pos, &offset, &len)==SION_SUCCESS) {
289  rc=SION_SUCCESS;
290  } else {
291  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN, sion_filedesc->rank,
292  "internal error: block could not be find at end of block list ..."));
293  }
294  } else {
295  /* end of file reached and all blocks are already iterated */
296  rc=SION_NOT_SUCCESS;
297  }
298  }
299 
300  /* set output parameter */
301  if(rc==SION_SUCCESS) {
302  *keyptr = (uint64_t) key;
303  *sizeptr = (size_t) len;
304  }
305 
306  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
307  return rc;
308 }
309 #undef DFUNCTION
310 
311 #define DFUNCTION "_sion_seek_key_inline"
312 int _sion_seek_key_inline(_sion_filedesc *sion_filedesc, uint64_t key, int blocknum, sion_int64 posinblock) {
313  int rc=SION_NOT_SUCCESS;
314  _sion_keyvalue_keymngr* keymngr;
315  size_t offset, len;
316 
317  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
318 
319  /* get or init internal data structure if first call */
320  keymngr=_sion_get_or_init_key_info(sion_filedesc);
321 
322  /* lookup if a key entry at that position is already in keymngr */
323  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
324  DPRINTFP((2, DFUNCTION, -1, "after first lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
325 
326  while( (rc!=SION_SUCCESS) && !_sion_keyvalue_keymngr_is_scan_done(keymngr) ) {
327 
328  /* scan forward to next entry with that key */
329  DPRINTFP((2, DFUNCTION, -1, "scan forward to find key %d\n",(int) key));
330  if (_sion_scan_forward_to_key(sion_filedesc, key, SEARCH_TO_NEXT)==SION_SUCCESS) {
331 
332  /* lookup again if a key entry at that position is already in keymngr */
333  rc=_sion_keyvalue_keymngr_lookup_and_set_pos(keymngr, key, blocknum, posinblock, &offset, &len);
334  DPRINTFP((2, DFUNCTION, -1, "in loop lookup rc=%d offset=%d len=%d\n",rc, (int)offset, (int) len ));
335  } else {
336  rc=SION_NOT_SUCCESS;
337  }
338  }
339 
340  /* move to scanpos if necessary */
341  if( rc==SION_SUCCESS ) {
342  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) offset));
343  rc=_sion_move_to_pos(sion_filedesc, offset);
344  }
345 
346  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
347  return rc;
348 }
349 #undef DFUNCTION
350 
351 #define DFUNCTION "_sion_key_list_iterator_reset_inline"
352 int _sion_key_list_iterator_reset_inline(_sion_filedesc *sion_filedesc) {
353  int rc=0;
354  _sion_keyvalue_keymngr* keymngr;
355 
356  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
357 
358  /* get or init internal data structure if first call */
359  keymngr=_sion_get_or_init_key_info(sion_filedesc);
360 
361  rc=_sion_keyvalue_keymngr_key_list_iterator_reset(keymngr);
362 
363  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
364  return rc;
365 }
366 #undef DFUNCTION
367 
368 #define DFUNCTION "_sion_iterator_next_inline"
369 int _sion_key_list_iterator_next_inline(_sion_filedesc *sion_filedesc, uint64_t *keyptr) {
370  int rc=SION_NOT_SUCCESS;
371  _sion_keyvalue_keymngr* keymngr;
372  sion_table_key_t key=0;
373 
374  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
375 
376  /* get or init internal data structure if first call */
377  keymngr=_sion_get_or_init_key_info(sion_filedesc);
378 
379  if(_sion_keyvalue_keymngr_key_list_iterator_next(keymngr, &key)==SION_SUCCESS) {
380  *keyptr = (uint64_t) key;
381  rc=SION_SUCCESS;
382  } else {
383  *keyptr = 0;
384  rc=SION_NOT_SUCCESS;
385  }
386 
387  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
388  return rc;
389 }
390 #undef DFUNCTION
391 
392 /* fills a stat-data structure containing information about the key */
393 #define DFUNCTION "_sion_key_get_stat_inline"
394 int _sion_key_get_stat_inline(_sion_filedesc *sion_filedesc, uint64_t searchkey, sion_key_stat_t *keystat) {
395  int rc=SION_NOT_SUCCESS;
396  _sion_keyvalue_keymngr* keymngr;
397  sion_table_key_t key=0;
398 
399  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
400 
401  /* get or init internal data structure if first call */
402  keymngr=_sion_get_or_init_key_info(sion_filedesc);
403 
404  key= (sion_table_key_t) searchkey;
405 
406  if(_sion_keyvalue_keymngr_key_get_stat(keymngr, key, keystat)==SION_SUCCESS) {
407  rc=SION_SUCCESS;
408  } else {
409  rc=SION_NOT_SUCCESS;
410  }
411 
412  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc,(long) key));
413  return rc;
414 }
415 #undef DFUNCTION
416 
417 /* ***************** */
418 /* Utility functions */
419 /* ***************** */
420 
421 #define DFUNCTION "_sion_scan_forward_to_key"
422 int _sion_scan_forward_to_key(_sion_filedesc *sion_filedesc, uint64_t key, int search_mode) {
423  int rc = SION_SUCCESS;
424  _sion_keyvalue_keymngr *keymngr;
425  sion_table_key_t mkey;
426 
427  size_t scanpos;
428  int key_found;
429  size_t bytes_left, lastposinblock, lastposinfile;
430  uint64_t key_and_len[2];
431 
432  size_t bread, bskip, len;
433 
434  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
435 
436  /* get keymngr */
437  keymngr = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
438 
439  key_found=0;
440 
441  /* get position from where scan has to be started */
442  if(_sion_keyvalue_keymngr_get_next_scan_pos(keymngr, &scanpos) != SION_SUCCESS) {
443  /* can only be at start time, start then from current position in file */
444  scanpos=(size_t) sion_filedesc->currentpos;
445  DPRINTFP((2, DFUNCTION, -1, "set scanpos to first pos in file pos=%ld\n",(long) scanpos));
446  }
447  /* check if file is already completely scanned */
448  lastposinfile=sion_filedesc->startpos + sion_filedesc->lastchunknr * sion_filedesc->globalskip
449  + sion_filedesc->blocksizes[sion_filedesc->lastchunknr];
450 
451  DPRINTFP((2, DFUNCTION, -1, "check if not already scanned to endoffile scanpos=%ld lastposinfile=%ld lastchunknr=%d\n",(long) scanpos,(long) lastposinfile, (int) sion_filedesc->lastchunknr ));
452  if(scanpos>=lastposinfile) {
453  DPRINTFP((2, DFUNCTION, -1, "scan is already at end of file (%ld>=%ld) \n",(long) scanpos, (long) lastposinfile));
454  _sion_keyvalue_keymngr_set_scan_done(keymngr);
455  rc=SION_NOT_SUCCESS;
456  }
457 
458  /* move to scanpos if necessary */
459  if( (sion_filedesc->currentpos != scanpos) && (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) ) {
460  DPRINTFP((2, DFUNCTION, -1, "move from %ld to new scanpos (%ld) \n",(long) sion_filedesc->currentpos,(long) scanpos));
461  rc=_sion_move_to_pos(sion_filedesc, scanpos);
462  }
463 
464  /* search forward */
465  while ( (!_sion_keyvalue_keymngr_is_scan_done(keymngr)) && (!key_found) ) {
466 
467  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
468  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
469  bytes_left = lastposinblock - sion_filedesc->currentpos;
470  DPRINTFP((2, DFUNCTION, -1, "search loop: lastposinblock=%ld bytes_left=%ld currentpos=%ld\n",(long) lastposinblock,(long) bytes_left,(long) sion_filedesc->currentpos));
471 
472 
473  /* is there data in current chunk to start reading next block? */
474  if ( bytes_left > 0 ) {
475  /* read next block from current and following chunks */
476  bread=_sion_read_data_from_chunks_inline(sion_filedesc, (void *) key_and_len, (sion_int64) sizeof(key_and_len));
477  if (bread != sizeof(key_and_len)) {
478  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
479  "could not read data (%d bytes) from file ...", (int) sizeof(key_and_len)));
480  }
481  sion_swap(key_and_len, key_and_len, sizeof(sion_int64), 2, sion_filedesc->swapbytes);
482 
483 
484  DPRINTFP((2, DFUNCTION, -1, "search loop: found next key,len (%ld,%ld)\n",(long) key_and_len[0],(long) key_and_len[1]));
485  bytes_left-=bread;
486 
487  DPRINTFP((2, DFUNCTION, -1, "search loop: position is now %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
488 
489  /* check if filepointer has to moved to next block to be in front of data */
490  if(bytes_left==0) {
491  DPRINTFP((2, DFUNCTION, -1, "search loop: data starts in next chunk, moving forward\n"));
492  if(! _sion_move_to_next_chunk(sion_filedesc)) {
493  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
494  "could not move to data section in next block ..."));
495  }
496  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
497  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
498  bytes_left = lastposinblock - sion_filedesc->currentpos;
499  DPRINTFP((2, DFUNCTION, -1, "search loop: position is moved to %ld bytes_left=%ld\n",(long) sion_filedesc->currentpos, (long) bytes_left));
500  }
501 
502  /* store new block info */
503  mkey=key_and_len[0];
504  len =key_and_len[1];
505  rc=_sion_keyvalue_keymngr_add_block(keymngr, mkey, (size_t) sion_filedesc->currentpos, len);
506  DPRINTFP((2, DFUNCTION, -1, "search loop: stored new block key=%ld pos=%ld len=%ld\n",(long) mkey, (long) sion_filedesc->currentpos, (long) len));
507 
508  /* check key */
509  if (
510  ( ( (search_mode==SEARCH_TO_KEY) && (key_and_len[0] == key) ) || (search_mode==SEARCH_TO_NEXT) )
511  && ( !(search_mode==SEARCH_TO_END) )
512  ) {
513  DPRINTFP((2, DFUNCTION, -1, "search loop: found searched key=%ld, leaving\n",(long) key));
514  rc=SION_SUCCESS;
515  key_found=1; /* end of loop */
516  scanpos=_sion_compute_next_position_inline(sion_filedesc, len); /* next search behind data of that block */
517  } else {
518  DPRINTFP((2, DFUNCTION, -1, "search loop: found another key=%ld <=> %ld, continue\n",(long) key_and_len[0], (long) key));
519 
520  /* move forward behind data of block in the same or next chunk */
521  DPRINTFP((2, DFUNCTION, -1, "search loop: found move position forward by %ld bytes\n",(long) len));
522  bskip=_sion_skip_data_from_chunks_inline(sion_filedesc, (sion_int64) len);
523 
524  if ( bskip == len ) {
525 
526  /* recompute bytes_left in current block */
527  lastposinblock = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip
528  + sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
529  bytes_left = lastposinblock - sion_filedesc->currentpos;
530  scanpos=sion_filedesc->currentpos; /* next search behind data */
531 
532  } else {
533  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
534  "could not skip data section of one block (%d bytes) from file ...", (int) len));
535  }
536 
537  /* check if filepointer has to moved to next block */
538  if(bytes_left==0) {
539  DPRINTFP((2, DFUNCTION, -1, "search loop: chunk empty, move to next chunk\n"));
540  if(! _sion_move_to_next_chunk(sion_filedesc)) {
541  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
542  if(search_mode!=SEARCH_TO_END) {
543  rc=SION_NOT_SUCCESS;
544  } else {
545  rc=SION_SUCCESS; /* end of data reached */
546  }
547  }
548  }
549  }
550 
551  } else {
552  /* not enough byte in block to read next key+len */
553  if(bytes_left==0) {
554  if(! _sion_move_to_next_chunk(sion_filedesc)) {
555  _sion_keyvalue_keymngr_set_scan_done(keymngr); /* end of loop */
556  rc=SION_NOT_SUCCESS;
557  }
558  } else {
559  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
560  "unknown data at end of chunk (%d bytes) ...", (int) bytes_left));
561  }
562  }
563 
564  } /* while */
565 
566  /* update scanpos */
567  if(_sion_keyvalue_keymngr_set_next_scan_pos(keymngr, scanpos) != SION_SUCCESS) {
568  return(_sion_errorprint_on_rank(SION_NOT_SUCCESS,_SION_ERROR_RETURN,sion_filedesc->rank,
569  "internal error set seekpos ..."));
570  }
571 
572  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
573  return (rc);
574 }
575 #undef DFUNCTION
576 
577 
578 #define DFUNCTION "_sion_get_or_init_key_info"
579 _sion_keyvalue_keymngr * _sion_get_or_init_key_info(_sion_filedesc *sion_filedesc) {
580 
581  _sion_keyvalue_keymngr* keymngr;
582 
583  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
584 
585  if (sion_filedesc->keyvalptr == NULL) {
586  DPRINTFP((2, DFUNCTION, -1, "sion_filedesc->keyvalptr == NULL\n"));
587  keymngr =_sion_keyvalue_keymngr_init(TABLE_SIZE);
588  if (keymngr == NULL) {
589  _sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"could not allocate keymngr ...");
590  }
591  sion_filedesc->keyvalptr=keymngr;
592  DPRINTFP((2, DFUNCTION, -1, "alloc now KEYVALPTR = %x\n",sion_filedesc->keyvalptr));
593  }
594 
595  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
596  return sion_filedesc->keyvalptr;
597 }
598 #undef DFUNCTION
599 
600 #define DFUNCTION "_sion_move_to_next_chunk"
601 int _sion_move_to_next_chunk(_sion_filedesc *sion_filedesc) {
602  int rc = SION_SUCCESS;
603 
604  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
605 
606  /* is another chink available */
607  if (sion_filedesc->currentblocknr < sion_filedesc->lastchunknr) {
608  /* move to next chunk */
609  sion_filedesc->currentblocknr++;
610  sion_filedesc->currentpos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
611  _sion_file_purge(sion_filedesc->fileptr);
612  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
613  } else {
614  rc=SION_NOT_SUCCESS;
615  }
616 
617  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
618  return rc;
619 }
620 #undef DFUNCTION
621 
622 #define DFUNCTION "_sion_move_to_pos"
623 int _sion_move_to_pos(_sion_filedesc *sion_filedesc, size_t pos) {
624  int rc = SION_SUCCESS;
625  size_t block_min_pos, block_max_pos;
626  int c, pos_found=0;
627 
628  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
629 
630  block_min_pos = sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip;
631  block_max_pos = block_min_pos+sion_filedesc->blocksizes[sion_filedesc->currentblocknr]; /* position of first byte behind chunk */
632 
633  DPRINTFP((2, DFUNCTION, -1, "current_chunk: %ld to %ld (pos=%ld)\n", (long) block_min_pos, (long) block_max_pos, (long) pos));
634 
635  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
636  /* stay in the same chunk */
637  pos_found=1;
638  sion_filedesc->currentpos=pos;
639  _sion_file_purge(sion_filedesc->fileptr);
640  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
641  DPRINTFP((2, DFUNCTION, -1, "stay in current chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
642 
643  } else {
644  /* scan from beginning over all chunks */
645 
646  for(c=0;c<=sion_filedesc->lastchunknr;c++) {
647  block_min_pos = sion_filedesc->startpos + c * sion_filedesc->globalskip;
648  block_max_pos = block_min_pos+sion_filedesc->blocksizes[c]; /* position of first byte behind chunk */
649 
650  DPRINTFP((2, DFUNCTION, -1, "check chunk%2d: %ld to %ld (pos=%ld)\n", c, (long) block_min_pos, (long) block_max_pos, (long) pos));
651 
652  if ( (pos>=block_min_pos) && (pos<block_max_pos) ) {
653 
654  /* stay in this chunk */
655  pos_found=1;
656  sion_filedesc->currentblocknr=c;
657  sion_filedesc->currentpos=pos;
658  _sion_file_purge(sion_filedesc->fileptr);
659  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
660  DPRINTFP((2, DFUNCTION, -1, "stay in this chunk: currentpos=%ld\n", (long) sion_filedesc->currentpos));
661  break;
662 
663  }
664  }
665  }
666 
667  if(!pos_found) {
668  rc=SION_NOT_SUCCESS;
669  }
670 
671  DPRINTFP((2, DFUNCTION, -1, "leave, rc = %d\n", rc));
672  return rc;
673 }
674 #undef DFUNCTION
675 
676 /* write data block to current and following chunks, data could be distributed over several chunks */
677 #define DFUNCTION "_sion_write_data_to_chunks_inline"
678 size_t _sion_write_data_to_chunks_inline(_sion_filedesc *sion_filedesc, const void *data, sion_int64 bytes_to_write) {
679  sion_int64 btowr, byteswritten, offset;
680  size_t rc=0, frc;
681 
682 
683  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
684 
685  /* loop to write data in current and following chunks */
686  offset=0;
687  while(bytes_to_write>0) {
688 
689  _sion_flush_block(sion_filedesc);
690 
691  /* determine size of data which could written into current chunk */
692  byteswritten = sion_filedesc->blocksizes[sion_filedesc->currentblocknr];
693  btowr=bytes_to_write;
694  if ((byteswritten + btowr) > sion_filedesc->chunksize) btowr = sion_filedesc->chunksize - byteswritten;
695  DPRINTFP((2, DFUNCTION, -1, " bytes_to_write=%ld btowr=%ld chunksize=%ld byteswritten=%ld currentblocknr=%d blsize[%d]=%ld\n",
696  (long) bytes_to_write, (long) btowr, (long) sion_filedesc->chunksize, (long) byteswritten, (int) sion_filedesc->currentblocknr,
697  sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
698 
699  /* write part or rest of data */
700  frc = _sion_file_write(((char *) data+offset), btowr, sion_filedesc->fileptr);
701 
702  /* check return code */
703  if(frc != btowr) {
704  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
705  "could not write data (%d bytes) to file (frc=%d sid=%d) ...", (int) btowr, (int) frc, sion_filedesc->sid));
706  }
707 
708  /* increase current position and update other counters */
709  sion_filedesc->currentpos+=btowr;
710  bytes_to_write -=btowr;
711  offset +=btowr;
712 
713  /* check if all data has already be processed */
714  if(bytes_to_write>0) {
715 
716  /* create new block for writing next data */
717  _sion_flush_block(sion_filedesc);
718  _sion_create_new_block(sion_filedesc);
719  }
720  }
721  rc=offset;
722  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
723  return(rc);
724 }
725 #undef DFUNCTION
726 
727 /* skip data block from current and following chunks, data could be distributed over several chunks */
728 #define DFUNCTION "_sion_compute_next_position_inline"
729 sion_int64 _sion_compute_next_position_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
730  sion_int64 btord = 0, bytesread = 0, offset = 0;
731  int blocknr = 0;
732 
733 
734  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
735 
736  /* check data in current block */
737  blocknr = sion_filedesc->currentblocknr;
738  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
739 
740  DPRINTFP((2, DFUNCTION, -1, " currentpos=%ld bytesread=%ld blocknr=%d bytes_to_read=%ld\n",
741  (long) sion_filedesc->currentpos, (long) bytesread, (int) blocknr, (long) bytes_to_read ));
742 
743  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n",
744  (int) sion_filedesc->currentblocknr, (long) sion_filedesc->blocksizes[sion_filedesc->currentblocknr] ));
745 
746  /* data is all in current block */
747  if ((bytesread + bytes_to_read) <= sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
748  offset=sion_filedesc->currentpos+bytes_to_read;
749  } else {
750  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread; /* size of data in current chunk */
751  bytes_to_read-=btord; /* rest of data in next chunks */
752 
753  DPRINTFP((2, DFUNCTION, -1, " skip to next block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
754 
755  /* loop to behind data in current and following chunks */
756  while(bytes_to_read>0) {
757 
758  /* next block */
759  if (blocknr < sion_filedesc->lastchunknr) { blocknr++; } else {
760  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"internal error next block not available, but should ..."));
761  }
762 
763  DPRINTFP((2, DFUNCTION, -1, " blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
764  if ( bytes_to_read > sion_filedesc->blocksizes[blocknr]) {
765  btord = sion_filedesc->blocksizes[blocknr];
766  bytes_to_read-=btord; /* rest of data in next chunks */
767  DPRINTFP((2, DFUNCTION, -1, "whole block, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
768  } else {
769  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + bytes_to_read;
770  bytes_to_read=0;
771  }
772  }
773  }
774 
775  /* check if behind end of actual block */
776  if(offset == (sion_filedesc->startpos + blocknr * sion_filedesc->globalskip + sion_filedesc->blocksizes[blocknr])) {
777  /* move next block if available, if last blovk of file, stay behind current block */
778  if (blocknr < sion_filedesc->lastchunknr) {
779  blocknr++;
780  offset=sion_filedesc->startpos + blocknr * sion_filedesc->globalskip;
781  DPRINTFP((2, DFUNCTION, -1, " behind end of block, move to next block, blocksizes[%d]=%ld\n", (int) blocknr, (long) sion_filedesc->blocksizes[blocknr] ));
782  }
783  }
784 
785 
786  DPRINTFP((2, DFUNCTION, -1, "leave offset=%ld\n",(long) offset));
787  return(offset);
788 }
789 #undef DFUNCTION
790 
791 /* skip data block from current and following chunks, data could be distributed over several chunks */
792 #define DFUNCTION "_sion_skip_data_from_chunks_inline"
793 size_t _sion_skip_data_from_chunks_inline(_sion_filedesc *sion_filedesc, sion_int64 bytes_to_read) {
794  sion_int64 btord, bytesread, offset;
795  size_t rc=0;
796 
797 
798  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
799 
800  /* loop to read data in current and following chunks */
801  offset=0;
802  while(bytes_to_read>0) {
803 
804  /* determine size of data which could read from current chunk */
805  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
806 
807  btord=bytes_to_read;
808  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr])
809  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
810  DPRINTFP((2, DFUNCTION, -1, " bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
811 
812  /* file pointer will not here adjusted, only at end of loop */
813 
814  /* increase current position and update other counters */
815  sion_filedesc->currentpos+=btord;
816  bytes_to_read -=btord;
817  offset +=btord;
818 
819  /* check if all data has already be processed, otherwise forward to next chunk */
820  if(bytes_to_read>0) {
821 
822  if(! _sion_move_to_next_chunk(sion_filedesc)) {
823  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
824  "could not read data (%d bytes) to file (end of file reached sid=%d) ...", (int) btord, sion_filedesc->sid));
825  }
826 
827  }
828  }
829 
830  /* move to new position */
831  _sion_file_purge(sion_filedesc->fileptr);
832  _sion_file_set_position(sion_filedesc->fileptr, sion_filedesc->currentpos);
833 
834  rc=offset;
835  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
836  return(rc);
837 }
838 #undef DFUNCTION
839 
840 /* read data block from current and following chunks, data could be distributed over several chunks */
841 #define DFUNCTION "_sion_read_data_from_chunks_inline"
842 size_t _sion_read_data_from_chunks_inline(_sion_filedesc *sion_filedesc, void *data, sion_int64 bytes_to_read) {
843  sion_int64 btord, bytesread, offset;
844  size_t rc=0, frc;
845 
846 
847  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
848 
849  /* loop to read data in current and following chunks */
850  offset=0;
851  while(bytes_to_read>0) {
852 
853  /* determine size of data which could read from current chunk */
854  bytesread = sion_filedesc->currentpos - (sion_filedesc->startpos + sion_filedesc->currentblocknr * sion_filedesc->globalskip);
855  DPRINTFP((2, DFUNCTION, -1, "bytesread=%ld, curpos=%ld,startpos=%ld,curblock=%ld,gskip=%ld\n",
856  (long) bytesread, (long) sion_filedesc->currentpos, (long) sion_filedesc->startpos,
857  (long) sion_filedesc->currentblocknr, (long) sion_filedesc->globalskip ));
858 
859  btord=bytes_to_read;
860  if ((bytesread + btord) > sion_filedesc->blocksizes[sion_filedesc->currentblocknr]) {
861  btord = sion_filedesc->blocksizes[sion_filedesc->currentblocknr] - bytesread;
862  DPRINTFP((2, DFUNCTION, -1, "block is split to multiple chunks, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
863  } else {
864  DPRINTFP((2, DFUNCTION, -1, "block in one chunk, bytes_to_read=%ld btord=%ld\n",(long) bytes_to_read, (long) btord ));
865  }
866 
867  /* read part or rest of data */
868  frc = _sion_file_read(((char *)data+offset), btord, sion_filedesc->fileptr);
869 
870  /* check return code */
871  if(frc != btord) {
872  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
873  "could not read data (%d bytes) to file (frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
874  }
875 
876  /* increase current position and update other counters */
877  sion_filedesc->currentpos+=btord;
878  bytes_to_read -=btord;
879  offset +=btord;
880 
881  /* check if all data has already be processed, otherwise forward to next chunk */
882  if(bytes_to_read>0) {
883 
884  if(! _sion_move_to_next_chunk(sion_filedesc)) {
885  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,
886  "could not read data (%d bytes) to file (end of file reached, frc=%d sid=%d) ...", (int) btord, (int) frc, sion_filedesc->sid));
887  }
888 
889  }
890  }
891 
892  rc=offset;
893  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
894  return(rc);
895 }
896 #undef DFUNCTION
897 
898 /* dup internal data structures from first to secon sion_filedesc */
899 #define DFUNCTION "_sion_keyval_dup_dataptr_inline"
900 int _sion_keyval_dup_dataptr_inline(_sion_filedesc *sion_filedesc, _sion_filedesc *new_filedesc) {
901  int rc=0;
902  _sion_keyvalue_keymngr* keymngr_orig;
903  _sion_keyvalue_keymngr* keymngr;
904 
905  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
906 
907  if(sion_filedesc->keyvalptr!=NULL) {
908 
909  /* get orig keymngr */
910  keymngr_orig = (_sion_keyvalue_keymngr *) sion_filedesc->keyvalptr;
911 
912  keymngr =_sion_keyvalue_keymngr_dup(keymngr_orig,new_filedesc->dup_mode, new_filedesc->dup_sel_key);
913  if (keymngr == NULL) {
914  return(_sion_errorprint_on_rank(-1,_SION_ERROR_RETURN,sion_filedesc->rank,"dup: could not duplicate keymngr ..."));
915  }
916  new_filedesc->keyvalptr=keymngr;
917 
918  } else {
919 
920  new_filedesc->keyvalptr=NULL;
921 
922  }
923 
924  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
925  return (rc);
926 }
927 #undef DFUNCTION
928 
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
Definition: sion_file.c:148
int _sion_flush_block(_sion_filedesc *sion_filedesc)
Update the internal data structure.
sion_int64 _sion_file_set_position(_sion_fileptr *sion_fileptr, sion_int64 startpointer)
Set new position in file.
Definition: sion_file.c:239
Sion File Descriptor Structure.
Definition: sion_filedesc.h:77
int _sion_file_purge(_sion_fileptr *sion_fileptr)
Purge data to file.
Definition: sion_file.c:328
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
Definition: sion_file.c:169
sion_int64 * blocksizes
Definition: sion_filedesc.h:98
sion_int32 currentblocknr
Definition: sion_filedesc.h:95
int _sion_create_new_block(_sion_filedesc *sion_filedesc)
Create a new block for the internal data structure.
Sion Time Stamp Header.
void sion_swap(void *target, void *source, int size, int n, int aflag)
Definition: sion_convert.c:36
_sion_fileptr * fileptr
Definition: sion_filedesc.h:80