Skip to content

Commit

Permalink
Merge pull request #34 from streamingfast/misfeature/somekeyrework
Browse files (browse the repository at this point in the history)
Misfeature/somekeyrework
  • Loading branch information
billettc authored Feb 8, 2024
2 parents 89e2ab3 + 2a2506f commit ddfffcd
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 115 deletions.
4 changes: 2 additions & 2 deletions cmd/substreams-sink-kv/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func injectRunE(cmd *cobra.Command, args []string) error {
endpoint, dsn, manifestPath, blockRange := extractInjectArgs(cmd, args)
queryRowLimit := sflags.MustGetInt(cmd, "query-rows-limit")

flushInterval := sflags.MustGetDuration(cmd, "flush-interval")
flushInterval := sflags.MustGetUint64(cmd, "flush-interval")
module := sflags.MustGetString(cmd, "module")

listenAddr, provided := sflags.MustGetStringProvided(cmd, "server-listen-addr")
Expand All @@ -88,7 +88,7 @@ func injectRunE(cmd *cobra.Command, args []string) error {
zap.String("endpoint", endpoint),
zap.String("manifest_path", manifestPath),
zap.String("block_range", blockRange),
zap.Duration("flush_interval", flushInterval),
zap.Uint64("flush_interval", flushInterval),
zap.String("module", module),
}

Expand Down
174 changes: 88 additions & 86 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type OperationDB struct {
store store.KVStore

QueryRowsLimit int
pendingOperations []*pbkv.KVOperation
pendingOperations map[string]*pbkv.KVOperation
logger *zap.Logger
tracer logging.Tracer
undosOperations map[uint64][]byte
}

func New(dsn string, queryRowsLimit int, logger *zap.Logger, tracer logging.Tracer) (*OperationDB, error) {
Expand All @@ -39,10 +40,12 @@ func New(dsn string, queryRowsLimit int, logger *zap.Logger, tracer logging.Trac
return nil, err
}
return &OperationDB{
QueryRowsLimit: queryRowsLimit,
store: s,
logger: logger,
tracer: tracer,
QueryRowsLimit: queryRowsLimit,
store: s,
logger: logger,
tracer: tracer,
pendingOperations: make(map[string]*pbkv.KVOperation),
undosOperations: make(map[uint64][]byte),
}, nil
}

Expand All @@ -52,7 +55,8 @@ func (db *OperationDB) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
return nil
}

// Key-space prefixes used to build the raw keys written to the store.
var (
	// undoPrefix is the two-byte prefix undoKey places in front of the
	// encoded block number. It is a fixed-size array, so its length is part
	// of its type.
	undoPrefix = [2]byte{'x', 'u'}

	// userKeyPrefix is the single byte userKey places in front of a user
	// supplied key and that isUserKey checks for.
	userKeyPrefix byte = 'k'
)

var ErrInvalidArguments = errors.New("invalid arguments")
var ErrNotFound = errors.New("not found")
Expand All @@ -70,46 +74,64 @@ func (db *OperationDB) AddOperations(ops *pbkv.KVOperations) {
}

// AddOperation queues op in the set of pending operations.
//
// Pending operations are indexed by key, so queuing a second operation for
// the same key replaces the first one: only the most recently added operation
// for a given key is kept.
func (db *OperationDB) AddOperation(op *pbkv.KVOperation) {
	// Assigning into a nil map panics; allocate lazily so an OperationDB
	// whose map has not been initialized can still queue operations.
	if db.pendingOperations == nil {
		db.pendingOperations = make(map[string]*pbkv.KVOperation)
	}
	db.pendingOperations[op.Key] = op
}
func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations) error {
if step == bstream.StepNew {
err := db.PurgeUndoOperations(ctx, finalBlockHeight)
if err != nil {
return fmt.Errorf("deleting LIB undo operations: %w", err)
}

func (db *OperationDB) Flush(ctx context.Context, cursor *sink.Cursor) (count int, err error) {
puts, deletes := lastOperationPerKey(db.pendingOperations)
for _, put := range puts {
if err := db.store.Put(ctx, userKey(put.Key), put.Value); err != nil {
return 0, err
undoOperations, err := db.GenerateUndoOperations(ctx, kvOps.Operations)
if err != nil {
return fmt.Errorf("generating reverse operations: %w", err)
}
}

if err := db.store.BatchDelete(ctx, deletes); err != nil {
return 0, err
err = db.AddUndosOperations(ctx, blockNumber, undoOperations)
if err != nil {
return fmt.Errorf("storing reverse operations: %w", err)
}
}

if err := db.WriteCursor(ctx, cursor); err != nil {
return 0, err
}
db.reset()
return len(puts) + len(deletes), nil
db.AddOperations(kvOps)
return nil
}

func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations) error {
if step == bstream.StepNew {
err := db.DeleteLIBUndoOperations(ctx, finalBlockHeight)
if err != nil {
return fmt.Errorf("deleting LIB undo operations: %w", err)
func (db *OperationDB) Flush(ctx context.Context, cursor *sink.Cursor) (count int, err error) {
for _, op := range db.pendingOperations {
switch op.Type {
case pbkv.KVOperation_SET:
if err := db.store.Put(ctx, userKey(op.Key), op.Value); err != nil {
return 0, err
}
case pbkv.KVOperation_DELETE:
if err := db.store.BatchDelete(ctx, [][]byte{userKey(op.Key)}); err != nil {
return 0, err
}
default:
panic(fmt.Sprintf("invalid operation type %d", op.Type))
}
err = db.storeUndoOperations(ctx, blockNumber, kvOps.Operations)
if err != nil {
return fmt.Errorf("storing reverse operations: %w", err)
}

for blockNumber, undoOperations := range db.undosOperations {
if err := db.store.Put(ctx, undoKey(blockNumber), undoOperations); err != nil {
return 0, err
}
}

db.AddOperations(kvOps)
if err := db.WriteCursor(ctx, cursor); err != nil {
return 0, err
}

return nil
opCount := len(db.pendingOperations)
db.reset()

return opCount, nil
}

func (db *OperationDB) DeleteLIBUndoOperations(ctx context.Context, finalBlockHeight uint64) error {
func (db *OperationDB) PurgeUndoOperations(ctx context.Context, finalBlockHeight uint64) error {
keys := make([][]byte, 0)

scanOutput := db.store.Scan(ctx, undoKey(finalBlockHeight), undoKey(0), 0)
Expand All @@ -125,61 +147,59 @@ func (db *OperationDB) DeleteLIBUndoOperations(ctx context.Context, finalBlockHe
return db.store.BatchDelete(ctx, keys)
}

func (db *OperationDB) storeUndoOperations(ctx context.Context, blockNumber uint64, ops []*pbkv.KVOperation) error {
undoOperations := generateUndoOperations(ctx, ops, db.store)
func (db *OperationDB) AddUndosOperations(ctx context.Context, blockNumber uint64, undoOperations *pbkv.KVOperations) error {
data, err := proto.Marshal(undoOperations)
if err != nil {
return fmt.Errorf("unable to marshal reversed operations: %w", err)
}

err = db.store.Put(ctx, undoKey(blockNumber), data)
if err != nil {
return fmt.Errorf("storing reversed operations: %w", err)
}

err = db.store.FlushPuts(ctx)
if err != nil {
return fmt.Errorf("flushing undo put: %w", err)
}
db.undosOperations[blockNumber] = data

return nil
}

func generateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation, kvStore store.KVStore) *pbkv.KVOperations {
func (db *OperationDB) GenerateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation) (*pbkv.KVOperations, error) {
var undoOperations []*pbkv.KVOperation
for _, op := range ops {
previousValue, errNotFound := kvStore.Get(ctx, userKey(op.Key))
undoOp := undoOperation(op, previousValue, errNotFound != nil)
previousValue, err := db.store.Get(ctx, userKey(op.Key))
previousKeyExists := true
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
return nil, fmt.Errorf("getting previous value for key %s %T: %w", op.Key, err, err)
}
previousKeyExists = false
}
undoOp := undoOperation(op, previousValue, previousKeyExists)
undoOperations = append([]*pbkv.KVOperation{undoOp}, undoOperations...)
}
reversedKVOperations := &pbkv.KVOperations{Operations: undoOperations}
return reversedKVOperations
return reversedKVOperations, nil
}

func undoOperation(op *pbkv.KVOperation, previousValue []byte, notFound bool) *pbkv.KVOperation {
func undoOperation(op *pbkv.KVOperation, previousValue []byte, previousKeyExists bool) *pbkv.KVOperation {
switch op.Type {
case pbkv.KVOperation_SET:
if notFound { // if previous value was not notFound, we need to delete the key when applying the undo
if previousKeyExists {
return &pbkv.KVOperation{
Type: pbkv.KVOperation_DELETE,
Type: pbkv.KVOperation_SET,
Key: op.Key,
Value: op.Value,
Value: previousValue,
}
}
return &pbkv.KVOperation{
Type: pbkv.KVOperation_SET,
Type: pbkv.KVOperation_DELETE,
Key: op.Key,
Value: previousValue,
Value: op.Value,
}
case pbkv.KVOperation_DELETE:
if notFound {
return nil
}
return &pbkv.KVOperation{
Type: pbkv.KVOperation_SET,
Key: op.Key,
Value: op.Value,
if previousKeyExists {
return &pbkv.KVOperation{
Type: pbkv.KVOperation_SET,
Key: op.Key,
Value: op.Value,
}
}
return nil
default:
panic(fmt.Sprintf("invalid operation type %d", op.Type))
}
Expand Down Expand Up @@ -208,29 +228,9 @@ func (db *OperationDB) DeleteUndoKeys(ctx context.Context, keys [][]byte) error
return db.store.BatchDelete(ctx, keys)
}

// lastOperationPerKey reduces ops to the last operation seen for each key and
// splits the survivors by type: SET operations are returned in puts, and the
// user keys of DELETE operations are returned in deletes. Operations of any
// other type are dropped.
//
// "Last" means last in slice order; ops is not sorted here. The order of the
// returned slices follows map iteration and is therefore unspecified.
func lastOperationPerKey(ops []*pbkv.KVOperation) (puts []*pbkv.KVOperation, deletes [][]byte) {
	// Only the most recent operation per key matters, so keep just that one
	// instead of accumulating every operation for the key.
	latest := make(map[string]*pbkv.KVOperation, len(ops))
	for _, op := range ops {
		latest[op.Key] = op
	}

	for _, op := range latest {
		switch op.Type {
		case pbkv.KVOperation_SET:
			puts = append(puts, op)
		case pbkv.KVOperation_DELETE:
			deletes = append(deletes, userKey(op.Key))
		}
	}

	return puts, deletes
}

// reset drops every pending operation and every buffered undo payload by
// replacing both maps with fresh, empty ones, so later writes into them never
// hit a nil map.
func (db *OperationDB) reset() {
	db.pendingOperations = make(map[string]*pbkv.KVOperation)
	db.undosOperations = make(map[uint64][]byte)
}

func (db *OperationDB) Get(ctx context.Context, key string) (val []byte, err error) {
Expand Down Expand Up @@ -334,24 +334,26 @@ func (db *OperationDB) Scan(ctx context.Context, begin, exclusiveEnd string, lim

// userKey returns the raw store key for the user supplied key k: a newly
// allocated slice holding userKeyPrefix followed by the bytes of k.
func userKey(k string) []byte {
	out := make([]byte, len(k)+1)
	out[0] = userKeyPrefix
	copy(out[1:], k)
	return out
}

// undoKey returns the raw store key under which the undo payload of block
// num is kept: undoPrefix followed by the 8-byte big-endian encoding of
// math.MaxUint64-num.
//
// Because the block number is inverted before encoding, a higher block
// number yields a byte-wise smaller key.
func undoKey(num uint64) []byte {
	key := make([]byte, len(undoPrefix)+8)
	// copy reports how many prefix bytes were written, which is where the
	// encoded number starts; no offset is hard-coded.
	n := copy(key, undoPrefix[:])
	binary.BigEndian.PutUint64(key[n:], math.MaxUint64-num)
	return key
}

func isUserKey(k []byte) bool {
if len(k) > 1 && k[0] == 'k' {
if len(k) > 1 && k[0] == userKeyPrefix {
return true
}
return false
}

// fromUserKey converts a raw store key back to the user supplied key by
// dropping its first byte (the user key prefix). The first byte is not
// inspected. An empty or nil k yields the empty string.
func fromUserKey(k []byte) string {
	// Slicing k[1:] on an empty slice would panic.
	if len(k) == 0 {
		return ""
	}
	return string(k[1:])
}
7 changes: 3 additions & 4 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/streamingfast/bstream"

_ "github.com/streamingfast/kvdb/store/badger3"
"github.com/streamingfast/logging"
pbkv "github.com/streamingfast/substreams-sink-kv/pb/substreams/sink/kv/v1"
Expand Down Expand Up @@ -275,8 +274,8 @@ func TestDB_HandleUndo(t *testing.T) {

func TestDB_UndoOperation(t *testing.T) {
type foundValue struct {
found bool
previousValue []byte
previousKeyExists bool
previousValue []byte
}
cases := []struct {
name string
Expand Down Expand Up @@ -341,7 +340,7 @@ func TestDB_UndoOperation(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
undoOperationResult := undoOperation(c.operation, c.foundValue.previousValue, !c.foundValue.found)
undoOperationResult := undoOperation(c.operation, c.foundValue.previousValue, c.foundValue.previousKeyExists)
require.Equal(t, c.expectedUndoOperation, undoOperationResult)
})
}
Expand Down
Loading

0 comments on commit ddfffcd

Please sign in to comment.