Skip to content

Commit

Permalink
fix bug skipping operations in some store states, some tooling, prep …
Browse files Browse the repository at this point in the history
…v1.9.0 release
  • Loading branch information
sduchesneau committed Jul 11, 2024
1 parent 0b4d63b commit 77a3d47
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 7 deletions.
22 changes: 18 additions & 4 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,31 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## v1.9.0

- Expose a new intrinsic to modules: `skip_empty_output`, which causes the module output to be skipped if it has zero bytes. Be careful, a protobuf object with all its default values will have zero bytes.
### Important BUG FIX

* Fix a bug introduced in v1.6.0 that could result in corrupted store "state" file if all
the "outputs" were already cached for a module in a given segment (rare occurence)
* We recommend clearing your substreams cache after this upgrade and re-processing or
validating your data if you use stores.

### Fixed

* substreams 'tools decode state' now correctly prints the `kvops` when pointing to store output files

### Added

* Expose a new intrinsic to modules: `skip_empty_output`, which causes the module output to be skipped if it has zero bytes. (Watch out, a protobuf object with all its default values will have zero bytes)
* Improve schedule order (faster time to first block) for substreams with multiple stages when starting mid-chain

## v1.8.2

- `substreams init` (code generation): fix displaying of saved path in filenames
* `substreams init` (code generation): fix displaying of saved path in filenames

## v1.8.1

- Add a `NoopMode` to the `Tier1` enabling to avoid sending data back to requester while processing live.
* Add a `NoopMode` to the `Tier1` enabling to avoid sending data back to requester while processing live.

## v1.8.0

Expand Down
17 changes: 15 additions & 2 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,21 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
excludable:
for _, stage := range pipe.ModuleExecutors {
for _, executor := range stage {
if executionPlan.ExistingExecOuts[executor.Name()] != nil {
continue
switch executor := executor.(type) {
case *exec.MapperModuleExecutor:
if executionPlan.ExistingExecOuts[executor.Name()] != nil {
continue
}
case *exec.IndexModuleExecutor:
if executionPlan.ExistingIndices[executor.Name()] != nil {
continue
}
case *exec.StoreModuleExecutor:
if executionPlan.ExistingExecOuts[executor.Name()] != nil {
if _, ok := executionPlan.StoresToWrite[executor.Name()]; !ok {
continue
}
}
}
if !executor.BlockIndex().ExcludesAllBlocks() {
allExecutorsExcludedByBlockIndex = false
Expand Down
4 changes: 4 additions & 0 deletions storage/execout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"path"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -35,6 +36,9 @@ type File struct {
loadedSize uint64
}

func (c *File) FullFilename() string {
return path.Join(c.store.BaseURL().String(), c.Filename())
}
func (c *File) Filename() string {
return computeDBinFilename(c.Range.StartBlock, c.Range.ExclusiveEndBlock)
}
Expand Down
71 changes: 70 additions & 1 deletion tools/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tools
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"strings"
Expand All @@ -14,10 +15,12 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dstore"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/manifest"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/storage/execout"
"github.com/streamingfast/substreams/storage/index"
Expand Down Expand Up @@ -312,7 +315,7 @@ func runDecodeOutputsModuleRunE(cmd *cobra.Command, args []string) error {
case *pbsubstreams.Module_KindMap_:
return searchOutputsModule(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s, protoFiles)
case *pbsubstreams.Module_KindStore_:
return searchOutputsModule(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s, protoFiles)
return searchOutputsModuleKvOps(ctx, requestedBlocks, startBlock, saveInterval, moduleHash, matchingModule, s)
}
return fmt.Errorf("module has an unknown")
}
Expand Down Expand Up @@ -340,6 +343,7 @@ func searchOutputsModule(
rng := block.NewRange(startBlock, startBlock-startBlock%saveInterval+saveInterval)

outputCache := modStore.NewFile(rng)
fmt.Println("filename:", outputCache.FullFilename())
zlog.Info("loading block from store", zap.Uint64("start_block", startBlock), zap.Stringer("requested_block_range", requestedBlocks))
if err := outputCache.Load(ctx); err != nil {
if err == dstore.ErrNotFound {
Expand All @@ -366,6 +370,55 @@ func searchOutputsModule(
return nil
}

func searchOutputsModuleKvOps(
ctx context.Context,
requestedBlocks *block.Range,
startBlock,
saveInterval uint64,
moduleHash string,
module *pbsubstreams.Module,
stateStore dstore.Store,
) error {
modStore, err := execout.NewConfig(module.Name, module.InitialBlock, pbsubstreams.ModuleKindMap, moduleHash, stateStore, zlog)
if err != nil {
return fmt.Errorf("execout new config: %w", err)
}

moduleStore, err := stateStore.SubStore(moduleHash + "/outputs")
if err != nil {
return fmt.Errorf("can't find substore for hash %q: %w", moduleHash, err)
}

rng := block.NewRange(startBlock, startBlock-startBlock%saveInterval+saveInterval)

outputCache := modStore.NewFile(rng)
fmt.Println("filename:", outputCache.FullFilename())
zlog.Info("loading block from store", zap.Uint64("start_block", startBlock), zap.Stringer("requested_block_range", requestedBlocks))
if err := outputCache.Load(ctx); err != nil {
if err == dstore.ErrNotFound {
return fmt.Errorf("can't find cache at block %d storeURL %q", startBlock, moduleStore.BaseURL().String())
}

return fmt.Errorf("loading cache %s file %s : %w", moduleStore.BaseURL(), outputCache.String(), err)
}

for i := requestedBlocks.StartBlock; i < requestedBlocks.ExclusiveEndBlock; i++ {
payloadBytes, found := outputCache.GetAtBlock(i)
if !found {
continue
}

fmt.Println("Block", i)
if len(payloadBytes) == 0 {
continue
}
if err := printKVOps(payloadBytes); err != nil {
return fmt.Errorf("printing object: %w", err)
}
}
return nil
}

func searchStateModule(
ctx context.Context,
startBlock uint64,
Expand All @@ -386,13 +439,29 @@ func searchStateModule(
return fmt.Errorf("unable to load file: %w", err)
}

fmt.Println("filename:", stateStore.BaseURL().JoinPath(moduleHash, "states", file.Filename+".zst").String())

bytes, found := moduleStore.GetLast(key)
if !found {
return fmt.Errorf("no data found for %q", key)
}
return printObject(module, protoFiles, bytes)
}

func printKVOps(data []byte) error {
kvOps := &pbssinternal.Operations{}
if err := proto.Unmarshal(data, kvOps); err != nil {
return fmt.Errorf("unmarshalling kvOps: %w", err)
}

asJSON, err := json.Marshal(kvOps)
if err != nil {
return fmt.Errorf("marshalling back as json: %w", err)
}
fmt.Println(string(asJSON))
return nil
}

func printObject(module *pbsubstreams.Module, protoFiles []*descriptorpb.FileDescriptorProto, data []byte) error {
protoDefinition := ""
valuePrinted := false
Expand Down

0 comments on commit 77a3d47

Please sign in to comment.