From b044f4e2e9bd906db0bb9dd7614cd3fde8847491 Mon Sep 17 00:00:00 2001 From: Yersa Nordman Date: Thu, 11 Jul 2024 22:45:55 +0200 Subject: [PATCH] Thought up basic methods supporting designed sync protocol --- DESIGN | 4 +- README.md | 100 +--- src/kvsm.c | 1322 ++++++++++++++++++++++++------------------------ src/kvsm.h | 165 +++--- test.c | 20 +- util/kvsmctl.c | 460 ++++++++--------- 6 files changed, 1004 insertions(+), 1067 deletions(-) diff --git a/DESIGN b/DESIGN index 41c5e5c..d976ba8 100644 --- a/DESIGN +++ b/DESIGN @@ -17,7 +17,7 @@ New blob layout header 1 byte transaction version (0) 15 bytes transaction identifier (randomly generated during commit) - 8 bytes height/increment (must be 1 higher than that of highest parent) + 8 bytes height/increment (must be 1 higher than that of highest parent, bubbling down during compaction?) 8 bytes parent offset [] (0 = end-of-list) entry[] 1-2 bytes key length (0 = end of list) @@ -25,7 +25,7 @@ New blob layout 8 bytes data length 0-(2^64-1) bytes data -Transaction sync idea: +Transaction sync idea (part of keveat, not kvsm): Nodes have predefined connections (--join : on cli?) diff --git a/README.md b/README.md index 1b176ef..b30dc74 100644 --- a/README.md +++ b/README.md @@ -24,11 +24,12 @@ This library makes use of [palloc](https://github.com/finwo/palloc.c) to handle blob allocations on a file or block device. Each blob represents a transaction. -From there, a transaction contains a parent reference, an increment number -and a list of key-value pairs. +From there, a transaction contains one or more parent references, an +increment number and a list of key-value pairs. This in turn allows for out-of-order insertion of transactions, compaction -of older transactions, and repairing of references if something went wrong. 
+of older transactions, and even having multiple nodes sync up their +transactions in a deterministic manner ## API @@ -75,24 +76,24 @@ of older transactions, and repairing of references if something went wrong. ```C struct kvsm { - PALLOC_FD fd; - PALLOC_OFFSET current_offset; - uint64_t current_increment; + PALLOC_FD fd; + PALLOC_OFFSET *head; + int head_count; }; ```
- struct kvsm_cursor + struct kvsm_transaction - Represents a cursor to a kvsm increment/transaction + TBD ```C -struct kvsm_cursor { +struct kvsm_transaction { const struct kvsm *ctx; - PALLOC_OFFSET parent; - PALLOC_OFFSET offset; - uint64_t increment; + const struct buf *id; + PALLOC_OFFSET *parent; + int parent_count; }; ``` @@ -162,81 +163,6 @@ KVSM_RESPONSE kvsm_set(struct kvsm *ctx, const struct buf *key, const struct buf #define kvsm_del(ctx,key) (kvsm_set(ctx,key,&((struct buf){ .len = 0, .cap = 0 }))) ``` -
-
- kvsm_cursor_free(cursor) - - Frees the used memory by the given cursor - -```C -KVSM_RESPONSE kvsm_cursor_free(struct kvsm_cursor *cursor); -``` - -
-
- kvsm_cursor_previous(cursor) - - Returns a NEW cursor, pointing to the given cursor's parent transaction, - or NULL if the given cursor has no parent - -```C -struct kvsm_cursor * kvsm_cursor_previous(const struct kvsm_cursor *cursor); -``` - -
-
- kvsm_cursor_next(cursor) - - Returns a NEW cursor, pointing to the given cursor's child transaction, - or NULL if the given cursor has no child - -```C -struct kvsm_cursor * kvsm_cursor_next(const struct kvsm_cursor *cursor); -``` - -
-
- kvsm_cursor_load(ctx, offset) - - Returns a new cursor, loaded from the transaction at the given offset. - -```C -struct kvsm_cursor * kvsm_cursor_load(const struct kvsm *ctx, PALLOC_OFFSET offset); -``` - -
-
- kvsm_cursor_fetch(ctx, increment) - - Returns a new cursor pointing to the transaction with the given increment, - or to the oldest transaction available higher than the given increment. - -```C -struct kvsm_cursor * kvsm_cursor_fetch(const struct kvsm *ctx, const uint64_t increment); -``` - -
-
- kvsm_cursor_serialize(cursor) - - Returns a buffer representing the serialized transaction, including - increment and values - -```C -struct buf * kvsm_cursor_serialize(const struct kvsm_cursor *cursor); -``` - -
-
- kvsm_cursor_ingest(ctx, serialized) - - Ingests the given serialized transaction, inserting it with the existing - increment instead of writing a new one - -```C -KVSM_RESPONSE kvsm_cursor_ingest(struct kvsm *ctx, const struct buf *serialized); -``` -
## Example diff --git a/src/kvsm.c b/src/kvsm.c index b501a60..a7b5bf9 100644 --- a/src/kvsm.c +++ b/src/kvsm.c @@ -1,661 +1,661 @@ -#include -#include - -#include "finwo/endian.h" -#include "finwo/io.h" -#include "rxi/log.h" -#include "tidwall/buf.h" - -#include "kvsm.h" - -#define FLAG_HYDRATED 1 -#define FLAG_NONRECURSIVE 2 - -// Loads JUST the info, not the data -struct kvsm_cursor * kvsm_cursor_load(const struct kvsm *ctx, PALLOC_OFFSET offset) { - struct kvsm_cursor *cursor = NULL; - - // Version check - uint8_t version; - seek_os(ctx->fd, offset , SEEK_SET); - read_os(ctx->fd, &version, sizeof(version)); - if (version != 0) { - log_error("Incompatible version at %lld", offset); - return NULL; - } - - // Actually reserve memory - cursor = calloc(1, sizeof(struct kvsm_cursor)); - cursor->offset = offset; - cursor->ctx = ctx; - - // Parent pointer overlaps with version, 56 bits of offset should be plenty - // Why offset and not increment: offset is single-volume, increment may be shared between systems - PALLOC_OFFSET parent; - seek_os(ctx->fd, offset , SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - cursor->parent = be64toh(parent); - // No need to remove version, we're version 0 - - uint64_t increment; - read_os(ctx->fd, &increment, sizeof(increment)); - cursor->increment = be64toh(increment); - log_trace("Read increment %lld", cursor->increment); - - return cursor; -} - -// Assumes the parent still exists and hasn't been deleted by compaction -struct kvsm_cursor * kvsm_cursor_previous(const struct kvsm_cursor *cursor) { - if (!cursor) return NULL; - if (!cursor->ctx) return NULL; - if (!cursor->parent) return NULL; - // Turns the cursor into it's own parent - return kvsm_cursor_load(cursor->ctx, cursor->parent); -} - -// Assumes the cursor still exists and hasn't been deleted by compaction -struct kvsm_cursor * kvsm_cursor_next(const struct kvsm_cursor *cursor) { - if (!cursor) return NULL; - if (!cursor->ctx) return NULL; - const struct kvsm *ctx 
= cursor->ctx; - - PALLOC_OFFSET parent; - PALLOC_OFFSET current = ctx->current_offset; - PALLOC_OFFSET child = 0; - uint64_t increment; - uint8_t len8; - - while(current) { - // Sanity-check version - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return NULL; - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - // No need to un-mix version, we're 0 - - // Return child (a.k.a. next in time) when we find ourselves - read_os(ctx->fd, &increment, sizeof(increment)); - increment = be64toh(increment); - if (increment == cursor->increment) { - return kvsm_cursor_load(ctx, child); - } - - // Go to the parent - child = current; - current = parent; - } - - // Not found - return NULL; -} - -// Fetches a specific increment, not a specific offset -struct kvsm_cursor * kvsm_cursor_fetch(const struct kvsm *ctx, const uint64_t increment) { - if (!ctx) return NULL; - - PALLOC_OFFSET parent; - PALLOC_OFFSET current = ctx->current_offset; - PALLOC_OFFSET child = ctx->current_offset; - uint64_t _increment; - uint8_t len8; - - while(current) { - // Sanity-check version - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return NULL; - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - // No need to un-mix version, we're 0 - read_os(ctx->fd, &_increment, sizeof(_increment)); - _increment = be64toh(_increment); - - // Found target increment - if (_increment == increment) { - return kvsm_cursor_load(ctx, current); - } - - // Found target increment - if (_increment <= increment) { - return kvsm_cursor_load(ctx, child); - } - - // TODO: make optional - // Return if we're the "oldest" - if (!parent) { - return kvsm_cursor_load(ctx, current); - } - - // Go to the parent - child = current; - current = parent; - } - - // Not found - return NULL; -} - -struct kvsm * kvsm_open(const 
char *filename, const int isBlockDev) { - log_trace("call: kvsm_open(%s,%d)", filename, isBlockDev); - struct kvsm_cursor *cursor = NULL; - PALLOC_FLAGS flags = PALLOC_DEFAULT; - if (!isBlockDev) flags |= PALLOC_DYNAMIC; - struct kvsm *ctx = calloc(1, sizeof(*ctx)); - - if (!ctx) { - log_error("Could not reserve memory for kvsm context"); - return NULL; - } - - ctx->fd = palloc_open(filename, flags); - if (!ctx->fd) { - log_error("Could not open storage medium: %d", filename); - return NULL; - } - - log_debug("Initializing blob storage"); - PALLOC_RESPONSE r = palloc_init(ctx->fd, flags); - if (r != PALLOC_OK) { - log_error("Error during medium initialization", filename); - palloc_close(ctx->fd); - return NULL; - } - - log_debug("Searching for kv root"); - PALLOC_OFFSET off = palloc_next(ctx->fd, 0); - while(off) { - log_trace("Scanning %lld for root", off); - cursor = kvsm_cursor_load(ctx, off); - if (!cursor) { - log_trace("Not supported: %lld", off); - off = palloc_next(ctx->fd, off); - continue; - } - - log_trace("Loaded tx: %llx, %lld", cursor->offset, cursor->increment); - if (cursor->increment > ctx->current_increment) { - log_trace("Promoted to new root"); - ctx->current_increment = cursor->increment; - ctx->current_offset = off; - } - - kvsm_cursor_free(cursor); - off = palloc_next(ctx->fd, off); - } - - log_debug("Detected root: %llx, %d", ctx->current_offset, ctx->current_increment); - return ctx; -} - -KVSM_RESPONSE kvsm_close(struct kvsm *ctx) { - if (!ctx) return KVSM_ERROR; - palloc_close(ctx->fd); - free(ctx); - return KVSM_OK; -} - -KVSM_RESPONSE kvsm_cursor_free(struct kvsm_cursor *cursor) { - free(cursor); - return KVSM_OK; -} - -struct _kvsm_get_response { - const struct buf *key; - struct buf *value; - uint64_t increment; -}; -// DOES support multi-value transactions -struct _kvsm_get_response * _kvsm_get(const struct kvsm *ctx, const struct buf *key, bool load_value) { - log_trace("call: kvsm_get(...)"); - - if (key->len >= 32768) { - 
log_error("key too large"); - return NULL; - } - - uint8_t len8; - uint16_t len16; - uint64_t len64; - PALLOC_OFFSET off = ctx->current_offset; - PALLOC_OFFSET parent; - uint64_t increment; - struct buf k = {}; - struct buf *v = NULL; - struct _kvsm_get_response *resp = NULL; - - while(off) { - log_trace("Checking %lld", off); - seek_os(ctx->fd, off, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return NULL; - - seek_os(ctx->fd, off, SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - - read_os(ctx->fd, &increment, sizeof(increment)); - increment = be64toh(increment); - - while(true) { - - // Read key length - read_os(ctx->fd, &len8, sizeof(len8)); - if (!len8) break; - len16 = len8 & 127; - if (len8 & 128) { - len16 = len16 << 8; - read_os(ctx->fd, &len8, sizeof(len8)); - len16 |= len8; - } - - // Read key data - k.data = malloc(len16); - k.len = len16; - k.cap = len16; - read_os(ctx->fd, k.data, k.len); - - // Read value length - read_os(ctx->fd, &len64, sizeof(len64)); - len64 = be64toh(len64); - - // Different length = no match - if (k.len != key->len) { - seek_os(ctx->fd, len64, SEEK_CUR); - buf_clear(&k); - continue; - } - - // Different data = no match - if (memcmp(k.data, key->data, k.len)) { - seek_os(ctx->fd, len64, SEEK_CUR); - buf_clear(&k); - continue; - } - - // Here = found - buf_clear(&k); - - // Handle delete marker response - if (!len64) { - return NULL; - } - - resp = malloc(sizeof(struct _kvsm_get_response)); - if (!resp) { - log_error("Error during memory allocation for get return wrapper"); - return NULL; - } - resp->increment = increment; - resp->key = key; - - if (load_value) { - v = calloc(1, sizeof(struct buf)); - if (!v) { - log_error("Error during memory allocation for get return struct"); - return NULL; - } - v->len = len64; - v->cap = len64; - v->data = malloc(len64); - if (!v->data) { - free(v); - log_error("Error during memory allocation for get return blob"); - return NULL; - 
} - - read_os(ctx->fd, v->data, len64); - resp->value = v; - } - - return resp; - } - - off = parent; - } - - // Not found - return NULL; -} -struct buf * kvsm_get(const struct kvsm *ctx, const struct buf *key) { - struct _kvsm_get_response *response = _kvsm_get(ctx, key, true); - if (!response) return NULL; - struct buf *value = response->value; - free(response); - return value; -} - -// Only gets the increment -uint64_t kvsm_get_increment(const struct kvsm *ctx, const struct buf *key) { - struct _kvsm_get_response *response = _kvsm_get(ctx, key, false); - if (!response) return 0; - uint64_t increment = response->increment; - free(response); - return increment; -} - -// DOES NOT support multi-value transactions -// Does close the list as if it supports them though -KVSM_RESPONSE kvsm_set(struct kvsm *ctx, const struct buf *key, const struct buf *value) { - log_trace("call: kvsm_set(...)"); - - if (key->len >= 32768) { - log_error("key too large"); - return KVSM_ERROR; - } - - // Calculate transaction size - size_t tx_size = 0; - tx_size += 8; // version|parent - tx_size += 8; // increment - if (key->len >= 128) { - tx_size += 2; - } else { - tx_size += 1; - } - tx_size += key->len; - tx_size += 8; // value size - tx_size += value->len; - tx_size += 1; // End-of-list - - log_trace("Reserving %lld bytes", tx_size); - PALLOC_OFFSET offset = palloc(ctx->fd, tx_size); - PALLOC_OFFSET parent = htobe64(ctx->current_offset); - // No need to merge version, we're 0 - seek_os(ctx->fd, offset, SEEK_SET); - write_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - - uint64_t increment = htobe64(ctx->current_increment + 1); - write_os(ctx->fd, &increment, sizeof(parent)); - increment = be64toh(increment); - - uint8_t len8; - if (key->len >= 128) { - len8 = 128 | (key->len >> 8); - write_os(ctx->fd, &len8, sizeof(len8)); - len8 = key->len & 255; - write_os(ctx->fd, &len8, sizeof(len8)); - } else { - len8 = key->len; - write_os(ctx->fd, &len8, sizeof(len8)); - } - 
- write_os(ctx->fd, key->data, key->len); - - uint64_t valsize = htobe64(value->len); - write_os(ctx->fd, &valsize, sizeof(valsize)); - write_os(ctx->fd, value->data, value->len); - - len8 = 0; - write_os(ctx->fd, &len8, sizeof(len8)); - - if (increment > ctx->current_increment) { - ctx->current_increment = increment; - ctx->current_offset = offset; - } - - return KVSM_OK; -} - -// Caution: lazy algorithm -// Goes through every "transaction", and discards them if it only contains non-current versions -struct kvsm_compact_track { - uint16_t len; - char *dat; -}; -KVSM_RESPONSE kvsm_compact(const struct kvsm *ctx) { - PALLOC_OFFSET child = 0; - PALLOC_OFFSET current = ctx->current_offset; - PALLOC_OFFSET parent = 0; - PALLOC_OFFSET tmp = 0; - - bool discardable; - uint64_t current_increment; - - uint8_t len8; - uint16_t len16; - uint64_t len64; - - struct buf key; - - while(current) { - discardable = true; - log_trace("Checking 0x%llx for being discardable", current); - - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return KVSM_ERROR; // Only version 0 supported - - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - // No need to un-mix version, we're 0 - - read_os(ctx->fd, ¤t_increment, sizeof(current_increment)); - current_increment = be64toh(current_increment); - - while(true) { - log_trace("Cursor: 0x%llx", seek_os(ctx->fd, 0, SEEK_CUR)); - - // Read key length - read_os(ctx->fd, &len8, sizeof(len8)); - log_trace("Len: %d", len8); - if (!len8) break; // End of list - len16 = len8 & 127; - if (len8 >= 128) { - len16 = len16 << 8; - read_os(ctx->fd, &len8, sizeof(len8)); - len16 |= len8; - } - log_trace("Keylen %d in 0x%llx", len16, current); - - // Read key data - key.data = malloc(len16); - key.len = len16; - key.cap = len16; - read_os(ctx->fd, key.data, key.len); - - // Read value length - read_os(ctx->fd, &len64, sizeof(len64)); - len64 = 
be64toh(len64); - - // Save current location, fetching increment moves fd cursor - tmp = seek_os(ctx->fd, 0, SEEK_CUR); - - // Mark as non-discardable if it's being used for the found key - if (kvsm_get_increment(ctx, &key) == current_increment) { - discardable = false; - seek_os(ctx->fd, tmp, SEEK_SET); - buf_clear(&key); - break; - } - - // Return to after data to continue checking the list - buf_clear(&key); - seek_os(ctx->fd, tmp+len64, SEEK_SET); - - } - - log_trace("status for %llx: %sdiscardable", current, discardable ? "" : "not "); - - // Go to the next transaction - if (!discardable) { - child = current; - current = parent; - continue; - } - - log_debug("Discarding increment %d at %llx", current_increment, current); - - if (child) { - // Update child to point to our parent - // Sanity check - seek_os(ctx->fd, child, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return KVSM_ERROR; // Only version 0 supported, bail - - seek_os(ctx->fd, child, SEEK_SET); - // No need to mix version, we're 0 - parent = htobe64(parent); - write_os(ctx->fd, &parent, sizeof(parent)); - parent = be64toh(parent); - } else { - // No child, but we won't remove root, it's always up-to-date - } - - // Free used space - pfree(ctx->fd, current); - current = parent; - // Don't update child, hasn't changed - } - - return KVSM_OK; -} - -struct buf * kvsm_cursor_serialize(const struct kvsm_cursor *cursor) { - log_trace("call: kvsm_cursor_serialize(%lld)", cursor->increment); - const struct kvsm *ctx = cursor->ctx; - - uint8_t len8; - uint16_t len16; - uint64_t len64; - - // Sanity-check the version - seek_os(ctx->fd, cursor->offset, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) return NULL; - - struct buf *output = calloc(1, sizeof(struct buf)); - - buf_append_byte(output, 0); // Serialized format 0 - - seek_os(ctx->fd, cursor->offset + sizeof(PALLOC_OFFSET), SEEK_SET); - read_os(ctx->fd, &len64, sizeof(len64)); - buf_append(output, &len64, 
sizeof(len64)); - - while(1) { - read_os(ctx->fd, &len8, sizeof(len8)); - buf_append(output, &len8, sizeof(len8)); - if (!len8) break; - - len16 = len8 & 127; - if (len8 & 128) { - len16 = len16 << 8; - read_os(ctx->fd, &len8, sizeof(len8)); - buf_append(output, &len8, sizeof(len8)); - len16 |= len8; - } - - if ((output->cap - output->len) < len16) { - output->data = realloc(output->data, output->len + len16); - output->cap = output->len + len16; - } - read_os(ctx->fd, output->data + output->len, len16); - output->len += len16; - - read_os(ctx->fd, &len64, sizeof(len64)); - buf_append(output, &len64, sizeof(len64)); - len64 = be64toh(len64); - if ((output->cap - output->len) < len64) { - output->data = realloc(output->data, output->len + len64); - output->cap = output->len + len64; - } - read_os(ctx->fd, output->data + output->len, len64); - output->len += len64; - } - - return output; -} - -KVSM_RESPONSE kvsm_cursor_ingest(struct kvsm *ctx, const struct buf *serialized) { - log_trace("call: kvsm_cursor_ingest(...)"); - - if (serialized->len < 1) { - log_error("Invalid length to ingest"); - return KVSM_ERROR; - } - - if (serialized->data[0] != 0) { - log_error("Ingestable has unsupported version"); - return KVSM_ERROR; - } - - uint8_t len8; - uint8_t len64; - uint64_t increment; - memcpy(&increment, &(serialized->data[1]), sizeof(increment)); - increment = be64toh(increment); - - PALLOC_OFFSET child = 0; - PALLOC_OFFSET current = 0; - PALLOC_OFFSET parent = 0; - if (increment > ctx->current_increment) { - // Newer than we currently have, dangle at the front - parent = ctx->current_offset; - } else { - - current = ctx->current_offset; - while(1) { - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &len8, sizeof(len8)); - if (len8 != 0) { - log_error("Encountered unsupported transaction version %d at %lld", len8, seek_os(ctx->fd, 0, SEEK_CUR) - 1); - return KVSM_ERROR; - } - - seek_os(ctx->fd, current, SEEK_SET); - read_os(ctx->fd, &parent, sizeof(parent)); - 
parent = be64toh(parent); - log_trace("Parent of %llx is %llx", current, parent); - read_os(ctx->fd, &len64, sizeof(len64)); - len64 = be64toh(len64); - - // No more parents - // Return to dangle at the end - if (!parent) { - child = current; - break; - } - - // Found spot to insert between - if (len64 < increment) { - parent = current; - break; - } - - // Loop to the next parent - child = current; - current = parent; - } - } - - // Start actually writing - current = palloc(ctx->fd, serialized->len + sizeof(parent) - 1); - seek_os(ctx->fd, current, SEEK_SET); - // No need to mix version, we're 0 - log_debug("Ingesting with parent %llx", parent); - parent = htobe64(parent); - write_os(ctx->fd, &parent, sizeof(parent)); - // Write serialized data as-is, it matches our on-disk format - write_os(ctx->fd, serialized->data + 1, serialized->len - 1); - - // Update child's parent pointer if we got one - log_debug("Ingesting with child %llx", child); - if (child) { - current = htobe64(current); - seek_os(ctx->fd, child, SEEK_SET); - write_os(ctx->fd, ¤t, sizeof(current)); - } - - // Update root marker if needed - if (increment > ctx->current_increment) { - ctx->current_increment = increment; - ctx->current_offset = current; - } - - return KVSM_OK; -} +/*#include */ +/*#include */ +/**/ +/*#include "finwo/endian.h"*/ +/*#include "finwo/io.h"*/ +/*#include "rxi/log.h"*/ +/*#include "tidwall/buf.h"*/ +/**/ +/*#include "kvsm.h"*/ +/**/ +/*#define FLAG_HYDRATED 1*/ +/*#define FLAG_NONRECURSIVE 2*/ +/**/ +/*// Loads JUST the info, not the data*/ +/*struct kvsm_cursor * kvsm_cursor_load(const struct kvsm *ctx, PALLOC_OFFSET offset) {*/ +/* struct kvsm_cursor *cursor = NULL;*/ +/**/ +/* // Version check*/ +/* uint8_t version;*/ +/* seek_os(ctx->fd, offset , SEEK_SET);*/ +/* read_os(ctx->fd, &version, sizeof(version));*/ +/* if (version != 0) {*/ +/* log_error("Incompatible version at %lld", offset);*/ +/* return NULL;*/ +/* }*/ +/**/ +/* // Actually reserve memory*/ +/* cursor = 
calloc(1, sizeof(struct kvsm_cursor));*/ +/* cursor->offset = offset;*/ +/* cursor->ctx = ctx;*/ +/**/ +/* // Parent pointer overlaps with version, 56 bits of offset should be plenty*/ +/* // Why offset and not increment: offset is single-volume, increment may be shared between systems*/ +/* PALLOC_OFFSET parent;*/ +/* seek_os(ctx->fd, offset , SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* cursor->parent = be64toh(parent);*/ +/* // No need to remove version, we're version 0*/ +/**/ +/* uint64_t increment;*/ +/* read_os(ctx->fd, &increment, sizeof(increment));*/ +/* cursor->increment = be64toh(increment);*/ +/* log_trace("Read increment %lld", cursor->increment);*/ +/**/ +/* return cursor;*/ +/*}*/ +/**/ +/*// Assumes the parent still exists and hasn't been deleted by compaction*/ +/*struct kvsm_cursor * kvsm_cursor_previous(const struct kvsm_cursor *cursor) {*/ +/* if (!cursor) return NULL;*/ +/* if (!cursor->ctx) return NULL;*/ +/* if (!cursor->parent) return NULL;*/ +/* // Turns the cursor into it's own parent*/ +/* return kvsm_cursor_load(cursor->ctx, cursor->parent);*/ +/*}*/ +/**/ +/*// Assumes the cursor still exists and hasn't been deleted by compaction*/ +/*struct kvsm_cursor * kvsm_cursor_next(const struct kvsm_cursor *cursor) {*/ +/* if (!cursor) return NULL;*/ +/* if (!cursor->ctx) return NULL;*/ +/* const struct kvsm *ctx = cursor->ctx;*/ +/**/ +/* PALLOC_OFFSET parent;*/ +/* PALLOC_OFFSET current = ctx->current_offset;*/ +/* PALLOC_OFFSET child = 0;*/ +/* uint64_t increment;*/ +/* uint8_t len8;*/ +/**/ +/* while(current) {*/ +/* // Sanity-check version*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) return NULL;*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/* // No need to un-mix version, we're 0*/ +/**/ +/* // Return child (a.k.a. 
next in time) when we find ourselves*/ +/* read_os(ctx->fd, &increment, sizeof(increment));*/ +/* increment = be64toh(increment);*/ +/* if (increment == cursor->increment) {*/ +/* return kvsm_cursor_load(ctx, child);*/ +/* }*/ +/**/ +/* // Go to the parent*/ +/* child = current;*/ +/* current = parent;*/ +/* }*/ +/**/ +/* // Not found*/ +/* return NULL;*/ +/*}*/ +/**/ +/*// Fetches a specific increment, not a specific offset*/ +/*struct kvsm_cursor * kvsm_cursor_fetch(const struct kvsm *ctx, const uint64_t increment) {*/ +/* if (!ctx) return NULL;*/ +/**/ +/* PALLOC_OFFSET parent;*/ +/* PALLOC_OFFSET current = ctx->current_offset;*/ +/* PALLOC_OFFSET child = ctx->current_offset;*/ +/* uint64_t _increment;*/ +/* uint8_t len8;*/ +/**/ +/* while(current) {*/ +/* // Sanity-check version*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) return NULL;*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/* // No need to un-mix version, we're 0*/ +/* read_os(ctx->fd, &_increment, sizeof(_increment));*/ +/* _increment = be64toh(_increment);*/ +/**/ +/* // Found target increment*/ +/* if (_increment == increment) {*/ +/* return kvsm_cursor_load(ctx, current);*/ +/* }*/ +/**/ +/* // Found target increment*/ +/* if (_increment <= increment) {*/ +/* return kvsm_cursor_load(ctx, child);*/ +/* }*/ +/**/ +/* // TODO: make optional*/ +/* // Return if we're the "oldest"*/ +/* if (!parent) {*/ +/* return kvsm_cursor_load(ctx, current);*/ +/* }*/ +/**/ +/* // Go to the parent*/ +/* child = current;*/ +/* current = parent;*/ +/* }*/ +/**/ +/* // Not found*/ +/* return NULL;*/ +/*}*/ +/**/ +/*struct kvsm * kvsm_open(const char *filename, const int isBlockDev) {*/ +/* log_trace("call: kvsm_open(%s,%d)", filename, isBlockDev);*/ +/* struct kvsm_cursor *cursor = NULL;*/ +/* PALLOC_FLAGS flags = PALLOC_DEFAULT;*/ +/* if (!isBlockDev) flags |= PALLOC_DYNAMIC;*/ 
+/* struct kvsm *ctx = calloc(1, sizeof(*ctx));*/ +/**/ +/* if (!ctx) {*/ +/* log_error("Could not reserve memory for kvsm context");*/ +/* return NULL;*/ +/* }*/ +/**/ +/* ctx->fd = palloc_open(filename, flags);*/ +/* if (!ctx->fd) {*/ +/* log_error("Could not open storage medium: %d", filename);*/ +/* return NULL;*/ +/* }*/ +/**/ +/* log_debug("Initializing blob storage");*/ +/* PALLOC_RESPONSE r = palloc_init(ctx->fd, flags);*/ +/* if (r != PALLOC_OK) {*/ +/* log_error("Error during medium initialization", filename);*/ +/* palloc_close(ctx->fd);*/ +/* return NULL;*/ +/* }*/ +/**/ +/* log_debug("Searching for kv root");*/ +/* PALLOC_OFFSET off = palloc_next(ctx->fd, 0);*/ +/* while(off) {*/ +/* log_trace("Scanning %lld for root", off);*/ +/* cursor = kvsm_cursor_load(ctx, off);*/ +/* if (!cursor) {*/ +/* log_trace("Not supported: %lld", off);*/ +/* off = palloc_next(ctx->fd, off);*/ +/* continue;*/ +/* }*/ +/**/ +/* log_trace("Loaded tx: %llx, %lld", cursor->offset, cursor->increment);*/ +/* if (cursor->increment > ctx->current_increment) {*/ +/* log_trace("Promoted to new root");*/ +/* ctx->current_increment = cursor->increment;*/ +/* ctx->current_offset = off;*/ +/* }*/ +/**/ +/* kvsm_cursor_free(cursor);*/ +/* off = palloc_next(ctx->fd, off);*/ +/* }*/ +/**/ +/* log_debug("Detected root: %llx, %d", ctx->current_offset, ctx->current_increment);*/ +/* return ctx;*/ +/*}*/ +/**/ +/*KVSM_RESPONSE kvsm_close(struct kvsm *ctx) {*/ +/* if (!ctx) return KVSM_ERROR;*/ +/* palloc_close(ctx->fd);*/ +/* free(ctx);*/ +/* return KVSM_OK;*/ +/*}*/ +/**/ +/*KVSM_RESPONSE kvsm_cursor_free(struct kvsm_cursor *cursor) {*/ +/* free(cursor);*/ +/* return KVSM_OK;*/ +/*}*/ +/**/ +/*struct _kvsm_get_response {*/ +/* const struct buf *key;*/ +/* struct buf *value;*/ +/* uint64_t increment;*/ +/*};*/ +/*// DOES support multi-value transactions*/ +/*struct _kvsm_get_response * _kvsm_get(const struct kvsm *ctx, const struct buf *key, bool load_value) {*/ +/* log_trace("call: 
kvsm_get(...)");*/ +/**/ +/* if (key->len >= 32768) {*/ +/* log_error("key too large");*/ +/* return NULL;*/ +/* }*/ +/**/ +/* uint8_t len8;*/ +/* uint16_t len16;*/ +/* uint64_t len64;*/ +/* PALLOC_OFFSET off = ctx->current_offset;*/ +/* PALLOC_OFFSET parent;*/ +/* uint64_t increment;*/ +/* struct buf k = {};*/ +/* struct buf *v = NULL;*/ +/* struct _kvsm_get_response *resp = NULL;*/ +/**/ +/* while(off) {*/ +/* log_trace("Checking %lld", off);*/ +/* seek_os(ctx->fd, off, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) return NULL;*/ +/**/ +/* seek_os(ctx->fd, off, SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/**/ +/* read_os(ctx->fd, &increment, sizeof(increment));*/ +/* increment = be64toh(increment);*/ +/**/ +/* while(true) {*/ +/**/ +/* // Read key length*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (!len8) break;*/ +/* len16 = len8 & 127;*/ +/* if (len8 & 128) {*/ +/* len16 = len16 << 8;*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* len16 |= len8;*/ +/* }*/ +/**/ +/* // Read key data*/ +/* k.data = malloc(len16);*/ +/* k.len = len16;*/ +/* k.cap = len16;*/ +/* read_os(ctx->fd, k.data, k.len);*/ +/**/ +/* // Read value length*/ +/* read_os(ctx->fd, &len64, sizeof(len64));*/ +/* len64 = be64toh(len64);*/ +/**/ +/* // Different length = no match*/ +/* if (k.len != key->len) {*/ +/* seek_os(ctx->fd, len64, SEEK_CUR);*/ +/* buf_clear(&k);*/ +/* continue;*/ +/* }*/ +/**/ +/* // Different data = no match*/ +/* if (memcmp(k.data, key->data, k.len)) {*/ +/* seek_os(ctx->fd, len64, SEEK_CUR);*/ +/* buf_clear(&k);*/ +/* continue;*/ +/* }*/ +/**/ +/* // Here = found*/ +/* buf_clear(&k);*/ +/**/ +/* // Handle delete marker response*/ +/* if (!len64) {*/ +/* return NULL;*/ +/* }*/ +/**/ +/* resp = malloc(sizeof(struct _kvsm_get_response));*/ +/* if (!resp) {*/ +/* log_error("Error during memory allocation for get return wrapper");*/ +/* return NULL;*/ +/* }*/ +/* resp->increment = 
increment;*/ +/* resp->key = key;*/ +/**/ +/* if (load_value) {*/ +/* v = calloc(1, sizeof(struct buf));*/ +/* if (!v) {*/ +/* log_error("Error during memory allocation for get return struct");*/ +/* return NULL;*/ +/* }*/ +/* v->len = len64;*/ +/* v->cap = len64;*/ +/* v->data = malloc(len64);*/ +/* if (!v->data) {*/ +/* free(v);*/ +/* log_error("Error during memory allocation for get return blob");*/ +/* return NULL;*/ +/* }*/ +/**/ +/* read_os(ctx->fd, v->data, len64);*/ +/* resp->value = v;*/ +/* }*/ +/**/ +/* return resp;*/ +/* }*/ +/**/ +/* off = parent;*/ +/* }*/ +/**/ +/* // Not found*/ +/* return NULL;*/ +/*}*/ +/*struct buf * kvsm_get(const struct kvsm *ctx, const struct buf *key) {*/ +/* struct _kvsm_get_response *response = _kvsm_get(ctx, key, true);*/ +/* if (!response) return NULL;*/ +/* struct buf *value = response->value;*/ +/* free(response);*/ +/* return value;*/ +/*}*/ +/**/ +/*// Only gets the increment*/ +/*uint64_t kvsm_get_increment(const struct kvsm *ctx, const struct buf *key) {*/ +/* struct _kvsm_get_response *response = _kvsm_get(ctx, key, false);*/ +/* if (!response) return 0;*/ +/* uint64_t increment = response->increment;*/ +/* free(response);*/ +/* return increment;*/ +/*}*/ +/**/ +/*// DOES NOT support multi-value transactions*/ +/*// Does close the list as if it supports them though*/ +/*KVSM_RESPONSE kvsm_set(struct kvsm *ctx, const struct buf *key, const struct buf *value) {*/ +/* log_trace("call: kvsm_set(...)");*/ +/**/ +/* if (key->len >= 32768) {*/ +/* log_error("key too large");*/ +/* return KVSM_ERROR;*/ +/* }*/ +/**/ +/* // Calculate transaction size*/ +/* size_t tx_size = 0;*/ +/* tx_size += 8; // version|parent*/ +/* tx_size += 8; // increment*/ +/* if (key->len >= 128) {*/ +/* tx_size += 2;*/ +/* } else {*/ +/* tx_size += 1;*/ +/* }*/ +/* tx_size += key->len;*/ +/* tx_size += 8; // value size*/ +/* tx_size += value->len;*/ +/* tx_size += 1; // End-of-list*/ +/**/ +/* log_trace("Reserving %lld bytes", tx_size);*/ +/* 
PALLOC_OFFSET offset = palloc(ctx->fd, tx_size);*/ +/* PALLOC_OFFSET parent = htobe64(ctx->current_offset);*/ +/* // No need to merge version, we're 0*/ +/* seek_os(ctx->fd, offset, SEEK_SET);*/ +/* write_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/**/ +/* uint64_t increment = htobe64(ctx->current_increment + 1);*/ +/* write_os(ctx->fd, &increment, sizeof(parent));*/ +/* increment = be64toh(increment);*/ +/**/ +/* uint8_t len8;*/ +/* if (key->len >= 128) {*/ +/* len8 = 128 | (key->len >> 8);*/ +/* write_os(ctx->fd, &len8, sizeof(len8));*/ +/* len8 = key->len & 255;*/ +/* write_os(ctx->fd, &len8, sizeof(len8));*/ +/* } else {*/ +/* len8 = key->len;*/ +/* write_os(ctx->fd, &len8, sizeof(len8));*/ +/* }*/ +/**/ +/* write_os(ctx->fd, key->data, key->len);*/ +/**/ +/* uint64_t valsize = htobe64(value->len);*/ +/* write_os(ctx->fd, &valsize, sizeof(valsize));*/ +/* write_os(ctx->fd, value->data, value->len);*/ +/**/ +/* len8 = 0;*/ +/* write_os(ctx->fd, &len8, sizeof(len8));*/ +/**/ +/* if (increment > ctx->current_increment) {*/ +/* ctx->current_increment = increment;*/ +/* ctx->current_offset = offset;*/ +/* }*/ +/**/ +/* return KVSM_OK;*/ +/*}*/ +/**/ +/*// Caution: lazy algorithm*/ +/*// Goes through every "transaction", and discards them if it only contains non-current versions*/ +/*struct kvsm_compact_track {*/ +/* uint16_t len;*/ +/* char *dat;*/ +/*};*/ +/*KVSM_RESPONSE kvsm_compact(const struct kvsm *ctx) {*/ +/* PALLOC_OFFSET child = 0;*/ +/* PALLOC_OFFSET current = ctx->current_offset;*/ +/* PALLOC_OFFSET parent = 0;*/ +/* PALLOC_OFFSET tmp = 0;*/ +/**/ +/* bool discardable;*/ +/* uint64_t current_increment;*/ +/**/ +/* uint8_t len8;*/ +/* uint16_t len16;*/ +/* uint64_t len64;*/ +/**/ +/* struct buf key;*/ +/**/ +/* while(current) {*/ +/* discardable = true;*/ +/* log_trace("Checking 0x%llx for being discardable", current);*/ +/**/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 
!= 0) return KVSM_ERROR; // Only version 0 supported*/ +/**/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/* // No need to un-mix version, we're 0*/ +/**/ +/* read_os(ctx->fd, &current_increment, sizeof(current_increment));*/ +/* current_increment = be64toh(current_increment);*/ +/**/ +/* while(true) {*/ +/* log_trace("Cursor: 0x%llx", seek_os(ctx->fd, 0, SEEK_CUR));*/ +/**/ +/* // Read key length*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* log_trace("Len: %d", len8);*/ +/* if (!len8) break; // End of list*/ +/* len16 = len8 & 127;*/ +/* if (len8 >= 128) {*/ +/* len16 = len16 << 8;*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* len16 |= len8;*/ +/* }*/ +/* log_trace("Keylen %d in 0x%llx", len16, current);*/ +/**/ +/* // Read key data*/ +/* key.data = malloc(len16);*/ +/* key.len = len16;*/ +/* key.cap = len16;*/ +/* read_os(ctx->fd, key.data, key.len);*/ +/**/ +/* // Read value length*/ +/* read_os(ctx->fd, &len64, sizeof(len64));*/ +/* len64 = be64toh(len64);*/ +/**/ +/* // Save current location, fetching increment moves fd cursor*/ +/* tmp = seek_os(ctx->fd, 0, SEEK_CUR);*/ +/**/ +/* // Mark as non-discardable if it's being used for the found key*/ +/* if (kvsm_get_increment(ctx, &key) == current_increment) {*/ +/* discardable = false;*/ +/* seek_os(ctx->fd, tmp, SEEK_SET);*/ +/* buf_clear(&key);*/ +/* break;*/ +/* }*/ +/**/ +/* // Return to after data to continue checking the list*/ +/* buf_clear(&key);*/ +/* seek_os(ctx->fd, tmp+len64, SEEK_SET);*/ +/**/ +/* }*/ +/**/ +/* log_trace("status for %llx: %sdiscardable", current, discardable ?
"" : "not ");*/ +/**/ +/* // Go to the next transaction*/ +/* if (!discardable) {*/ +/* child = current;*/ +/* current = parent;*/ +/* continue;*/ +/* }*/ +/**/ +/* log_debug("Discarding increment %d at %llx", current_increment, current);*/ +/**/ +/* if (child) {*/ +/* // Update child to point to our parent*/ +/* // Sanity check*/ +/* seek_os(ctx->fd, child, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) return KVSM_ERROR; // Only version 0 supported, bail*/ +/**/ +/* seek_os(ctx->fd, child, SEEK_SET);*/ +/* // No need to mix version, we're 0*/ +/* parent = htobe64(parent);*/ +/* write_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = be64toh(parent);*/ +/* } else {*/ +/* // No child, but we won't remove root, it's always up-to-date*/ +/* }*/ +/**/ +/* // Free used space*/ +/* pfree(ctx->fd, current);*/ +/* current = parent;*/ +/* // Don't update child, hasn't changed*/ +/* }*/ +/**/ +/* return KVSM_OK;*/ +/*}*/ +/**/ +/*struct buf * kvsm_cursor_serialize(const struct kvsm_cursor *cursor) {*/ +/* log_trace("call: kvsm_cursor_serialize(%lld)", cursor->increment);*/ +/* const struct kvsm *ctx = cursor->ctx;*/ +/**/ +/* uint8_t len8;*/ +/* uint16_t len16;*/ +/* uint64_t len64;*/ +/**/ +/* // Sanity-check the version*/ +/* seek_os(ctx->fd, cursor->offset, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) return NULL;*/ +/**/ +/* struct buf *output = calloc(1, sizeof(struct buf));*/ +/**/ +/* buf_append_byte(output, 0); // Serialized format 0*/ +/**/ +/* seek_os(ctx->fd, cursor->offset + sizeof(PALLOC_OFFSET), SEEK_SET);*/ +/* read_os(ctx->fd, &len64, sizeof(len64));*/ +/* buf_append(output, &len64, sizeof(len64));*/ +/**/ +/* while(1) {*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* buf_append(output, &len8, sizeof(len8));*/ +/* if (!len8) break;*/ +/**/ +/* len16 = len8 & 127;*/ +/* if (len8 & 128) {*/ +/* len16 = len16 << 8;*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* buf_append(output, &len8, 
sizeof(len8));*/ +/* len16 |= len8;*/ +/* }*/ +/**/ +/* if ((output->cap - output->len) < len16) {*/ +/* output->data = realloc(output->data, output->len + len16);*/ +/* output->cap = output->len + len16;*/ +/* }*/ +/* read_os(ctx->fd, output->data + output->len, len16);*/ +/* output->len += len16;*/ +/**/ +/* read_os(ctx->fd, &len64, sizeof(len64));*/ +/* buf_append(output, &len64, sizeof(len64));*/ +/* len64 = be64toh(len64);*/ +/* if ((output->cap - output->len) < len64) {*/ +/* output->data = realloc(output->data, output->len + len64);*/ +/* output->cap = output->len + len64;*/ +/* }*/ +/* read_os(ctx->fd, output->data + output->len, len64);*/ +/* output->len += len64;*/ +/* }*/ +/**/ +/* return output;*/ +/*}*/ +/**/ +/*KVSM_RESPONSE kvsm_cursor_ingest(struct kvsm *ctx, const struct buf *serialized) {*/ +/* log_trace("call: kvsm_cursor_ingest(...)");*/ +/**/ +/* if (serialized->len < 1) {*/ +/* log_error("Invalid length to ingest");*/ +/* return KVSM_ERROR;*/ +/* }*/ +/**/ +/* if (serialized->data[0] != 0) {*/ +/* log_error("Ingestable has unsupported version");*/ +/* return KVSM_ERROR;*/ +/* }*/ +/**/ +/* uint8_t len8;*/ +/* uint8_t len64;*/ +/* uint64_t increment;*/ +/* memcpy(&increment, &(serialized->data[1]), sizeof(increment));*/ +/* increment = be64toh(increment);*/ +/**/ +/* PALLOC_OFFSET child = 0;*/ +/* PALLOC_OFFSET current = 0;*/ +/* PALLOC_OFFSET parent = 0;*/ +/* if (increment > ctx->current_increment) {*/ +/* // Newer than we currently have, dangle at the front*/ +/* parent = ctx->current_offset;*/ +/* } else {*/ +/**/ +/* current = ctx->current_offset;*/ +/* while(1) {*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &len8, sizeof(len8));*/ +/* if (len8 != 0) {*/ +/* log_error("Encountered unsupported transaction version %d at %lld", len8, seek_os(ctx->fd, 0, SEEK_CUR) - 1);*/ +/* return KVSM_ERROR;*/ +/* }*/ +/**/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* read_os(ctx->fd, &parent, sizeof(parent));*/ +/* parent = 
be64toh(parent);*/ +/* log_trace("Parent of %llx is %llx", current, parent);*/ +/* read_os(ctx->fd, &len64, sizeof(len64));*/ +/* len64 = be64toh(len64);*/ +/**/ +/* // No more parents*/ +/* // Return to dangle at the end*/ +/* if (!parent) {*/ +/* child = current;*/ +/* break;*/ +/* }*/ +/**/ +/* // Found spot to insert between*/ +/* if (len64 < increment) {*/ +/* parent = current;*/ +/* break;*/ +/* }*/ +/**/ +/* // Loop to the next parent*/ +/* child = current;*/ +/* current = parent;*/ +/* }*/ +/* }*/ +/**/ +/* // Start actually writing*/ +/* current = palloc(ctx->fd, serialized->len + sizeof(parent) - 1);*/ +/* seek_os(ctx->fd, current, SEEK_SET);*/ +/* // No need to mix version, we're 0*/ +/* log_debug("Ingesting with parent %llx", parent);*/ +/* parent = htobe64(parent);*/ +/* write_os(ctx->fd, &parent, sizeof(parent));*/ +/* // Write serialized data as-is, it matches our on-disk format*/ +/* write_os(ctx->fd, serialized->data + 1, serialized->len - 1);*/ +/**/ +/* // Update child's parent pointer if we got one*/ +/* log_debug("Ingesting with child %llx", child);*/ +/* if (child) {*/ +/* current = htobe64(current);*/ +/* seek_os(ctx->fd, child, SEEK_SET);*/ +/* write_os(ctx->fd, &current, sizeof(current));*/ +/* }*/ +/**/ +/* // Update root marker if needed*/ +/* if (increment > ctx->current_increment) {*/ +/* ctx->current_increment = increment;*/ +/* ctx->current_offset = current;*/ +/* }*/ +/**/ +/* return KVSM_OK;*/ +/*}*/ diff --git a/src/kvsm.h b/src/kvsm.h index 6482c5e..abda8a6 100644 --- a/src/kvsm.h +++ b/src/kvsm.h @@ -27,16 +27,17 @@ /// handle blob allocations on a file or block device. Each blob represents a /// transaction. /// -/// From there, a transaction contains a parent reference, an increment number -/// and a list of key-value pairs. +/// From there, a transaction contains one or more parent references, an +/// increment number and a list of key-value pairs.
/// /// This in turn allows for out-of-order insertion of transactions, compaction -/// of older transactions, and repairing of references if something went wrong. +/// of older transactions, and even having multiple nodes sync up their +/// transactions in a deterministic manner #include #include "finwo/palloc.h" -#include "tidwall/buf.h" +/*#include "tidwall/buf.h"*/ /// /// ## API @@ -83,23 +84,24 @@ /// Represents a state descriptor for kvsm, holds internal state /// /// + ///
-/// struct kvsm_cursor +/// struct kvsm_transaction /// -/// Represents a cursor to a kvsm increment/transaction +/// TBD /// ///
@@ -165,73 +167,82 @@ KVSM_RESPONSE kvsm_set(struct kvsm *ctx, const struct buf *key, const struct buf ///> /// -///
-/// kvsm_cursor_free(cursor) -/// -/// Frees the used memory by the given cursor -/// -///
-///
-/// kvsm_cursor_previous(cursor) -/// -/// Returns a NEW cursor, pointing to the given cursor's parent transaction, -/// or NULL if the given cursor has no parent -/// -///
+// TODO +// buf kvsm_tx_get_id(ctx,offset) +// tx kvsm_tx_load_id(ctx,identifier) //-- gets metadata/struct, not entries +// resp kvsm_tx_free(tx) +// buf kvsm_tx_serialize(tx) //-- includes data +// resp kvsm_tx_ingest(buf) -///
-/// kvsm_cursor_next(cursor) -/// -/// Returns a NEW cursor, pointing to the given cursor's child transaction, -/// or NULL if the given cursor has no child -/// -///
-///
-/// kvsm_cursor_load(ctx, offset) -/// -/// Returns a new cursor, loaded from the transaction at the given offset. -/// -///
- -///
-/// kvsm_cursor_fetch(ctx, increment) -/// -/// Returns a new cursor pointing to the transaction with the given increment, -/// or to the oldest transaction available higher than the given increment. -/// -///
- -///
-/// kvsm_cursor_serialize(cursor) -/// -/// Returns a buffer representing the serialized transaction, including -/// increment and values -/// -///
- -///
-/// kvsm_cursor_ingest(ctx, serialized) -/// -/// Ingests the given serialized transaction, inserting it with the existing -/// increment instead of writing a new one -/// -///
+// //
+// // kvsm_cursor_free(cursor) +// // +// // Frees the used memory by the given cursor +// // +// //
+// +// //
+// // kvsm_cursor_previous(cursor) +// // +// // Returns a NEW cursor, pointing to the given cursor's parent transaction, +// // or NULL if the given cursor has no parent +// // +// //
+// +// //
+// // kvsm_cursor_next(cursor) +// // +// // Returns a NEW cursor, pointing to the given cursor's child transaction, +// // or NULL if the given cursor has no child +// // +// //
+// +// //
+// // kvsm_cursor_load(ctx, offset) +// // +// // Returns a new cursor, loaded from the transaction at the given offset. +// // +// //
+// +// //
+// // kvsm_cursor_fetch(ctx, increment) +// // +// // Returns a new cursor pointing to the transaction with the given increment, +// // or to the oldest transaction available higher than the given increment. +// // +// //
+// +// //
+// // kvsm_cursor_serialize(cursor) +// // +// // Returns a buffer representing the serialized transaction, including +// // increment and values +// // +// //
+// +// //
+// // kvsm_cursor_ingest(ctx, serialized) +// // +// // Ingests the given serialized transaction, inserting it with the existing +// // increment instead of writing a new one +// // +// //
/// /// ## Example diff --git a/test.c b/test.c index 9e993aa..1a6526f 100644 --- a/test.c +++ b/test.c @@ -9,16 +9,16 @@ #include "src/kvsm.h" void test_kvsm_regular() { - struct kvsm *ctx; - - ctx = kvsm_open("test.db", 0); - ASSERT("Opening a file returns a context", ctx != NULL); - ASSERT("Closing a file context returns OK", kvsm_close(ctx) == KVSM_OK); - - ctx = kvsm_open(NULL, 0); - ASSERT("Opening a NULL returns no context", ctx == NULL); - ASSERT("Closing a NULL context returns ERROR", kvsm_close(ctx) != KVSM_OK); - + /*struct kvsm *ctx;*/ + /**/ + /*ctx = kvsm_open("test.db", 0);*/ + /*ASSERT("Opening a file returns a context", ctx != NULL);*/ + /*ASSERT("Closing a file context returns OK", kvsm_close(ctx) == KVSM_OK);*/ + /**/ + /*ctx = kvsm_open(NULL, 0);*/ + /*ASSERT("Opening a NULL returns no context", ctx == NULL);*/ + /*ASSERT("Closing a NULL context returns ERROR", kvsm_close(ctx) != KVSM_OK);*/ + /**/ } int main() { diff --git a/util/kvsmctl.c b/util/kvsmctl.c index dda8095..a5001d4 100644 --- a/util/kvsmctl.c +++ b/util/kvsmctl.c @@ -1,234 +1,234 @@ -#include -#include -#include -#include - -#include "finwo/io.h" -#include "rxi/log.h" -#include "tidwall/buf.h" - -#include "kvsm.h" - -void usage_global(char **argv) { - printf("\n"); - printf("Usage: %s [global opts] command [command opts]\n", argv[0]); - printf("\n"); - printf("Global options\n"); - printf(" -h Show this usage\n"); - printf(" -f filename Set database file to operate on\n"); - printf(" -v level Set verbosity level (fatal,error,warn,info,debug,trace)\n"); - printf("\n"); - printf("Commands\n"); - printf(" current-increment Outputs the current transaction increment\n"); - printf(" compact Merge transactions, potentially freeing up disk space\n"); - printf(" serialize Serialize a transaction into hex\n"); - printf(" ingest Ingest a hex transaction and store it\n"); - printf(" get [key] Outputs the value of the given/stdin key to stdout\n"); - printf(" del [key] Writes a tombstone on 
the given/stdin key in a new transaction\n"); - printf(" set Sets the value of the given key to stdin data in a new transaction\n"); - printf("\n"); -} - +/*#include */ +/*#include */ +/*#include */ +/*#include */ +/**/ +/*#include "finwo/io.h"*/ +/*#include "rxi/log.h"*/ +/*#include "tidwall/buf.h"*/ +/**/ +/*#include "kvsm.h"*/ +/**/ +/*void usage_global(char **argv) {*/ +/* printf("\n");*/ +/* printf("Usage: %s [global opts] command [command opts]\n", argv[0]);*/ +/* printf("\n");*/ +/* printf("Global options\n");*/ +/* printf(" -h Show this usage\n");*/ +/* printf(" -f filename Set database file to operate on\n");*/ +/* printf(" -v level Set verbosity level (fatal,error,warn,info,debug,trace)\n");*/ +/* printf("\n");*/ +/* printf("Commands\n");*/ +/* printf(" current-increment Outputs the current transaction increment\n");*/ +/* printf(" compact Merge transactions, potentially freeing up disk space\n");*/ +/* printf(" serialize Serialize a transaction into hex\n");*/ +/* printf(" ingest Ingest a hex transaction and store it\n");*/ +/* printf(" get [key] Outputs the value of the given/stdin key to stdout\n");*/ +/* printf(" del [key] Writes a tombstone on the given/stdin key in a new transaction\n");*/ +/* printf(" set Sets the value of the given key to stdin data in a new transaction\n");*/ +/* printf("\n");*/ +/*}*/ +/**/ int main(int argc, char **argv) { - log_set_level(LOG_INFO); - char *filename = NULL; - char *command = NULL; - int64_t i; - - // Parse global options - int c; - while((c = getopt(argc, argv, "hf:v:")) != -1) { - switch(c) { - case 'h': - usage_global(argv); - return 0; - case 'f': - filename = optarg; - break; - case 'v': - if (0) { - // Intentionally empty - } else if (!strcasecmp(optarg, "trace")) { - log_set_level(LOG_TRACE); - } else if (!strcasecmp(optarg, "debug")) { - log_set_level(LOG_DEBUG); - } else if (!strcasecmp(optarg, "info")) { - log_set_level(LOG_INFO); - } else if (!strcasecmp(optarg, "warn")) { - log_set_level(LOG_WARN); - 
} else if (!strcasecmp(optarg, "error")) { - log_set_level(LOG_ERROR); - } else if (!strcasecmp(optarg, "fatal")) { - log_set_level(LOG_FATAL); - } else { - log_fatal("Unknown log level: %s", optarg); - return 1; - } - break; - default: - log_fatal("illegal option", c); - return 1; - } - } - if (optind < argc) { - command = argv[optind++]; - } - if (!command) { - log_fatal("No command given"); - return 1; - } - if (!filename) { - log_fatal("No storage file given"); - return 1; - } - - struct kvsm *ctx = kvsm_open(filename, 0); - - if (0) { - // Intentionally empty - } else if (!strcasecmp(command, "mini-stat")) { - - struct kvsm_cursor *current = kvsm_cursor_load(ctx, ctx->current_offset); - struct kvsm_cursor *fetched = kvsm_cursor_fetch(ctx, ctx->current_increment); - struct kvsm_cursor *parent = kvsm_cursor_previous(current); - struct kvsm_cursor *recurrent = kvsm_cursor_next(parent); - - printf("Current : %lld @ %llx\n", current->increment , current->offset ); - printf("Fetched : %lld @ %llx\n", fetched->increment , fetched->offset ); - printf("Parent : %lld @ %llx\n", parent->increment , parent->offset ); - printf("Recurrent: %lld @ %llx\n", recurrent->increment, recurrent->offset); - - } else if (!strcasecmp(command, "current-increment")) { - printf("%lld\n", ctx->current_increment); - } else if (!strcasecmp(command, "compact")) { - kvsm_compact(ctx); - } else if (!strcasecmp(command, "get")) { - struct buf *key = calloc(1, sizeof(struct buf)); - - if (optind < argc) { - buf_append(key, argv[optind], strlen(argv[optind])); - optind++; - } else { - log_fatal("Reading key from stdin not implemented"); - return 1; - } - - struct buf *response = kvsm_get(ctx, key); - if (!response) { - printf("(NULL)\n"); - } else { - write(STDOUT_FILENO, response->data, response->len); - buf_clear(response); - free(response); - } - - } else if (!strcasecmp(command, "del")) { - struct buf *key = calloc(1, sizeof(struct buf)); - - if (optind < argc) { - buf_append(key, 
argv[optind], strlen(argv[optind])); - optind++; - } else { - log_fatal("Reading key from stdin not implemented"); - return 1; - } - - KVSM_RESPONSE response = kvsm_del(ctx, key); - if (response != KVSM_OK) { - fprintf(stderr, "Error during deletion\n"); - } - - } else if (!strcasecmp(command, "set")) { - struct buf *key = calloc(1, sizeof(struct buf)); - struct buf *value = calloc(1, sizeof(struct buf)); - - if (optind < argc) { - buf_append(key, argv[optind], strlen(argv[optind])); - optind++; - } else { - log_fatal("Reading key from stdin not implemented"); - return 1; - } - - if (optind < argc) { - buf_append(value, argv[optind], strlen(argv[optind])); - optind++; - } else { - log_fatal("Reading value from stdin not implemented"); - return 1; - } - - KVSM_RESPONSE response = kvsm_set(ctx, key, value); - if (response != KVSM_OK) { - fprintf(stderr, "Error during setting of value\n"); - } - - } else if (!strcasecmp(command, "serialize")) { - uint64_t increment = ctx->current_increment; - - if (optind < argc) { - increment = atoll(argv[optind]); - optind++; - } else { - // Intentionally empty, serialized the current increment; - } - - struct kvsm_cursor *cursor = kvsm_cursor_fetch(ctx, increment); - if (!cursor) { - printf("(NULL)\n"); - return 0; - } - - struct buf *serialized = kvsm_cursor_serialize(cursor); - if (!serialized) { - printf("(NULL)\n"); - return 0; - } - - // TODO: optimize? 
- i = 0; - while(i < serialized->len) { - printf("%02x", *(serialized->data + i)); - i++; - } - printf("\n"); - - buf_clear(serialized); - free(serialized); - - } else { - const char * serialized_raw = NULL; - - if (optind < argc) { - serialized_raw = argv[optind]; - optind++; - } else { - log_fatal("Must provide a serialized transaction"); - return 1; - } - - struct buf *serialized = calloc(1, sizeof(struct buf)); - serialized->cap = strlen(serialized_raw); - serialized->data = malloc(serialized->cap); - if (!serialized->data) { - log_fatal("Unable to reserve memory for decoded increment"); - return 1; - } - - for( i = 0 ; i < serialized->cap ; i += 2 ) { - sscanf(serialized_raw + i, "%2hhx", &(serialized->data[i/2])); - serialized->len++; - } - - if (kvsm_cursor_ingest(ctx, serialized) != KVSM_OK) { - log_fatal("Unable to ingest transaction"); - return 1; - } - - return 0; - } - - kvsm_close(ctx); +/* log_set_level(LOG_INFO);*/ +/* char *filename = NULL;*/ +/* char *command = NULL;*/ +/* int64_t i;*/ +/**/ +/* // Parse global options*/ +/* int c;*/ +/* while((c = getopt(argc, argv, "hf:v:")) != -1) {*/ +/* switch(c) {*/ +/* case 'h':*/ +/* usage_global(argv);*/ +/* return 0;*/ +/* case 'f':*/ +/* filename = optarg;*/ +/* break;*/ +/* case 'v':*/ +/* if (0) {*/ +/* // Intentionally empty*/ +/* } else if (!strcasecmp(optarg, "trace")) {*/ +/* log_set_level(LOG_TRACE);*/ +/* } else if (!strcasecmp(optarg, "debug")) {*/ +/* log_set_level(LOG_DEBUG);*/ +/* } else if (!strcasecmp(optarg, "info")) {*/ +/* log_set_level(LOG_INFO);*/ +/* } else if (!strcasecmp(optarg, "warn")) {*/ +/* log_set_level(LOG_WARN);*/ +/* } else if (!strcasecmp(optarg, "error")) {*/ +/* log_set_level(LOG_ERROR);*/ +/* } else if (!strcasecmp(optarg, "fatal")) {*/ +/* log_set_level(LOG_FATAL);*/ +/* } else {*/ +/* log_fatal("Unknown log level: %s", optarg);*/ +/* return 1;*/ +/* }*/ +/* break;*/ +/* default:*/ +/* log_fatal("illegal option", c);*/ +/* return 1;*/ +/* }*/ +/* }*/ +/* if (optind < 
argc) {*/ +/* command = argv[optind++];*/ +/* }*/ +/* if (!command) {*/ +/* log_fatal("No command given");*/ +/* return 1;*/ +/* }*/ +/* if (!filename) {*/ +/* log_fatal("No storage file given");*/ +/* return 1;*/ +/* }*/ +/**/ +/* struct kvsm *ctx = kvsm_open(filename, 0);*/ +/**/ +/* if (0) {*/ +/* // Intentionally empty*/ +/* } else if (!strcasecmp(command, "mini-stat")) {*/ +/**/ +/* struct kvsm_cursor *current = kvsm_cursor_load(ctx, ctx->current_offset);*/ +/* struct kvsm_cursor *fetched = kvsm_cursor_fetch(ctx, ctx->current_increment);*/ +/* struct kvsm_cursor *parent = kvsm_cursor_previous(current);*/ +/* struct kvsm_cursor *recurrent = kvsm_cursor_next(parent);*/ +/**/ +/* printf("Current : %lld @ %llx\n", current->increment , current->offset );*/ +/* printf("Fetched : %lld @ %llx\n", fetched->increment , fetched->offset );*/ +/* printf("Parent : %lld @ %llx\n", parent->increment , parent->offset );*/ +/* printf("Recurrent: %lld @ %llx\n", recurrent->increment, recurrent->offset);*/ +/**/ +/* } else if (!strcasecmp(command, "current-increment")) {*/ +/* printf("%lld\n", ctx->current_increment);*/ +/* } else if (!strcasecmp(command, "compact")) {*/ +/* kvsm_compact(ctx);*/ +/* } else if (!strcasecmp(command, "get")) {*/ +/* struct buf *key = calloc(1, sizeof(struct buf));*/ +/**/ +/* if (optind < argc) {*/ +/* buf_append(key, argv[optind], strlen(argv[optind]));*/ +/* optind++;*/ +/* } else {*/ +/* log_fatal("Reading key from stdin not implemented");*/ +/* return 1;*/ +/* }*/ +/**/ +/* struct buf *response = kvsm_get(ctx, key);*/ +/* if (!response) {*/ +/* printf("(NULL)\n");*/ +/* } else {*/ +/* write(STDOUT_FILENO, response->data, response->len);*/ +/* buf_clear(response);*/ +/* free(response);*/ +/* }*/ +/**/ +/* } else if (!strcasecmp(command, "del")) {*/ +/* struct buf *key = calloc(1, sizeof(struct buf));*/ +/**/ +/* if (optind < argc) {*/ +/* buf_append(key, argv[optind], strlen(argv[optind]));*/ +/* optind++;*/ +/* } else {*/ +/* log_fatal("Reading 
key from stdin not implemented");*/ +/* return 1;*/ +/* }*/ +/**/ +/* KVSM_RESPONSE response = kvsm_del(ctx, key);*/ +/* if (response != KVSM_OK) {*/ +/* fprintf(stderr, "Error during deletion\n");*/ +/* }*/ +/**/ +/* } else if (!strcasecmp(command, "set")) {*/ +/* struct buf *key = calloc(1, sizeof(struct buf));*/ +/* struct buf *value = calloc(1, sizeof(struct buf));*/ +/**/ +/* if (optind < argc) {*/ +/* buf_append(key, argv[optind], strlen(argv[optind]));*/ +/* optind++;*/ +/* } else {*/ +/* log_fatal("Reading key from stdin not implemented");*/ +/* return 1;*/ +/* }*/ +/**/ +/* if (optind < argc) {*/ +/* buf_append(value, argv[optind], strlen(argv[optind]));*/ +/* optind++;*/ +/* } else {*/ +/* log_fatal("Reading value from stdin not implemented");*/ +/* return 1;*/ +/* }*/ +/**/ +/* KVSM_RESPONSE response = kvsm_set(ctx, key, value);*/ +/* if (response != KVSM_OK) {*/ +/* fprintf(stderr, "Error during setting of value\n");*/ +/* }*/ +/**/ +/* } else if (!strcasecmp(command, "serialize")) {*/ +/* uint64_t increment = ctx->current_increment;*/ +/**/ +/* if (optind < argc) {*/ +/* increment = atoll(argv[optind]);*/ +/* optind++;*/ +/* } else {*/ +/* // Intentionally empty, serialized the current increment;*/ +/* }*/ +/**/ +/* struct kvsm_cursor *cursor = kvsm_cursor_fetch(ctx, increment);*/ +/* if (!cursor) {*/ +/* printf("(NULL)\n");*/ +/* return 0;*/ +/* }*/ +/**/ +/* struct buf *serialized = kvsm_cursor_serialize(cursor);*/ +/* if (!serialized) {*/ +/* printf("(NULL)\n");*/ +/* return 0;*/ +/* }*/ +/**/ +/* // TODO: optimize?*/ +/* i = 0;*/ +/* while(i < serialized->len) {*/ +/* printf("%02x", *(serialized->data + i));*/ +/* i++;*/ +/* }*/ +/* printf("\n");*/ +/**/ +/* buf_clear(serialized);*/ +/* free(serialized);*/ +/**/ +/* } else {*/ +/* const char * serialized_raw = NULL;*/ +/**/ +/* if (optind < argc) {*/ +/* serialized_raw = argv[optind];*/ +/* optind++;*/ +/* } else {*/ +/* log_fatal("Must provide a serialized transaction");*/ +/* return 1;*/ +/* }*/ 
+/**/ +/* struct buf *serialized = calloc(1, sizeof(struct buf));*/ +/* serialized->cap = strlen(serialized_raw);*/ +/* serialized->data = malloc(serialized->cap);*/ +/* if (!serialized->data) {*/ +/* log_fatal("Unable to reserve memory for decoded increment");*/ +/* return 1;*/ +/* }*/ +/**/ +/* for( i = 0 ; i < serialized->cap ; i += 2 ) {*/ +/* sscanf(serialized_raw + i, "%2hhx", &(serialized->data[i/2]));*/ +/* serialized->len++;*/ +/* }*/ +/**/ +/* if (kvsm_cursor_ingest(ctx, serialized) != KVSM_OK) {*/ +/* log_fatal("Unable to ingest transaction");*/ +/* return 1;*/ +/* }*/ +/**/ +/* return 0;*/ +/* }*/ +/**/ +/* kvsm_close(ctx);*/ return 0; }