SIONlib  1.7.7
Scalable I/O library for parallel access to task-local files
sion_keyvalue_keymngr.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2019 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #define _XOPEN_SOURCE 700
15 
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <string.h>
19 #include <time.h>
20 
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <assert.h>
26 
27 
28 #include "sion.h"
29 #include "sion_debug.h"
30 #include "sion_error_handler.h"
31 #include "sion_internal.h"
32 #include "sion_printts.h"
33 #include "sion_keyvalue_keymngr.h"
34 #include "sion_keyvalue_table.h"
35 
38 
39 #define POS_BEHIND_END -302
40 
43  int size;
45  size_t next_scan_pos;
46  int scan_done;
51 };
52 
55  sion_int64 key;
56  ssize_t current_pos;
57  size_t bytes_left;
62 };
63 
66  size_t offset;
67  size_t len;
68  int blocknum;
69  size_t offset_in_entry;
73 };
74 
75 
76 #define DFUNCTION "_sion_keyvalue_keymngr_init"
77 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_init(int size) {
78  _sion_keyvalue_keymngr* keymngr=NULL;
79 
80  DPRINTFP((2, DFUNCTION, -1, "enter init size=%d\n",size));
81 
82  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
83  if (keymngr == NULL) {
84  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
85  return(NULL);
86  }
87 
88  /* init main data structure */
89  keymngr->lastentry_used= NULL;
90  keymngr->next_scan_pos = 0;
91  keymngr->scan_done = SION_NOT_SUCCESS;
92  keymngr->iterator_last_block = NULL;
93  keymngr->block_inwriteorder_head = NULL;
94  keymngr->block_inwriteorder_tail = NULL;
95  keymngr->size = size;
96  keymngr->key_table = _sion_keyvalue_table_init(size);
97  if (keymngr->key_table == NULL) {
98  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) size);
99  free(keymngr);
100  return(NULL);
101  }
102 
103  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
104  return (keymngr);
105 }
106 #undef DFUNCTION
107 
108 
109 #define DFUNCTION "_sion_keyvalue_keymngr_destroy"
110 int _sion_keyvalue_keymngr_destroy(_sion_keyvalue_keymngr** keymngr) {
111  size_t rc=SION_SUCCESS;
112  _sion_key_block_entry *block, *tmp_block;
113 
114  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
115  block = (*keymngr)->block_inwriteorder_head;
116  while (block != NULL) {
117  tmp_block = block;
118  block = block->next_inwriteorder;
119  free(tmp_block);
120  tmp_block = NULL;
121  }
122  if((*keymngr)->key_table) {
123  rc=_sion_keyvalue_table_destroy(&((*keymngr)->key_table));
124  }
125  free((*keymngr));
126  *keymngr = NULL;
127 
128  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
129  return (rc);
130 }
131 #undef DFUNCTION
132 
133 #define DFUNCTION "_sion_keyvalue_keymngr_add_block"
134 int _sion_keyvalue_keymngr_add_block(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t offset, size_t len) {
135  size_t rc=SION_SUCCESS;
136  _sion_key_entry *entry;
137  _sion_key_block_entry *new_block;
138  void *p;
139 
140  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
141 
142  /* first check if key is known */
143  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
144  if( p==NULL ) {
145  /* new key */
146 
147  entry = (_sion_key_entry*) malloc(sizeof(_sion_key_entry));
148  if (entry == NULL) {
149  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table entry, aborting ...\n"));
150  }
151  entry->key = key;
152  entry->blocks_avail= 0;
153  entry->blocklist_head = NULL; /* block will be added later */
154  entry->blocklist_current = NULL; /* block will be added later */
155  entry->blocklist_tail = NULL; /* block will be added later */
156 
157  /* it is the first block of this key,
158  therefore initialize also info about current read pos */
159  entry->current_pos = offset; /* it is the first block for that key */
160  entry->bytes_left = len; /* and no bytes available so far in this block */
161 
162  p= (void *) entry;
163 
164  /* store new entry in keytable */
165  rc=_sion_keyvalue_table_store(keymngr->key_table,key,p);
166  if (rc != SION_SUCCESS) return(rc);
167 
168  } else {
169  entry = (_sion_key_entry *) p;
170  }
171 
172  /* add new block */
173  new_block = (_sion_key_block_entry*) malloc(sizeof(_sion_key_block_entry));
174  if (new_block == NULL) {
175  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table block entry, aborting ...\n"));
176  }
177  new_block->entry = entry;
178  new_block->offset = offset;
179  new_block->len = len;
180  new_block->next = NULL; /* will be added at tail */
181  new_block->next_inwriteorder = NULL; /* will be added at tail */
182 
183  /* push block to blocklist of entry */
184  if((entry->blocklist_head != NULL) && (entry->blocklist_current != NULL) && (entry->blocklist_tail != NULL)) {
185  /* list exists */
186  new_block->blocknum = entry->blocklist_tail->blocknum + 1;
187  new_block->offset_in_entry = entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len;
188  entry->blocklist_tail->next = new_block; /* append block */
189  entry->blocklist_tail = entry->blocklist_tail->next; /* advance tail pointer */
190 
191  /* advance currentpos if currentpos behind last blocks */
192  if(entry->current_pos == POS_BEHIND_END) {
193  entry->blocklist_current = new_block; /* it must be this block */
194  entry->current_pos = entry->blocklist_current->offset;
195  entry->bytes_left = entry->blocklist_current->len;
196  }
197 
198  } else {
199  /* init list */
200  new_block->blocknum = 0;
201  new_block->offset_in_entry = 0;
202  entry->blocklist_head = new_block;
203  entry->blocklist_current = entry->blocklist_head;
204  entry->blocklist_tail = entry->blocklist_current;
205  entry->current_pos = entry->blocklist_current->offset;
206  entry->bytes_left = entry->blocklist_current->len;
207  }
208  entry->blocks_avail++;
209 
210  /* push block to lists of blocks in write order */
211  if((keymngr->block_inwriteorder_head != NULL) && (keymngr->block_inwriteorder_tail != NULL)) {
212  keymngr->block_inwriteorder_tail->next_inwriteorder = new_block; /* append block */
213  keymngr->block_inwriteorder_tail = keymngr->block_inwriteorder_tail->next_inwriteorder; /* advance tail pointer */
214  } else {
215  /* init list */
216  keymngr->block_inwriteorder_head = new_block;
218  }
219 
220  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
221  return (rc);
222 }
223 #undef DFUNCTION
224 
225 #define DFUNCTION "_sion_keyvalue_keymngr_update_read_pos"
226 int _sion_keyvalue_keymngr_update_read_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key,
227  size_t bytes_read, sion_int64 current_pos) {
228  size_t rc=SION_SUCCESS;
229  _sion_key_entry *entry=NULL;
230  void *p=NULL;
231 
232  /* check if using the same key as last time */
233  if(keymngr->lastentry_used!=NULL) {
234  if(keymngr->lastentry_used->key == key) {
235  DPRINTFP((2, DFUNCTION, -1, "its the last key %ld\n",(long) key));
236  entry=keymngr->lastentry_used;
237  }
238  }
239 
240  if( entry==NULL) {
241  /* try to lookup */
242  DPRINTFP((2, DFUNCTION, -1, "lookup for key %ld\n",(long) key));
243  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
244  if( p ) {
245  DPRINTFP((2, DFUNCTION, -1, "key %ld found\n",(long) key));
246  entry = (_sion_key_entry *) p;
247  } else {
248  DPRINTFP((2, DFUNCTION, -1, "key %ld not found\n",(long) key));
249  rc=SION_NOT_SUCCESS;
250  }
251  }
252 
253  if(entry) {
254  if(bytes_read<=entry->bytes_left) {
255 
256  /* advance counter */
257  DPRINTFP((2, DFUNCTION, -1, "advance key counters by %ld bytes\n",(long) bytes_read));
258  entry->current_pos = current_pos;
259  entry->bytes_left -= bytes_read;
260 
261  /* check if block is completely read */
262  if(entry->bytes_left==0) {
263  DPRINTFP((2, DFUNCTION, -1, "block is completely read, move to next if possible\n"));
264 
265  /* next block available */
266  if(entry->blocklist_current->next!=NULL) {
267 
268  /* move forward to next block */
269  entry->blocklist_current=entry->blocklist_current->next;
270  entry->current_pos=entry->blocklist_current->offset;
271  entry->bytes_left=entry->blocklist_current->len;
272  DPRINTFP((2, DFUNCTION, -1, "another block found (%ld,%ld)\n", (long) entry->current_pos, (long) entry->bytes_left ));
273 
274  } else {
275  /* stay in current block, but behind end */
276  DPRINTFP((2, DFUNCTION, -1, "no other block found so far ...\n"));
277  entry->current_pos = POS_BEHIND_END;
278  entry->bytes_left = 0;
279  }
280 
281  }
282  rc=SION_SUCCESS;
283  } else {
284  DPRINTFP((2, DFUNCTION, -1, "bytes_read > entry->bytes_left (%zu > %zu)\n", bytes_read, entry->bytes_left));
285  rc=SION_NOT_SUCCESS;
286  }
287  }
288 
289  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
290  return (rc);
291 
292 }
293 #undef DFUNCTION
294 
295 
296 #define DFUNCTION "_sion_keyvalue_keymngr_set_next_scan_pos"
297 int _sion_keyvalue_keymngr_set_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t pos) {
298  size_t rc=SION_SUCCESS;
299  DPRINTFP((2, DFUNCTION, -1, "enter pos=%ld\n",rc, (long) pos));
300  keymngr->next_scan_pos=pos;
301  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d next_scan_pos=%ld\n",rc, (long) keymngr->next_scan_pos));
302  return(rc);
303 }
304 #undef DFUNCTION
305 
306 #define DFUNCTION "_sion_keyvalue_keymngr_get_next_scan_pos"
307 int _sion_keyvalue_keymngr_get_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t *pos) {
308  size_t rc=SION_SUCCESS;
309  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
310 
311  *pos=keymngr->next_scan_pos;
312 
313  if(*pos==0) {
314  rc=SION_NOT_SUCCESS;
315  }
316 
317  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d pos=%ld\n",rc, (long) *pos));
318  return(rc);
319 }
320 #undef DFUNCTION
321 
322 #define DFUNCTION "_sion_keyvalue_keymngr_set_scan_done"
323 int _sion_keyvalue_keymngr_set_scan_done(_sion_keyvalue_keymngr* keymngr) {
324  size_t rc=SION_SUCCESS;
325  keymngr->scan_done=SION_SUCCESS;
326  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d scan_done=%d\n",rc, (int) keymngr->scan_done));
327  return(rc);
328 }
329 #undef DFUNCTION
330 
331 #define DFUNCTION "_sion_keyvalue_keymngr_is_scan_done"
332 int _sion_keyvalue_keymngr_is_scan_done(_sion_keyvalue_keymngr* keymngr) {
333  size_t rc=keymngr->scan_done;
334  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
335  return(rc);
336 }
337 #undef DFUNCTION
338 
339 
340 #define DFUNCTION "_sion_keyvalue_keymngr_lookup"
341 int _sion_keyvalue_keymngr_lookup(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t *current_pos, size_t *bytes_left) {
342  size_t rc=SION_SUCCESS;
343  _sion_key_entry *entry;
344  void *p;
345 
346  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
347 
348  /* check if key is known */
349  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
350  if( p==NULL ) {
351  rc=SION_NOT_SUCCESS;
352  } else {
353  rc=SION_SUCCESS;
354  entry=(_sion_key_entry *) p;
355  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
356 
357  if(entry->current_pos != POS_BEHIND_END) {
358  *current_pos = entry->current_pos;
359  *bytes_left = entry->bytes_left;
360  DPRINTFP((2, DFUNCTION, -1, "found block #%d --> (%ld,%ld)\n",(int) entry->blocklist_current->blocknum,(long) *current_pos,(long) *bytes_left));
361 
362  } else {
363  rc=SION_NOT_SUCCESS;
364  }
365  }
366 
367  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
368 
369  return(rc);
370 }
371 #undef DFUNCTION
372 
373 #define DFUNCTION "_sion_keyvalue_keymngr_lookup_and_set_pos"
374 int _sion_keyvalue_keymngr_lookup_and_set_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, int blocknum, sion_int64 posinblock,
375  size_t *current_pos, size_t *bytes_left) {
376  size_t rc=SION_SUCCESS;
377  _sion_key_entry *entry;
378  _sion_key_block_entry *block;
379  sion_int64 absolutepos;
380  void *p;
381 
382  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
383 
384  *current_pos = 0;
385  *bytes_left = 0;
386 
387  /* check if key is known */
388  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
389  if( p==NULL ) {
390  rc=SION_NOT_SUCCESS;
391  DPRINTFP((2, DFUNCTION, -1, "leave key not found rc=%d\n",rc));
392  return(rc);
393  }
394 
395  entry=(_sion_key_entry *) p;
396  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
397 
398  if((posinblock == SION_CURRENT_POS) && (blocknum==SION_CURRENT_BLOCK) ) {
399  /* dummy call, leave position unchanged */
400  *current_pos = entry->current_pos;
401  *bytes_left = entry->bytes_left;
402  DPRINTFP((2, DFUNCTION, -1, "dummy call (SION_CURRENT_BLOCK,SION_CURRENT_POS), leave position unchanged rc=%d\n",rc));
403 
404  if(entry->current_pos != POS_BEHIND_END) {
405  rc=SION_SUCCESS;
406  DPRINTFP((2, DFUNCTION, -1, "leave, seek position is already current pos in block %d\n",blocknum));
407  } else {
408  rc=SION_NOT_SUCCESS;
409  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
410  }
411  return(rc);
412  }
413 
414  if(blocknum == SION_CURRENT_BLOCK) {
415 
416  DPRINTFP((2, DFUNCTION, -1, "seek in current_block request\n"));
417 
418  if(entry->current_pos != POS_BEHIND_END) {
419 
420  /* search in current block */
421  blocknum = entry->blocklist_current->blocknum;
422  DPRINTFP((2, DFUNCTION, -1, "seek position is in block %d\n",blocknum));
423 
424  } else {
425 
426  rc=SION_NOT_SUCCESS;
427  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
428  return(rc);
429  }
430 
431  }
432 
433  if(blocknum != SION_ABSOLUTE_POS) {
434 
435  /* search for blocknum */
436 
437  if ( (blocknum<0) || (blocknum >= entry->blocks_avail)) {
438  rc=SION_NOT_SUCCESS; /* wrong blocknum */
439  DPRINTFP((2, DFUNCTION, -1, "leave, blocknum outside known range rc=%d\n",rc));
440  return(rc);
441  }
442 
443  /* determine start block for search: current block or head of list */
444  if (blocknum >= entry->blocklist_current->blocknum) {
445  block=entry->blocklist_current;
446  } else {
447  block=entry->blocklist_head;
448  }
449 
450  /* loop until block found */
451  while ((block!=NULL) && (blocknum != block->blocknum)) {
452  block=block->next;
453  }
454  if(block==NULL) {
455  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, blocknum position block is NULL\n"));
456  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
457  }
458 
459  /* position is in block */
460  DPRINTFP((2, DFUNCTION, -1, "position is in this block %d\n",block->blocknum));
461 
462  if((posinblock>=0) && (posinblock < block->len)) {
463  entry->blocklist_current = block;
464  entry->current_pos = entry->blocklist_current->offset + posinblock;
465  entry->bytes_left = block->len - posinblock;
466  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
467  } else {
468  DPRINTFP((2, DFUNCTION, -1, "wrong pos in block %d\n",(int) posinblock));
469  rc=SION_NOT_SUCCESS; /* wrong pos in block */
470  }
471 
472  } else { /* (blocknum == SION_ABSOLUTE_POS) */
473 
474  absolutepos=posinblock;
475  if ( (absolutepos < 0)
476  || ( absolutepos >= (entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len) )
477  ){
478  rc=SION_NOT_SUCCESS; /* wrong absolute position */
479  DPRINTFP((2, DFUNCTION, -1, "leave, absolute position outside known range rc=%d\n",rc));
480  return(rc);
481  }
482 
483  /* determine start block for search: current block or head of list */
484  if (absolutepos >= entry->blocklist_current->offset_in_entry) {
485  block=entry->blocklist_current;
486  } else {
487  block=entry->blocklist_head;
488  }
489 
490  /* loop until block found */
491  while ((block!=NULL) && (absolutepos >= (block->offset_in_entry + block->len))) {
492  block=block->next;
493  }
494  if(block==NULL) {
495  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, absolute position block is NULL\n"));
496  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
497  }
498  /* recompute posinblock */
499  posinblock = absolutepos - block->offset_in_entry;
500 
501  /* set current information */
502  entry->blocklist_current = block;
503  entry->current_pos = entry->blocklist_current->offset + posinblock;
504  entry->bytes_left = block->len - posinblock;
505  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
506 
507  }
508 
509  *current_pos = entry->current_pos;
510  *bytes_left = entry->bytes_left;
511  DPRINTFP((2, DFUNCTION, -1, "found block --> (%ld,%ld)\n",(long) *current_pos,(long) *bytes_left));
512 
513  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
514 
515  return(rc);
516 }
517 #undef DFUNCTION
518 
519 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_reset"
520  int _sion_keyvalue_keymngr_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
521 
522  int rc=SION_SUCCESS;
523 
524  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
525 
526  keymngr->iterator_last_block = NULL;
527 
528  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
529  return(rc);
530  }
531 #undef DFUNCTION
532 
533  /* moves to next block in write order, sets also internal current_pos to beginning of block */
534  /* this function could also be called after end of block list is reached, this is nedded for supporting iterator on in
535  inline mode where meta data is read on the fly during iteration */
536 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_next"
537  int _sion_keyvalue_keymngr_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key, size_t *current_pos, size_t *offset, size_t *len) {
538  int rc=SION_SUCCESS;
539  int found = 0;
540 
541  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
542 
543  /* first time called after reset */
544  if(keymngr->iterator_last_block==NULL) {
545  /* next block is the head block */
547  if(keymngr->iterator_last_block!=NULL) {
548  found=1;
549  }
550  } else {
551  /* not already at end of list */
552  if(keymngr->iterator_last_block!=keymngr->block_inwriteorder_tail) {
553  /* move forward in list */
555  if(keymngr->iterator_last_block==NULL) {
556  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, checking end of list , aborting ...\n"));
557  }
558  found=1;
559  }
560  }
561 
562  if(found) {
563 
564  /* set current_block and current_pos and bytes_left of entry to this block */
568 
569  /* set output parameters */
570  *key = keymngr->iterator_last_block->entry->key;
571  *offset = keymngr->iterator_last_block->offset;
572  *len = keymngr->iterator_last_block->len;
573  *current_pos= keymngr->iterator_last_block->offset;
574 
575  rc=SION_SUCCESS;
576 
577  } else {
578 
579  rc=SION_NOT_SUCCESS;
580 
581  }
582 
583  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
584  return(rc);
585  }
586 #undef DFUNCTION
587 
588 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_reset"
589  int _sion_keyvalue_keymngr_key_list_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
590 
591  int rc=SION_SUCCESS;
592 
593  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
594 
595  _sion_keyvalue_table_iterator_reset(keymngr->key_table);
596 
597  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
598  return(rc);
599  }
600 #undef DFUNCTION
601 
602 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_next"
603  int _sion_keyvalue_keymngr_key_list_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key) {
604  int rc=0;
605  void *data;
606 
607  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
608 
609  rc=_sion_keyvalue_table_iterator_next_in_store_order(keymngr->key_table, key, &data);
610 
611  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc, (long) *key));
612  return(rc);
613  }
614 #undef DFUNCTION
615 
616  /* this function will create a duplicate of the keymngr structure containing the the description of all blocks
617  of all keys, execpt that the current read posiotion for each key is a beginning of first block of each key */
618 #define DFUNCTION "_sion_keyvalue_keymngr_dup"
619 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_dup(_sion_keyvalue_keymngr* keymngr_orig, int dup_mode, sion_table_key_t sel_key) {
620  _sion_keyvalue_keymngr *keymngr=NULL;
621  _sion_key_block_entry *block;
622  _sion_key_entry *entry;
623  void *p;
624 
625  DPRINTFP((2, DFUNCTION, -1, "enter dup\n"));
626 
627  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
628  if (keymngr == NULL) {
629  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
630  return(NULL);
631  }
632 
633  /* init main data structure */
634  keymngr->lastentry_used= NULL;
635  keymngr->next_scan_pos = keymngr_orig->next_scan_pos;
636  keymngr->iterator_last_block = NULL;
637  keymngr->block_inwriteorder_head = NULL;
638  keymngr->block_inwriteorder_tail = NULL;
639 
640  /* create empty table of keys */
641  keymngr->size = keymngr_orig->size;
642  keymngr->key_table = _sion_keyvalue_table_init(keymngr_orig->size);
643  if (keymngr->key_table == NULL) {
644  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) keymngr->size);
645  free(keymngr);
646  return(NULL);
647  }
648 
649 
650  if(dup_mode==SION_DESCSTATE_DUP_SEL_RANK_KEY) {
651  DPRINTFP((2, DFUNCTION, -1, "start duplicating entry for key %llu\n",sel_key));
652 
653  p=_sion_keyvalue_table_lookup(keymngr_orig->key_table,sel_key);
654  if( p!=NULL ) {
655  entry = (_sion_key_entry *) p;
656  block=entry->blocklist_head;
657  while(block!=NULL) {
658  /* using own API routine to add block to new keymngr structure */
659  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
660  (long)block->entry->key,(long)block->offset,(long)block->len));
661  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
662  block=block->next;
663  }
664  }
665  DPRINTFP((2, DFUNCTION, -1, "end duplicating one entry\n"));
666 
667  } else {
668  /* loop over all blocks in orig keymngr */
669  DPRINTFP((2, DFUNCTION, -1, "start duplicating all entries\n"));
670  block=keymngr_orig->block_inwriteorder_head;
671  while(block!=NULL) {
672  /* using own API routine to add block to new keymngr structure */
673  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
674  (long)block->entry->key,(long)block->offset,(long)block->len));
675  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
676  block=block->next_inwriteorder;
677  }
678  DPRINTFP((2, DFUNCTION, -1, "end duplicating entries\n"));
679  }
680 
681  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
682  return (keymngr);
683  }
684 #undef DFUNCTION
685 
686 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_stat"
687 int _sion_keyvalue_keymngr_key_get_stat(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, sion_key_stat_t *keystat) {
688  size_t rc=SION_SUCCESS;
689  _sion_key_entry *entry;
690  void *p;
691 
692  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
693 
694  /* check if key is known */
695  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
696  if( p==NULL ) {
697  rc=SION_NOT_SUCCESS;
698  } else {
699  entry=(_sion_key_entry *) p;
700  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
701 
702  if(entry->blocklist_tail != NULL) {
703  keystat->key = (uint64_t) key;
704  keystat->num_blocks = entry->blocklist_tail->blocknum+1;
705  keystat->total_size = entry->blocklist_tail->offset_in_entry+entry->blocklist_tail->len;
706  rc=SION_SUCCESS;
707  DPRINTFP((2, DFUNCTION, -1, "found entry for key %ld (%ld,%ld)\n",(long) keystat->key, (long) keystat->num_blocks, (long) keystat->total_size ));
708  } else {
709  rc=SION_NOT_SUCCESS;
710  }
711  }
712 
713  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
714 
715  return(rc);
716 }
717 #undef DFUNCTION
718 
719 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_sizeof"
720 int _sion_keyvalue_keymngr_key_get_sizeof(_sion_keyvalue_keymngr* keymngr) {
721  size_t bytes=0, help_bytes;
722  _sion_key_block_entry *block;
723 
724 
725  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
726  help_bytes=sizeof(_sion_keyvalue_keymngr);
727  DPRINTFP((2, DFUNCTION, -1, " sizeof key_keymgr= %5d\n", help_bytes));
728  bytes+=help_bytes;
729 
730  block = keymngr->block_inwriteorder_head;
731  while (block != NULL) {
732  help_bytes=sizeof(_sion_key_block_entry);
733  block = block->next_inwriteorder;
734  }
735  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
736  bytes+=help_bytes;
737 
738  if(keymngr->key_table) {
739  help_bytes=_sion_keyvalue_table_get_size(keymngr->key_table);
740  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
741  bytes+=help_bytes;
742 
743  }
744  DPRINTFP((2, DFUNCTION, -1, "leave bytes=%d\n",bytes));
745 
746  return(bytes);
747 }
748 #undef DFUNCTION
#define SION_ABSOLUTE_POS
Definition: sion_const.h:71
#define SION_CURRENT_BLOCK
Definition: sion_const.h:69
#define SION_CURRENT_POS
Definition: sion_const.h:70
#define SION_DESCSTATE_DUP_SEL_RANK_KEY
Definition: sion_filedesc.h:48
Sion Time Stamp Header.
size_t len
_sion_key_entry * entry
_sion_key_block_entry * next_inwriteorder
size_t offset
int blocknum
size_t offset_in_entry
_sion_key_block_entry * next
_sion_key_block_entry * blocklist_head
_sion_key_block_entry * blocklist_current
ssize_t current_pos
int blocks_avail
sion_int64 key
_sion_key_block_entry * blocklist_tail
size_t bytes_left
_sion_key_block_entry * block_inwriteorder_tail
_sion_key_block_entry * block_inwriteorder_head
_sion_key_block_entry * iterator_last_block
_sion_keyvalue_table * key_table