Skip to content

Commit

Permalink
Use prefixes to distingish between conf and logs
Browse files Browse the repository at this point in the history
Closes #3
  • Loading branch information
aalda committed Nov 23, 2019
1 parent 355100c commit ee05bc4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 20 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
## UNRELEASED
## v0.2.0 (November 23, 2019)

IMPROVEMENTS

* Use prefixes to distingish between conf and logs.
* Upgrade Badger version to 1.5.5.
* Upgrade Raft version to 1.1.1.

## v0.1.1 (November 19, 2018)
Expand Down
54 changes: 36 additions & 18 deletions badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
)

var (
// Prefix names to distingish between logs and conf
prefixLogs = []byte{0x0}
prefixConf = []byte{0x1}

// ErrKeyNotFound is an error indicating a given key does not exist
ErrKeyNotFound = errors.New("not found")
)
Expand Down Expand Up @@ -175,26 +179,39 @@ func (b *BadgerStore) Close() error {

// FirstIndex returns the first known index from the Raft log.
func (b *BadgerStore) FirstIndex() (uint64, error) {
return b.firstIndex(false)
var value uint64
err := b.conn.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{
PrefetchValues: false,
Reverse: false,
})
defer it.Close()

it.Seek(prefixLogs)
if it.ValidForPrefix(prefixLogs) {
value = bytesToUint64(it.Item().Key()[1:])
}
return nil
})
if err != nil {
return 0, err
}
return value, nil
}

// LastIndex returns the last known index from the Raft log.
func (b *BadgerStore) LastIndex() (uint64, error) {
return b.firstIndex(true)
}

func (b *BadgerStore) firstIndex(reverse bool) (uint64, error) {
var value uint64
err := b.conn.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{
PrefetchValues: false,
Reverse: reverse,
Reverse: true,
})
defer it.Close()

it.Rewind()
if it.Valid() {
value = bytesToUint64(it.Item().Key())
it.Seek(append(prefixLogs, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff))
if it.ValidForPrefix(prefixLogs) {
value = bytesToUint64(it.Item().Key()[1:])
}
return nil
})
Expand All @@ -207,7 +224,7 @@ func (b *BadgerStore) firstIndex(reverse bool) (uint64, error) {
// GetLog gets a log entry from Badger at a given index.
func (b *BadgerStore) GetLog(index uint64, log *raft.Log) error {
return b.conn.View(func(txn *badger.Txn) error {
item, err := txn.Get(uint64ToBytes(index))
item, err := txn.Get(append(prefixLogs, uint64ToBytes(index)...))
if err != nil {
switch err {
case badger.ErrKeyNotFound:
Expand All @@ -231,15 +248,15 @@ func (b *BadgerStore) StoreLog(log *raft.Log) error {
return err
}
return b.conn.Update(func(txn *badger.Txn) error {
return txn.Set(uint64ToBytes(log.Index), val.Bytes())
return txn.Set(append(prefixLogs, uint64ToBytes(log.Index)...), val.Bytes())
})
}

// StoreLogs stores a set of raft logs.
func (b *BadgerStore) StoreLogs(logs []*raft.Log) error {
return b.conn.Update(func(txn *badger.Txn) error {
for _, log := range logs {
key := uint64ToBytes(log.Index)
key := append(prefixLogs, uint64ToBytes(log.Index)...)
val, err := encodeMsgPack(log)
if err != nil {
return err
Expand All @@ -261,11 +278,12 @@ func (b *BadgerStore) DeleteRange(min, max uint64) error {
Reverse: false,
})

for it.Seek(uint64ToBytes(min)); it.Valid(); it.Next() {
key := make([]byte, 8)
start := append(prefixLogs, uint64ToBytes(min)...)
for it.Seek(start); it.Valid(); it.Next() {
key := make([]byte, 9)
it.Item().KeyCopy(key)
// Handle out-of-range log index
if bytesToUint64(key) > max {
if bytesToUint64(key[1:]) > max {
break
}
// Delete in-range log index
Expand All @@ -276,7 +294,7 @@ func (b *BadgerStore) DeleteRange(min, max uint64) error {
if err != nil {
return err
}
return b.DeleteRange(bytesToUint64(key), max)
return b.DeleteRange(bytesToUint64(key[1:]), max)
}
return err
}
Expand All @@ -292,15 +310,15 @@ func (b *BadgerStore) DeleteRange(min, max uint64) error {
// Set is used to set a key/value set outside of the raft log.
func (b *BadgerStore) Set(key []byte, val []byte) error {
return b.conn.Update(func(txn *badger.Txn) error {
return txn.Set(key, val)
return txn.Set(append(prefixConf, key...), val)
})
}

// Get is used to retrieve a value from the k/v store by key
func (b *BadgerStore) Get(key []byte) ([]byte, error) {
var value []byte
err := b.conn.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)
item, err := txn.Get(append(prefixConf, key...))
if err != nil {
switch err {
case badger.ErrKeyNotFound:
Expand Down
2 changes: 1 addition & 1 deletion badger_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func testBadgerStore(t testing.TB) (*BadgerStore, string) {
os.RemoveAll(path)

// Successfully creates and returns a store
store, err := NewBadgerStore(path)
store, err := New(Options{Path: path, NoSync: true})
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down

0 comments on commit ee05bc4

Please sign in to comment.