SIONlib  1.6.2
Scalable I/O library for parallel access to task-local files
sion_keyvalue_keymngr.c
Go to the documentation of this file.
1 /****************************************************************************
2 ** SIONLIB http://www.fz-juelich.de/jsc/sionlib **
3 *****************************************************************************
4 ** Copyright (c) 2008-2016 **
5 ** Forschungszentrum Juelich, Juelich Supercomputing Centre **
6 ** **
7 ** See the file COPYRIGHT in the package base directory for details **
8 ****************************************************************************/
9 
14 #include <stdlib.h>
15 #include <stdio.h>
16 #include <string.h>
17 #include <time.h>
18 
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <assert.h>
24 
25 
26 #include "sion.h"
27 #include "sion_debug.h"
28 #include "sion_internal.h"
29 #include "sion_printts.h"
30 #include "sion_keyvalue_keymngr.h"
31 #include "sion_keyvalue_table.h"
32 
35 
36 #define POS_BEHIND_END -302
37 
40  int size;
42  size_t next_scan_pos;
43  int scan_done;
48 };
49 
52  sion_int64 key;
53  ssize_t current_pos;
54  size_t bytes_left;
59 };
60 
63  size_t offset;
64  size_t len;
65  int blocknum;
66  size_t offset_in_entry;
70 };
71 
72 
73 #define DFUNCTION "_sion_keyvalue_keymngr_init"
74 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_init(int size) {
75  _sion_keyvalue_keymngr* keymngr=NULL;
76 
77  DPRINTFP((2, DFUNCTION, -1, "enter init size=%d\n",size));
78 
79  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
80  if (keymngr == NULL) {
81  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
82  return(NULL);
83  }
84 
85  /* init main data structure */
86  keymngr->lastentry_used= NULL;
87  keymngr->next_scan_pos = 0;
88  keymngr->scan_done = SION_NOT_SUCCESS;
89  keymngr->iterator_last_block = NULL;
90  keymngr->block_inwriteorder_head = NULL;
91  keymngr->block_inwriteorder_tail = NULL;
92  keymngr->size = size;
93  keymngr->key_table = _sion_keyvalue_table_init(size);
94  if (keymngr->key_table == NULL) {
95  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) size);
96  free(keymngr);
97  return(NULL);
98  }
99 
100  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
101  return (keymngr);
102 }
103 #undef DFUNCTION
104 
105 
106 #define DFUNCTION "_sion_keyvalue_keymngr_destroy"
107 int _sion_keyvalue_keymngr_destroy(_sion_keyvalue_keymngr* keymngr) {
108  size_t rc=SION_SUCCESS;
109  _sion_key_block_entry *block, *tmp_block;
110 
111  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
112  block = keymngr->block_inwriteorder_head;
113  while (block != NULL) {
114  tmp_block = block;
115  block = block->next_inwriteorder;
116  free(tmp_block);
117  tmp_block = NULL;
118  }
119  if(keymngr->key_table) {
120  rc=_sion_keyvalue_table_destroy(keymngr->key_table);
121  }
122  free(keymngr);
123  keymngr = NULL;
124 
125  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
126  return (rc);
127 }
128 #undef DFUNCTION
129 
130 #define DFUNCTION "_sion_keyvalue_keymngr_add_block"
131 int _sion_keyvalue_keymngr_add_block(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t offset, size_t len) {
132  size_t rc=SION_SUCCESS;
133  _sion_key_entry *entry;
134  _sion_key_block_entry *new_block;
135  void *p;
136 
137  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
138 
139  /* first check if key is known */
140  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
141  if( p==NULL ) {
142  /* new key */
143 
144  entry = (_sion_key_entry*) malloc(sizeof(_sion_key_entry));
145  if (entry == NULL) {
146  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table entry, aborting ...\n"));
147  }
148  entry->key = key;
149  entry->blocks_avail= 0;
150  entry->blocklist_head = NULL; /* block will be added later */
151  entry->blocklist_current = NULL; /* block will be added later */
152  entry->blocklist_tail = NULL; /* block will be added later */
153 
154  /* it is the first block of this key,
155  therefore initialize also info about current read pos */
156  entry->current_pos = offset; /* it is the first block for that key */
157  entry->bytes_left = len; /* and no bytes available so far in this block */
158 
159  p= (void *) entry;
160 
161  /* store new entry in keytable */
162  rc=_sion_keyvalue_table_store(keymngr->key_table,key,p);
163  if (rc != SION_SUCCESS) return(rc);
164 
165  } else {
166  entry = (_sion_key_entry *) p;
167  }
168 
169  /* add new block */
170  new_block = (_sion_key_block_entry*) malloc(sizeof(_sion_key_block_entry));
171  if (new_block == NULL) {
172  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table block entry, aborting ...\n"));
173  }
174  new_block->entry = entry;
175  new_block->offset = offset;
176  new_block->len = len;
177  new_block->next = NULL; /* will be added at tail */
178  new_block->next_inwriteorder = NULL; /* will be added at tail */
179 
180  /* push block to blocklist of entry */
181  if((entry->blocklist_head != NULL) && (entry->blocklist_current != NULL) && (entry->blocklist_tail != NULL)) {
182  /* list exists */
183  new_block->blocknum = entry->blocklist_tail->blocknum + 1;
184  new_block->offset_in_entry = entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len;
185  entry->blocklist_tail->next = new_block; /* append block */
186  entry->blocklist_tail = entry->blocklist_tail->next; /* advance tail pointer */
187 
188  /* advance currentpos if currentpos behind last blocks */
189  if(entry->current_pos == POS_BEHIND_END) {
190  entry->blocklist_current = new_block; /* it must be this block */
191  entry->current_pos = entry->blocklist_current->offset;
192  entry->bytes_left = entry->blocklist_current->len;
193  }
194 
195  } else {
196  /* init list */
197  new_block->blocknum = 0;
198  new_block->offset_in_entry = 0;
199  entry->blocklist_head = new_block;
200  entry->blocklist_current = entry->blocklist_head;
201  entry->blocklist_tail = entry->blocklist_current;
202  entry->current_pos = entry->blocklist_current->offset;
203  entry->bytes_left = entry->blocklist_current->len;
204  }
205  entry->blocks_avail++;
206 
207  /* push block to lists of blocks in write order */
208  if((keymngr->block_inwriteorder_head != NULL) && (keymngr->block_inwriteorder_tail != NULL)) {
209  keymngr->block_inwriteorder_tail->next_inwriteorder = new_block; /* append block */
210  keymngr->block_inwriteorder_tail = keymngr->block_inwriteorder_tail->next_inwriteorder; /* advance tail pointer */
211  } else {
212  /* init list */
213  keymngr->block_inwriteorder_head = new_block;
215  }
216 
217  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
218  return (rc);
219 }
220 #undef DFUNCTION
221 
222 #define DFUNCTION "_sion_keyvalue_keymngr_update_read_pos"
223 int _sion_keyvalue_keymngr_update_read_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key,
224  size_t bytes_read, sion_int64 current_pos) {
225  size_t rc=SION_SUCCESS;
226  _sion_key_entry *entry=NULL;
227  void *p=NULL;
228 
229  /* check if using the same key as last time */
230  if(keymngr->lastentry_used!=NULL) {
231  if(keymngr->lastentry_used->key == key) {
232  DPRINTFP((2, DFUNCTION, -1, "its the last key %ld\n",(long) key));
233  entry=keymngr->lastentry_used;
234  }
235  }
236 
237  if( entry==NULL) {
238  /* try to lookup */
239  DPRINTFP((2, DFUNCTION, -1, "lookup for key %ld\n",(long) key));
240  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
241  if( p ) {
242  DPRINTFP((2, DFUNCTION, -1, "key %ld found\n",(long) key));
243  entry = (_sion_key_entry *) p;
244  } else {
245  DPRINTFP((2, DFUNCTION, -1, "key %ld not found\n",(long) key));
246  rc=SION_NOT_SUCCESS;
247  }
248  }
249 
250  if(entry) {
251  if(bytes_read<=entry->bytes_left) {
252 
253  /* advance counter */
254  DPRINTFP((2, DFUNCTION, -1, "advance key counters by %ld bytes\n",(long) bytes_read));
255  entry->current_pos = current_pos;
256  entry->bytes_left -= bytes_read;
257 
258  /* check if block is completely read */
259  if(entry->bytes_left==0) {
260  DPRINTFP((2, DFUNCTION, -1, "block is completely read, move to next if possible\n"));
261 
262  /* next block available */
263  if(entry->blocklist_current->next!=NULL) {
264 
265  /* move forward to next block */
266  entry->blocklist_current=entry->blocklist_current->next;
267  entry->current_pos=entry->blocklist_current->offset;
268  entry->bytes_left=entry->blocklist_current->len;
269  DPRINTFP((2, DFUNCTION, -1, "another block found (%ld,%ld)\n", (long) entry->current_pos, (long) entry->bytes_left ));
270 
271  } else {
272  /* stay in current block, but behind end */
273  DPRINTFP((2, DFUNCTION, -1, "no other block found so far ...\n"));
274  entry->current_pos = POS_BEHIND_END;
275  entry->bytes_left = 0;
276  }
277 
278  }
279  rc=SION_SUCCESS;
280  } else {
281  DPRINTFP((2, DFUNCTION, -1, "bytes_read > entry->bytes_left (%zu > %zu)\n", bytes_read, entry->bytes_left));
282  rc=SION_NOT_SUCCESS;
283  }
284  }
285 
286  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
287  return (rc);
288 
289 }
290 #undef DFUNCTION
291 
292 
293 #define DFUNCTION "_sion_keyvalue_keymngr_set_next_scan_pos"
294 int _sion_keyvalue_keymngr_set_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t pos) {
295  size_t rc=SION_SUCCESS;
296  DPRINTFP((2, DFUNCTION, -1, "enter pos=%ld\n",rc, (long) pos));
297  keymngr->next_scan_pos=pos;
298  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d next_scan_pos=%ld\n",rc, (long) keymngr->next_scan_pos));
299  return(rc);
300 }
301 #undef DFUNCTION
302 
303 #define DFUNCTION "_sion_keyvalue_keymngr_get_next_scan_pos"
304 int _sion_keyvalue_keymngr_get_next_scan_pos(_sion_keyvalue_keymngr* keymngr, size_t *pos) {
305  size_t rc=SION_SUCCESS;
306  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
307 
308  *pos=keymngr->next_scan_pos;
309 
310  if(*pos==0) {
311  rc=SION_NOT_SUCCESS;
312  }
313 
314  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d pos=%ld\n",rc, (long) *pos));
315  return(rc);
316 }
317 #undef DFUNCTION
318 
319 #define DFUNCTION "_sion_keyvalue_keymngr_set_scan_done"
320 int _sion_keyvalue_keymngr_set_scan_done(_sion_keyvalue_keymngr* keymngr) {
321  size_t rc=SION_SUCCESS;
322  keymngr->scan_done=SION_SUCCESS;
323  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d scan_done=%d\n",rc, (int) keymngr->scan_done));
324  return(rc);
325 }
326 #undef DFUNCTION
327 
328 #define DFUNCTION "_sion_keyvalue_keymngr_is_scan_done"
329 int _sion_keyvalue_keymngr_is_scan_done(_sion_keyvalue_keymngr* keymngr) {
330  size_t rc=keymngr->scan_done;
331  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
332  return(rc);
333 }
334 #undef DFUNCTION
335 
336 
337 #define DFUNCTION "_sion_keyvalue_keymngr_lookup"
338 int _sion_keyvalue_keymngr_lookup(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, size_t *current_pos, size_t *bytes_left) {
339  size_t rc=SION_SUCCESS;
340  _sion_key_entry *entry;
341  void *p;
342 
343  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
344 
345  /* check if key is known */
346  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
347  if( p==NULL ) {
348  rc=SION_NOT_SUCCESS;
349  } else {
350  rc=SION_SUCCESS;
351  entry=(_sion_key_entry *) p;
352  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
353 
354  if(entry->current_pos != POS_BEHIND_END) {
355  *current_pos = entry->current_pos;
356  *bytes_left = entry->bytes_left;
357  DPRINTFP((2, DFUNCTION, -1, "found block #%d --> (%ld,%ld)\n",(int) entry->blocklist_current->blocknum,(long) *current_pos,(long) *bytes_left));
358 
359  } else {
360  rc=SION_NOT_SUCCESS;
361  }
362  }
363 
364  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
365 
366  return(rc);
367 }
368 #undef DFUNCTION
369 
370 #define DFUNCTION "_sion_keyvalue_keymngr_lookup_and_set_pos"
371 int _sion_keyvalue_keymngr_lookup_and_set_pos(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, int blocknum, sion_int64 posinblock,
372  size_t *current_pos, size_t *bytes_left) {
373  size_t rc=SION_SUCCESS;
374  _sion_key_entry *entry;
375  _sion_key_block_entry *block;
376  sion_int64 absolutepos;
377  void *p;
378 
379  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
380 
381  *current_pos = 0;
382  *bytes_left = 0;
383 
384  /* check if key is known */
385  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
386  if( p==NULL ) {
387  rc=SION_NOT_SUCCESS;
388  DPRINTFP((2, DFUNCTION, -1, "leave key not found rc=%d\n",rc));
389  return(rc);
390  }
391 
392  entry=(_sion_key_entry *) p;
393  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
394 
395  if((posinblock == SION_CURRENT_POS) && (blocknum==SION_CURRENT_BLOCK) ) {
396  /* dummy call, leave position unchanged */
397  *current_pos = entry->current_pos;
398  *bytes_left = entry->bytes_left;
399  DPRINTFP((2, DFUNCTION, -1, "dummy call (SION_CURRENT_BLOCK,SION_CURRENT_POS), leave position unchanged rc=%d\n",rc));
400 
401  if(entry->current_pos != POS_BEHIND_END) {
402  rc=SION_SUCCESS;
403  DPRINTFP((2, DFUNCTION, -1, "leave, seek position is already current pos in block %d\n",blocknum));
404  } else {
405  rc=SION_NOT_SUCCESS;
406  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
407  }
408  return(rc);
409  }
410 
411  if(blocknum == SION_CURRENT_BLOCK) {
412 
413  DPRINTFP((2, DFUNCTION, -1, "seek in current_block request\n"));
414 
415  if(entry->current_pos != POS_BEHIND_END) {
416 
417  /* search in current block */
418  blocknum = entry->blocklist_current->blocknum;
419  DPRINTFP((2, DFUNCTION, -1, "seek position is in block %d\n",blocknum));
420 
421  } else {
422 
423  rc=SION_NOT_SUCCESS;
424  DPRINTFP((2, DFUNCTION, -1, "leave, currently behind last block rc=%d\n",rc));
425  return(rc);
426  }
427 
428  }
429 
430  if(blocknum != SION_ABSOLUTE_POS) {
431 
432  /* search for blocknum */
433 
434  if ( (blocknum<0) || (blocknum >= entry->blocks_avail)) {
435  rc=SION_NOT_SUCCESS; /* wrong blocknum */
436  DPRINTFP((2, DFUNCTION, -1, "leave, blocknum outside known range rc=%d\n",rc));
437  return(rc);
438  }
439 
440  /* determine start block for search: current block or head of list */
441  if (blocknum >= entry->blocklist_current->blocknum) {
442  block=entry->blocklist_current;
443  } else {
444  block=entry->blocklist_head;
445  }
446 
447  /* loop until block found */
448  while ((block!=NULL) && (blocknum != block->blocknum)) {
449  block=block->next;
450  }
451  if(block==NULL) {
452  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, blocknum position block is NULL\n"));
453  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
454  }
455 
456  /* position is in block */
457  DPRINTFP((2, DFUNCTION, -1, "position is in this block %d\n",block->blocknum));
458 
459  if((posinblock>=0) && (posinblock < block->len)) {
460  entry->blocklist_current = block;
461  entry->current_pos = entry->blocklist_current->offset + posinblock;
462  entry->bytes_left = block->len - posinblock;
463  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
464  } else {
465  DPRINTFP((2, DFUNCTION, -1, "wrong pos in block %d\n",(int) posinblock));
466  rc=SION_NOT_SUCCESS; /* wrong pos in block */
467  }
468 
469  } else { /* (blocknum == SION_ABSOLUTE_POS) */
470 
471  absolutepos=posinblock;
472  if ( (absolutepos < 0)
473  || ( absolutepos >= (entry->blocklist_tail->offset_in_entry + entry->blocklist_tail->len) )
474  ){
475  rc=SION_NOT_SUCCESS; /* wrong absolute position */
476  DPRINTFP((2, DFUNCTION, -1, "leave, absolute position outside known range rc=%d\n",rc));
477  return(rc);
478  }
479 
480  /* determine start block for search: current block or head of list */
481  if (absolutepos >= entry->blocklist_current->offset_in_entry) {
482  block=entry->blocklist_current;
483  } else {
484  block=entry->blocklist_head;
485  }
486 
487  /* loop until block found */
488  while ((block!=NULL) && (absolutepos >= (block->offset_in_entry + block->len))) {
489  block=block->next;
490  }
491  if(block==NULL) {
492  DPRINTFP((2, DFUNCTION, -1, "internal error: somethink went wrong, seek, aborting, absolute position block is NULL\n"));
493  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, seek, aborting ...\n"));
494  }
495  /* recompute posinblock */
496  posinblock = absolutepos - block->offset_in_entry;
497 
498  /* set current information */
499  entry->blocklist_current = block;
500  entry->current_pos = entry->blocklist_current->offset + posinblock;
501  entry->bytes_left = block->len - posinblock;
502  DPRINTFP((2, DFUNCTION, -1, "set current pointer for entry: current_pos=%d bytes_left=%d blocknum=%d\n",(int) entry->current_pos,(int) entry->bytes_left, (int) block->blocknum));
503 
504  }
505 
506  *current_pos = entry->current_pos;
507  *bytes_left = entry->bytes_left;
508  DPRINTFP((2, DFUNCTION, -1, "found block --> (%ld,%ld)\n",(long) *current_pos,(long) *bytes_left));
509 
510  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
511 
512  return(rc);
513 }
514 #undef DFUNCTION
515 
516 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_reset"
517  int _sion_keyvalue_keymngr_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
518 
519  int rc=SION_SUCCESS;
520 
521  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
522 
523  keymngr->iterator_last_block = NULL;
524 
525  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
526  return(rc);
527  }
528 #undef DFUNCTION
529 
530  /* moves to next block in write order, sets also internal current_pos to beginning of block */
531  /* this function could also be called after end of block list is reached, this is nedded for supporting iterator on in
532  inline mode where meta data is read on the fly during iteration */
533 #define DFUNCTION "_sion_keyvalue_keymngr_iterator_next"
534  int _sion_keyvalue_keymngr_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key, size_t *current_pos, size_t *offset, size_t *len) {
535  int rc=SION_SUCCESS;
536  int found = 0;
537 
538  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
539 
540  /* first time called after reset */
541  if(keymngr->iterator_last_block==NULL) {
542  /* next block is the head block */
544  if(keymngr->iterator_last_block!=NULL) {
545  found=1;
546  }
547  } else {
548  /* not already at end of list */
549  if(keymngr->iterator_last_block!=keymngr->block_inwriteorder_tail) {
550  /* move forward in list */
552  if(keymngr->iterator_last_block==NULL) {
553  return(_sion_errorprint(SION_NOT_SUCCESS,_SION_ERROR_RETURN,"internal error: somethink went wrong, checking end of list , aborting ...\n"));
554  }
555  found=1;
556  }
557  }
558 
559  if(found) {
560 
561  /* set current_block and current_pos and bytes_left of entry to this block */
565 
566  /* set output parameters */
567  *key = keymngr->iterator_last_block->entry->key;
568  *offset = keymngr->iterator_last_block->offset;
569  *len = keymngr->iterator_last_block->len;
570  *current_pos= keymngr->iterator_last_block->offset;
571 
572  rc=SION_SUCCESS;
573 
574  } else {
575 
576  rc=SION_NOT_SUCCESS;
577 
578  }
579 
580  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
581  return(rc);
582  }
583 #undef DFUNCTION
584 
585 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_reset"
586  int _sion_keyvalue_keymngr_key_list_iterator_reset(_sion_keyvalue_keymngr* keymngr) {
587 
588  int rc=SION_SUCCESS;
589 
590  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
591 
592  _sion_keyvalue_table_iterator_reset(keymngr->key_table);
593 
594  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
595  return(rc);
596  }
597 #undef DFUNCTION
598 
599 #define DFUNCTION "_sion_keyvalue_keymngr_key_list_iterator_next"
600  int _sion_keyvalue_keymngr_key_list_iterator_next(_sion_keyvalue_keymngr* keymngr, sion_table_key_t *key) {
601  int rc=SION_SUCCESS;
602  void *data;
603 
604  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
605 
606  rc=_sion_keyvalue_table_iterator_next_in_store_order(keymngr->key_table, key, &data);
607 
608  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d key=%ld\n",rc, (long) *key));
609  return(rc);
610  }
611 #undef DFUNCTION
612 
613  /* this function will create a duplicate of the keymngr structure containing the the description of all blocks
614  of all keys, execpt that the current read posiotion for each key is a beginning of first block of each key */
615 #define DFUNCTION "_sion_keyvalue_keymngr_dup"
616 _sion_keyvalue_keymngr* _sion_keyvalue_keymngr_dup(_sion_keyvalue_keymngr* keymngr_orig, int dup_mode, sion_table_key_t sel_key) {
617  _sion_keyvalue_keymngr *keymngr=NULL;
618  _sion_key_block_entry *block;
619  _sion_key_entry *entry;
620  void *p;
621 
622  DPRINTFP((2, DFUNCTION, -1, "enter dup\n"));
623 
624  keymngr = malloc(sizeof(_sion_keyvalue_keymngr));
625  if (keymngr == NULL) {
626  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue keymngr of size %lu , aborting ...\n", (unsigned long) sizeof(_sion_keyvalue_keymngr));
627  return(NULL);
628  }
629 
630  /* init main data structure */
631  keymngr->lastentry_used= NULL;
632  keymngr->next_scan_pos = keymngr_orig->next_scan_pos;
633  keymngr->iterator_last_block = NULL;
634  keymngr->block_inwriteorder_head = NULL;
635  keymngr->block_inwriteorder_tail = NULL;
636 
637  /* create empty table of keys */
638  keymngr->size = keymngr_orig->size;
639  keymngr->key_table = _sion_keyvalue_table_init(keymngr_orig->size);
640  if (keymngr->key_table == NULL) {
641  _sion_errorprint(0,_SION_ERROR_RETURN,"cannot allocate internal keyvalue table of for %lu entries , aborting ...\n", (unsigned long) keymngr->size);
642  free(keymngr);
643  return(NULL);
644  }
645 
646 
647  if(dup_mode==SION_DESCSTATE_DUP_SEL_RANK_KEY) {
648  DPRINTFP((2, DFUNCTION, -1, "start duplicating entry for key %llu\n",sel_key));
649 
650  p=_sion_keyvalue_table_lookup(keymngr_orig->key_table,sel_key);
651  if( p!=NULL ) {
652  entry = (_sion_key_entry *) p;
653  block=entry->blocklist_head;
654  while(block!=NULL) {
655  /* using own API routine to add block to new keymngr structure */
656  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
657  (long)block->entry->key,(long)block->offset,(long)block->len));
658  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
659  block=block->next;
660  }
661  }
662  DPRINTFP((2, DFUNCTION, -1, "end duplicating one entry\n"));
663 
664  } else {
665  /* loop over all blocks in orig keymngr */
666  DPRINTFP((2, DFUNCTION, -1, "start duplicating all entries\n"));
667  block=keymngr_orig->block_inwriteorder_head;
668  while(block!=NULL) {
669  /* using own API routine to add block to new keymngr structure */
670  DPRINTFP((2, DFUNCTION, -1, "add block to keymngr key=%ld offset=%ld len=%ld\n",
671  (long)block->entry->key,(long)block->offset,(long)block->len));
672  _sion_keyvalue_keymngr_add_block(keymngr,block->entry->key,block->offset,block->len);
673  block=block->next_inwriteorder;
674  }
675  DPRINTFP((2, DFUNCTION, -1, "end duplicating entries\n"));
676  }
677 
678  DPRINTFP((2, DFUNCTION, -1, "leave\n"));
679  return (keymngr);
680  }
681 #undef DFUNCTION
682 
683 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_stat"
684 int _sion_keyvalue_keymngr_key_get_stat(_sion_keyvalue_keymngr* keymngr, sion_table_key_t key, sion_key_stat_t *keystat) {
685  size_t rc=SION_SUCCESS;
686  _sion_key_entry *entry;
687  void *p;
688 
689  DPRINTFP((2, DFUNCTION, -1, "enter key=%ld\n",(long) key));
690 
691  /* check if key is known */
692  p=_sion_keyvalue_table_lookup(keymngr->key_table,key);
693  if( p==NULL ) {
694  rc=SION_NOT_SUCCESS;
695  } else {
696  rc=SION_SUCCESS;
697  entry=(_sion_key_entry *) p;
698  DPRINTFP((2, DFUNCTION, -1, "found entry with %d blocks\n",entry->blocks_avail));
699 
700  if(entry->blocklist_tail != NULL) {
701  keystat->key = (uint64_t) key;
702  keystat->num_blocks = entry->blocklist_tail->blocknum+1;
703  keystat->total_size = entry->blocklist_tail->offset_in_entry+entry->blocklist_tail->len;
704  rc=SION_SUCCESS;
705  DPRINTFP((2, DFUNCTION, -1, "found entry for key %ld (%ld,%ld)\n",(long) keystat->key, (long) keystat->num_blocks, (long) keystat->total_size ));
706  } else {
707  rc=SION_NOT_SUCCESS;
708  }
709  }
710 
711  DPRINTFP((2, DFUNCTION, -1, "leave rc=%d\n",rc));
712 
713  return(rc);
714 }
715 #undef DFUNCTION
716 
717 #define DFUNCTION "_sion_keyvalue_keymngr_key_get_sizeof"
718 int _sion_keyvalue_keymngr_key_get_sizeof(_sion_keyvalue_keymngr* keymngr) {
719  size_t bytes=0, help_bytes;
720  _sion_key_block_entry *block;
721 
722 
723  DPRINTFP((2, DFUNCTION, -1, "enter\n"));
724  help_bytes=sizeof(_sion_keyvalue_keymngr);
725  DPRINTFP((2, DFUNCTION, -1, " sizeof key_keymgr= %5d\n", help_bytes));
726  bytes+=help_bytes;
727 
728  block = keymngr->block_inwriteorder_head;
729  while (block != NULL) {
730  help_bytes=sizeof(_sion_key_block_entry);
731  block = block->next_inwriteorder;
732  }
733  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
734  bytes+=help_bytes;
735 
736  if(keymngr->key_table) {
737  help_bytes=_sion_keyvalue_table_get_size(keymngr->key_table);
738  DPRINTFP((2, DFUNCTION, -1, " sizeof key_blocks= %5d\n", help_bytes));
739  bytes+=help_bytes;
740 
741  }
742  DPRINTFP((2, DFUNCTION, -1, "leave bytes=%d\n",bytes));
743 
744  return(bytes);
745 }
746 #undef DFUNCTION
_sion_key_block_entry * next
_sion_key_entry * entry
_sion_key_block_entry * next_inwriteorder
size_t offset_in_entry
int blocknum
size_t bytes_left
#define SION_DESCSTATE_DUP_SEL_RANK_KEY
Definition: sion_filedesc.h:44
_sion_key_block_entry * blocklist_current
size_t offset
_sion_keyvalue_table * key_table
#define SION_CURRENT_POS
Definition: sion_const.h:70
int _sion_errorprint(int rc, int level, const char *format,...)
Internal SION error.
#define SION_ABSOLUTE_POS
Definition: sion_const.h:71
_sion_key_block_entry * block_inwriteorder_head
_sion_key_block_entry * blocklist_head
_sion_key_block_entry * block_inwriteorder_tail
#define SION_CURRENT_BLOCK
Definition: sion_const.h:69
size_t len
Sion Time Stamp Header.
_sion_key_block_entry * blocklist_tail
int blocks_avail
ssize_t current_pos
sion_int64 key
_sion_key_block_entry * iterator_last_block