SIONlib  1.7.1
Scalable I/O library for parallel access to task-local files
sion_keyvalue_keymngr.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <string.h>
17 #include <time.h>
18 
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <assert.h>
24 
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_error_handler.h"
29 #include "sion_internal.h"
30 #include "sion_printts.h"
31 #include "sion_keyvalue_keymngr.h"
32 #include "sion_keyvalue_table.h"
33 
36 
/* Sentinel stored in the current_pos of a key entry once its last known block
   has been read completely. Parenthesized so the negative literal always
   expands as a single operand. */
#define POS_BEHIND_END (-302)
38 
41  int size;
43  size_t next_scan_pos;
44  int scan_done;
49 };
50 
53  sion_int64 key;
54  ssize_t current_pos;
55  size_t bytes_left;
60 };
61 
64  size_t offset;
65  size_t len;
66  int blocknum;
67  size_t offset_in_entry;
71 };
72 
73 
74 #define DFUNCTION "_sion_keyvalue_keymngr_init"
75 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_init(int size) {
76  _sion_keyvalue_keymngr* keymngr=NULL;
77 
78  DPRINTFP((2, DFUNCTION, -1, "enter init size=%d\n",size));
79 
80  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
81  if (keymngr == NULL) {
82  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
83  return(NULL);
84  }
85 
86  /* init main data structure */
87  keymngr->lastentry_used= NULL;
88  keymngr->next_scan_pos = 0;
89  keymngr->scan_done = SION_NOT_SUCCESS;
90  keymngr->iterator_last_block = NULL;
91  keymngr->block_inwriteorder_head = NULL;
92  keymngr->block_inwriteorder_tail = NULL;
93  keymngr->size = size;
94  keymngr->key_table = _sion_keyvalue_table_init(size);
95  if (keymngr->key_table == NULL) {
96  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) size);
97  free(keymngr);
98  return(NULL);
99  }
100 
101  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
102  return (keymngr);
103 }
104 #undef DFUNCTION
105 
106 
107 #define DFUNCTION "_sion_keyvalue_keymngr_destroy"
108 int _sion_keyvalue_keymngr_destroy(_sion_keyvalue_keymngr** keymngr) {
109  size_t rc=SION_SUCCESS;
110  _sion_key_block_entry *block, *tmp_block;
111 
112  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
113  block = (*keymngr)->block_inwriteorder_head;
114  while (block != NULL) {
115  tmp_block = block;
116  block = block->next_inwriteorder;
117  free(tmp_block);
118  tmp_block = NULL;
119  }
120  if((*keymngr)->key_table) {
121  rc=_sion_keyvalue_table_destroy(&((*keymngr)->key_table));
122  }
123  free((*keymngr));
124  *keymngr = NULL;
125 
126  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
127  return (rc);
128 }
129 #undef DFUNCTION
130 
131 #define DFUNCTION "_sion_keyvalue_keymngr_add_block"
132 int _sion_keyvalue_keymngr_add_block(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t offset, size_t len) {
133  size_t rc=SION_SUCCESS;
134  _sion_key_entry *entry;
135  _sion_key_block_entry *new_block;
136  void *p;
137 
138  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
139 
140  /* first check if key is known */
141  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
142  if( p==NULL ) {
143  /* new key */
144 
145  entry = (_sion_key_entry*) malloc(sizeof(_sion_key_entry));
146  if (entry == NULL) {
147  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table entry, aborting ...\n"));
148  }
149  entry->key = key;
150  entry->blocks_avail= 0;
151  entry->blocklist_head = NULL; /* block will be added later */
152  entry->blocklist_current = NULL; /* block will be added later */
153  entry->blocklist_tail = NULL; /* block will be added later */
154 
155  /* it is the first block of this key,
156  therefore initialize also info about current read pos */
157  entry->current_pos = offset; /* it is the first block for that key */
158  entry->bytes_left = len; /* and no bytes available so far in this block */
159 
160  p= (void *) entry;
161 
162  /* store new entry in keytable */
163  rc=_sion_keyvalue_table_store(keymngr->key_table,key,p);
164  if (rc != SION_SUCCESS) return(rc);
165 
166  } else {
167  entry = (_sion_key_entry *) p;
168  }
169 
170  /* add new block */
171  new_block = (_sion_key_block_entry*) malloc(sizeof(_sion_key_block_entry));
172  if (new_block == NULL) {
173  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table block entry, aborting ...\n"));
174  }
175  new_block->entry = entry;
176  new_block->offset = offset;
177  new_block->len = len;
178  new_block->next = NULL; /* will be added at tail */
179  new_block->next_inwriteorder = NULL; /* will be added at tail */
180 
181  /* push block to blocklist of entry */
182  if((entry->blocklist_head != NULL) && (entry->blocklist_current != NULL) && (entry->blocklist_tail != NULL)) {
183  /* list exists */
184  new_block->blocknum = entry->blocklist_tail->blocknum + 1;
185  new_block->offset_in_entry = entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len;
186  entry->blocklist_tail->next = new_block; /* append block */
187  entry->blocklist_tail = entry->blocklist_tail->next; /* advance tail pointer */
188 
189  /* advance currentpos if currentpos behind last blocks */
190  if(entry->current_pos == POS_BEHIND_END) {
191  entry->blocklist_current = new_block; /* it must be this block */
192  entry->current_pos = entry->blocklist_current->offset;
193  entry->bytes_left = entry->blocklist_current->len;
194  }
195 
196  } else {
197  /* init list */
198  new_block->blocknum = 0;
199  new_block->offset_in_entry = 0;
200  entry->blocklist_head = new_block;
201  entry->blocklist_current = entry->blocklist_head;
202  entry->blocklist_tail = entry->blocklist_current;
203  entry->current_pos = entry->blocklist_current->offset;
204  entry->bytes_left = entry->blocklist_current->len;
205  }
206  entry->blocks_avail++;
207 
208  /* push block to lists of blocks in write order */
209  if((keymngr->block_inwriteorder_head != NULL) && (keymngr->block_inwriteorder_tail != NULL)) {
210  keymngr->block_inwriteorder_tail->next_inwriteorder = new_block; /* append block */
211  keymngr->block_inwriteorder_tail = keymngr->block_inwriteorder_tail->next_inwriteorder; /* advance tail pointer */
212  } else {
213  /* init list */
214  keymngr->block_inwriteorder_head = new_block;
216  }
217 
218  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
219  return (rc);
220 }
221 #undef DFUNCTION
222 
223 #define DFUNCTION "_sion_keyvalue_keymngr_update_read_pos"
224 int _sion_keyvalue_keymngr_update_read_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key,
225  size_t bytes_read, sion_int64 current_pos) {
226  size_t rc=SION_SUCCESS;
227  _sion_key_entry *entry=NULL;
228  void *p=NULL;
229 
230  /* check if using the same key as last time */
231  if(keymngr->lastentry_used!=NULL) {
232  if(keymngr->lastentry_used->key == key) {
233  DPRINTFP((2, DFUNCTION, -1, "its the last key %ld\n",(long) key));
234  entry=keymngr->lastentry_used;
235  }
236  }
237 
238  if( entry==NULL) {
239  /* try to lookup */
240  DPRINTFP((2, DFUNCTION, -1, "lookup for key %ld\n",(long) key));
241  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
242  if( p ) {
243  DPRINTFP((2, DFUNCTION, -1, "key %ld found\n",(long) key));
244  entry = (_sion_key_entry *) p;
245  } else {
246  DPRINTFP((2, DFUNCTION, -1, "key %ld not found\n",(long) key));
247  rc=SION_NOT_SUCCESS;
248  }
249  }
250 
251  if(entry) {
252  if(bytes_read<=entry->bytes_left) {
253 
254  /* advance counter */
255  DPRINTFP((2, DFUNCTION, -1, "advance key counters by %ld bytes\n",(long) bytes_read));
256  entry->current_pos = current_pos;
257  entry->bytes_left -= bytes_read;
258 
259  /* check if block is completely read */
260  if(entry->bytes_left==0) {
261  DPRINTFP((2, DFUNCTION, -1, "block is completely read, move to next if possible\n"));
262 
263  /* next block available */
264  if(entry->blocklist_current->next!=NULL) {
265 
266  /* move forward to next block */
267  entry->blocklist_current=entry->blocklist_current->next;
268  entry->current_pos=entry->blocklist_current->offset;
269  entry->bytes_left=entry->blocklist_current->len;
270  DPRINTFP((2, DFUNCTION, -1, "another block found (%ld,%ld)\n", (long) entry->current_pos, (long) entry->bytes_left ));
271 
272  } else {
273  /* stay in current block, but behind end */
274  DPRINTFP((2, DFUNCTION, -1, "no other block found so far ...\n"));
275  entry->current_pos = POS_BEHIND_END;
276  entry->bytes_left = 0;
277  }
278 
279  }
280  rc=SION_SUCCESS;
281  } else {
282  DPRINTFP((2, DFUNCTION, -1, "bytes_read > entry->bytes_left (%zu > %zu)\n", bytes_read, entry->bytes_left));
283  rc=SION_NOT_SUCCESS;
284  }
285  }
286 
287  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
288  return (rc);
289 
290 }
291 #undef DFUNCTION
292 
293 
294 #define DFUNCTION "_sion_keyvalue_keymngr_set_next_scan_pos"
295 int _sion_keyvalue_keymngr_set_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t pos) {
296  size_t rc=SION_SUCCESS;
297  DPRINTFP((2, DFUNCTION, -1, "enter pos=%ld\n",rc, (long) pos));
298  keymngr->next_scan_pos=pos;
299  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d next_scan_pos=%ld\n",rc, (long) keymngr->next_scan_pos));
300  return(rc);
301 }
302 #undef DFUNCTION
303 
304 #define DFUNCTION "_sion_keyvalue_keymngr_get_next_scan_pos"
305 int _sion_keyvalue_keymngr_get_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t *pos) {
306  size_t rc=SION_SUCCESS;
307  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
308 
309  *pos=keymngr->next_scan_pos;
310 
311  if(*pos==0) {
312  rc=SION_NOT_SUCCESS;
313  }
314 
315  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d pos=%ld\n",rc, (long) *pos));
316  return(rc);
317 }
318 #undef DFUNCTION
319 
320 #define DFUNCTION "_sion_keyvalue_keymngr_set_scan_done"
321 int _sion_keyvalue_keymngr_set_scan_done(_sion_keyvalue_keymngr* keymngr) {
322  size_t rc=SION_SUCCESS;
323  keymngr->scan_done=SION_SUCCESS;
324  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d scan_done=%d\n",rc, (int) keymngr->scan_done));
325  return(rc);
326 }
327 #undef DFUNCTION
328 
329 #define DFUNCTION "_sion_keyvalue_keymngr_is_scan_done"
330 int _sion_keyvalue_keymngr_is_scan_done(_sion_keyvalue_keymngr* keymngr) {
331  size_t rc=keymngr->scan_done;
332  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
333  return(rc);
334 }
335 #undef DFUNCTION
336 
337 
338 #define DFUNCTION "_sion_keyvalue_keymngr_lookup"
339 int _sion_keyvalue_keymngr_lookup(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t *current_pos, size_t *bytes_left) {
340  size_t rc=SION_SUCCESS;
341  _sion_key_entry *entry;
342  void *p;
343 
344  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
345 
346  /* check if key is known */
347  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
348  if( p==NULL ) {
349  rc=SION_NOT_SUCCESS;
350  } else {
351  rc=SION_SUCCESS;
352  entry=(_sion_key_entry *) p;
353  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
354 
355  if(entry->current_pos != POS_BEHIND_END) {
356  *current_pos = entry->current_pos;
357  *bytes_left = entry->bytes_left;
358  DPRINTFP((2, DFUNCTION, -1, "found block #%d --> (%ld,%ld)\n",(int) entry->blocklist_current->blocknum,(long) *current_pos,(long) *bytes_left));
359 
360  } else {
361  rc=SION_NOT_SUCCESS;
362  }
363  }
364 
365  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
366 
367  return(rc);
368 }
369 #undef DFUNCTION
370 
371 #define DFUNCTION "_sion_keyvalue_keymngr_lookup_and_set_pos"
372 int _sion_keyvalue_keymngr_lookup_and_set_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, int blocknum, sion_int64 posinblock,
373  size_t *current_pos, size_t *bytes_left) {
374  size_t rc=SION_SUCCESS;
375  _sion_key_entry *entry;
376  _sion_key_block_entry *block;
377  sion_int64 absolutepos;
378  void *p;
379 
380  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
381 
382  *current_pos = 0;
383  *bytes_left = 0;
384 
385  /* check if key is known */
386  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
387  if( p==NULL ) {
388  rc=SION_NOT_SUCCESS;
389  DPRINTFP((2, DFUNCTION, -1, "leave key not found rc=%d\n",rc));
390  return(rc);
391  }
392 
393  entry=(_sion_key_entry *) p;
394  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
395 
396  if((posinblock == SION_CURRENT_POS) && (blocknum==SION_CURRENT_BLOCK) ) {
397  /* dummy call, leave position unchanged */
398  *current_pos = entry->current_pos;
399  *bytes_left = entry->bytes_left;
400  DPRINTFP((2, DFUNCTION, -1, "dummy call (SION_CURRENT_BLOCK,SION_CURRENT_POS), leave position unchanged rc=%d\n",rc));
401 
402  if(entry->current_pos != POS_BEHIND_END) {
403  rc=SION_SUCCESS;
404  DPRINTFP((2, DFUNCTION, -1, "leave, seek position is already current pos in block %d\n",blocknum));
405  } else {
406  rc=SION_NOT_SUCCESS;
407  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
408  }
409  return(rc);
410  }
411 
412  if(blocknum == SION_CURRENT_BLOCK) {
413 
414  DPRINTFP((2, DFUNCTION, -1, "seek in current_block request\n"));
415 
416  if(entry->current_pos != POS_BEHIND_END) {
417 
418  /* search in current block */
419  blocknum = entry->blocklist_current->blocknum;
420  DPRINTFP((2, DFUNCTION, -1, "seek position is in block %d\n",blocknum));
421 
422  } else {
423 
424  rc=SION_NOT_SUCCESS;
425  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
426  return(rc);
427  }
428 
429  }
430 
431  if(blocknum != SION_ABSOLUTE_POS) {
432 
433  /* search for blocknum */
434 
435  if ( (blocknum<0) || (blocknum >= entry->blocks_avail)) {
436  rc=SION_NOT_SUCCESS; /* wrong blocknum */
437  DPRINTFP((2, DFUNCTION, -1, "leave, blocknum outside known range rc=%d\n",rc));
438  return(rc);
439  }
440 
441  /* determine start block for search: current block or head of list */
442  if (blocknum >= entry->blocklist_current->blocknum) {
443  block=entry->blocklist_current;
444  } else {
445  block=entry->blocklist_head;
446  }
447 
448  /* loop until block found */
449  while ((block!=NULL) && (blocknum != block->blocknum)) {
450  block=block->next;
451  }
452  if(block==NULL) {
453  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, blocknum position block is NULL\n"));
454  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
455  }
456 
457  /* position is in block */
458  DPRINTFP((2, DFUNCTION, -1, "position is in this block %d\n",block->blocknum));
459 
460  if((posinblock>=0) && (posinblock < block->len)) {
461  entry->blocklist_current = block;
462  entry->current_pos = entry->blocklist_current->offset + posinblock;
463  entry->bytes_left = block->len - posinblock;
464  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
465  } else {
466  DPRINTFP((2, DFUNCTION, -1, "wrong pos in block %d\n",(int) posinblock));
467  rc=SION_NOT_SUCCESS; /* wrong pos in block */
468  }
469 
470  } else { /* (blocknum == SION_ABSOLUTE_POS) */
471 
472  absolutepos=posinblock;
473  if ( (absolutepos < 0)
474  || ( absolutepos >= (entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len) )
475  ){
476  rc=SION_NOT_SUCCESS; /* wrong absolute position */
477  DPRINTFP((2, DFUNCTION, -1, "leave, absolute position outside known range rc=%d\n",rc));
478  return(rc);
479  }
480 
481  /* determine start block for search: current block or head of list */
482  if (absolutepos >= entry->blocklist_current->offset_in_entry) {
483  block=entry->blocklist_current;
484  } else {
485  block=entry->blocklist_head;
486  }
487 
488  /* loop until block found */
489  while ((block!=NULL) && (absolutepos >= (block->offset_in_entry + block->len))) {
490  block=block->next;
491  }
492  if(block==NULL) {
493  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, absolute position block is NULL\n"));
494  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
495  }
496  /* recompute posinblock */
497  posinblock = absolutepos - block->offset_in_entry;
498 
499  /* set current information */
500  entry->blocklist_current = block;
501  entry->current_pos = entry->blocklist_current->offset + posinblock;
502  entry->bytes_left = block->len - posinblock;
503  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
504 
505  }
506 
507  *current_pos = entry->current_pos;
508  *bytes_left = entry->bytes_left;
509  DPRINTFP((2, DFUNCTION, -1, "found block --> (%ld,%ld)\n",(long) *current_pos,(long) *bytes_left));
510 
511  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
512 
513  return(rc);
514 }
515 #undef DFUNCTION
516 
517 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_reset"
518  int _sion_keyvalue_keymngr_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
519 
520  int rc=SION_SUCCESS;
521 
522  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
523 
524  keymngr->iterator_last_block = NULL;
525 
526  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
527  return(rc);
528  }
529 #undef DFUNCTION
530 
531  /* moves to next block in write order, sets also internal current_pos to beginning of block */
532  /* this function could also be called after end of block list is reached, this is nedded for supporting iterator on in
533  inline mode where meta data is read on the fly during iteration */
534 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_next"
535  int _sion_keyvalue_keymngr_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key, size_t *current_pos, size_t *offset, size_t *len) {
536  int rc=SION_SUCCESS;
537  int found = 0;
538 
539  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
540 
541  /* first time called after reset */
542  if(keymngr->iterator_last_block==NULL) {
543  /* next block is the head block */
545  if(keymngr->iterator_last_block!=NULL) {
546  found=1;
547  }
548  } else {
549  /* not already at end of list */
550  if(keymngr->iterator_last_block!=keymngr->block_inwriteorder_tail) {
551  /* move forward in list */
553  if(keymngr->iterator_last_block==NULL) {
554  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, checking end of list , aborting ...\n"));
555  }
556  found=1;
557  }
558  }
559 
560  if(found) {
561 
562  /* set current_block and current_pos and bytes_left of entry to this block */
566 
567  /* set output parameters */
568  *key = keymngr->iterator_last_block->entry->key;
569  *offset = keymngr->iterator_last_block->offset;
570  *len = keymngr->iterator_last_block->len;
571  *current_pos= keymngr->iterator_last_block->offset;
572 
573  rc=SION_SUCCESS;
574 
575  } else {
576 
577  rc=SION_NOT_SUCCESS;
578 
579  }
580 
581  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
582  return(rc);
583  }
584 #undef DFUNCTION
585 
586 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_reset"
587  int _sion_keyvalue_keymngr_key_list_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
588 
589  int rc=SION_SUCCESS;
590 
591  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
592 
593  _sion_keyvalue_table_iterator_reset(keymngr->key_table);
594 
595  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
596  return(rc);
597  }
598 #undef DFUNCTION
599 
600 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_next"
601  int _sion_keyvalue_keymngr_key_list_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key) {
602  int rc=0;
603  void *data;
604 
605  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
606 
607  rc=_sion_keyvalue_table_iterator_next_in_store_order(keymngr->key_table, key, &data);
608 
609  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc, (long) *key));
610  return(rc);
611  }
612 #undef DFUNCTION
613 
614  /* this function will create a duplicate of the keymngr structure containing the the description of all blocks
615  of all keys, execpt that the current read posiotion for each key is a beginning of first block of each key */
616 #define DFUNCTION "_sion_keyvalue_keymngr_dup"
617 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_dup(_sion_keyvalue_keymngr* keymngr_orig, int dup_mode, sion_table_key_t sel_key) {
618  _sion_keyvalue_keymngr *keymngr=NULL;
619  _sion_key_block_entry *block;
620  _sion_key_entry *entry;
621  void *p;
622 
623  DPRINTFP((2, DFUNCTION, -1, "enter dup\n"));
624 
625  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
626  if (keymngr == NULL) {
627  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
628  return(NULL);
629  }
630 
631  /* init main data structure */
632  keymngr->lastentry_used= NULL;
633  keymngr->next_scan_pos = keymngr_orig->next_scan_pos;
634  keymngr->iterator_last_block = NULL;
635  keymngr->block_inwriteorder_head = NULL;
636  keymngr->block_inwriteorder_tail = NULL;
637 
638  /* create empty table of keys */
639  keymngr->size = keymngr_orig->size;
640  keymngr->key_table = _sion_keyvalue_table_init(keymngr_orig->size);
641  if (keymngr->key_table == NULL) {
642  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) keymngr->size);
643  free(keymngr);
644  return(NULL);
645  }
646 
647 
648  if(dup_mode==SION_DESCSTATE_DUP_SEL_RANK_KEY) {
649  DPRINTFP((2, DFUNCTION, -1, "start duplicating entry for key %llu\n",sel_key));
650 
651  p=_sion_keyvalue_table_lookup(keymngr_orig->key_table,sel_key);
652  if( p!=NULL ) {
653  entry = (_sion_key_entry *) p;
654  block=entry->blocklist_head;
655  while(block!=NULL) {
656  /* using own API routine to add block to new keymngr structure */
657  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
658  (long)block->entry->key,(long)block->offset,(long)block->len));
659  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
660  block=block->next;
661  }
662  }
663  DPRINTFP((2, DFUNCTION, -1, "end duplicating one entry\n"));
664 
665  } else {
666  /* loop over all blocks in orig keymngr */
667  DPRINTFP((2, DFUNCTION, -1, "start duplicating all entries\n"));
668  block=keymngr_orig->block_inwriteorder_head;
669  while(block!=NULL) {
670  /* using own API routine to add block to new keymngr structure */
671  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
672  (long)block->entry->key,(long)block->offset,(long)block->len));
673  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
674  block=block->next_inwriteorder;
675  }
676  DPRINTFP((2, DFUNCTION, -1, "end duplicating entries\n"));
677  }
678 
679  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
680  return (keymngr);
681  }
682 #undef DFUNCTION
683 
684 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_stat"
685 int _sion_keyvalue_keymngr_key_get_stat(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, sion_key_stat_t *keystat) {
686  size_t rc=SION_SUCCESS;
687  _sion_key_entry *entry;
688  void *p;
689 
690  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
691 
692  /* check if key is known */
693  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
694  if( p==NULL ) {
695  rc=SION_NOT_SUCCESS;
696  } else {
697  entry=(_sion_key_entry *) p;
698  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
699 
700  if(entry->blocklist_tail != NULL) {
701  keystat->key = (uint64_t) key;
702  keystat->num_blocks = entry->blocklist_tail->blocknum+1;
703  keystat->total_size = entry->blocklist_tail->offset_in_entry+entry->blocklist_tail->len;
704  rc=SION_SUCCESS;
705  DPRINTFP((2, DFUNCTION, -1, "found entry for key %ld (%ld,%ld)\n",(long) keystat->key, (long) keystat->num_blocks, (long) keystat->total_size ));
706  } else {
707  rc=SION_NOT_SUCCESS;
708  }
709  }
710 
711  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
712 
713  return(rc);
714 }
715 #undef DFUNCTION
716 
717 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_sizeof"
718 int _sion_keyvalue_keymngr_key_get_sizeof(_sion_keyvalue_keymngr* keymngr) {
719  size_t bytes=0, help_bytes;
720  _sion_key_block_entry *block;
721 
722 
723  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
724  help_bytes=sizeof(_sion_keyvalue_keymngr);
725  DPRINTFP((2, DFUNCTION, -1, " sizeof key_keymgr= %5d\n", help_bytes));
726  bytes+=help_bytes;
727 
728  block = keymngr->block_inwriteorder_head;
729  while (block != NULL) {
730  help_bytes=sizeof(_sion_key_block_entry);
731  block = block->next_inwriteorder;
732  }
733  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
734  bytes+=help_bytes;
735 
736  if(keymngr->key_table) {
737  help_bytes=_sion_keyvalue_table_get_size(keymngr->key_table);
738  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
739  bytes+=help_bytes;
740 
741  }
742  DPRINTFP((2, DFUNCTION, -1, "leave bytes=%d\n",bytes));
743 
744  return(bytes);
745 }
746 #undef DFUNCTION
_sion_key_block_entry * next
_sion_key_entry * entry
_sion_key_block_entry * next_inwriteorder
size_t offset_in_entry
int blocknum
size_t bytes_left
#define SION_DESCSTATE_DUP_SEL_RANK_KEY
Definition: sion_filedesc.h:44
_sion_key_block_entry * blocklist_current
size_t offset
_sion_keyvalue_table * key_table
#define SION_CURRENT_POS
Definition: sion_const.h:70
#define SION_ABSOLUTE_POS
Definition: sion_const.h:71
_sion_key_block_entry * block_inwriteorder_head
_sion_key_block_entry * blocklist_head
_sion_key_block_entry * block_inwriteorder_tail
#define SION_CURRENT_BLOCK
Definition: sion_const.h:69
size_t len
Sion Time Stamp Header.
_sion_key_block_entry * blocklist_tail
int blocks_avail
ssize_t current_pos
sion_int64 key
_sion_key_block_entry * iterator_last_block