Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

bmt, param: Introduce SectionHasher interface, implement in bmt #2021

Merged
merged 16 commits into from
Feb 8, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 172 additions & 57 deletions bmt/bmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
package bmt

import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash"
"strings"
"sync"
"sync/atomic"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/param"
)

/*
Expand Down Expand Up @@ -60,6 +66,10 @@ const (
PoolSize = 8
)

var (
zeroSpan = make([]byte, 8)
)

// BaseHasherFunc is a hash.Hash constructor function used for the base hash of the BMT.
// implemented by Keccak256 SHA3 sha3.NewLegacyKeccak256
type BaseHasherFunc func() hash.Hash
Expand All @@ -75,8 +85,12 @@ type BaseHasherFunc func() hash.Hash
// the tree and itself in a state reusable for hashing a new chunk
// - generates and verifies segment inclusion proofs (TODO:)
type Hasher struct {
pool *TreePool // BMT resource pool
bmt *tree // prebuilt BMT resource for flowcontrol and proofs
pool *TreePool // BMT resource pool
bmt *tree // prebuilt BMT resource for flowcontrol and proofs
size int // bytes written to Hasher since last Reset()
cursor int // cursor to write to on next Write() call
errFunc func(error)
ctx context.Context
nolash marked this conversation as resolved.
Show resolved Hide resolved
}

// New creates a reusable BMT Hasher that
Expand Down Expand Up @@ -276,48 +290,91 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
}
}

// methods needed to implement hash.Hash
// Implements param.SectionWriter
func (h *Hasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter {
log.Warn("Synchasher does not currently support SectionWriter chaining")
nolash marked this conversation as resolved.
Show resolved Hide resolved
return h
}

// SectionSize implements param.SectionWriter
func (h *Hasher) SectionSize() int {
return h.pool.SegmentSize
}

// SetLength implements param.SectionWriter
func (h *Hasher) SetLength(length int) {
}

// SetSpan implements param.SectionWriter
func (h *Hasher) SetSpan(length int) {
span := LengthToSpan(length)
h.getTree().span = span
}

// SetSpanBytes implements storage.SwarmHash
func (h *Hasher) SetSpanBytes(b []byte) {
t := h.getTree()
t.span = make([]byte, 8)
copy(t.span, b)
}

// Size returns the size
// Branches implements param.SectionWriter
func (h *Hasher) Branches() int {
return h.pool.SegmentCount
}

// Init implements param.SectionWriter
func (h *Hasher) Init(ctx context.Context, errFunc func(error)) {
nolash marked this conversation as resolved.
Show resolved Hide resolved
h.errFunc = errFunc
h.ctx = ctx
}

// Size implements hash.Hash and param.SectionWriter
func (h *Hasher) Size() int {
return h.pool.SegmentSize
}

// BlockSize returns the block size
// SeekSection implements param.SectionWriter
func (h *Hasher) SeekSection(offset int) {
h.cursor = offset
}

// BlockSize implements hash.Hash and param.SectionWriter
func (h *Hasher) BlockSize() int {
return 2 * h.pool.SegmentSize
}

// Sum returns the BMT root hash of the buffer
// using Sum presupposes sequential synchronous writes (io.Writer interface)
// hash.Hash interface Sum method appends the byte slice to the underlying
// data before it calculates and returns the hash of the chunk
// caller must make sure Sum is not called concurrently with Write, writeSection
// Implements hash.Hash in param.SectionWriter
func (h *Hasher) Sum(b []byte) (s []byte) {
t := h.getTree()
if h.size == 0 && t.offset == 0 {
h.releaseTree()
return h.pool.zerohashes[h.pool.Depth]
}
// write the last section with final flag set to true
go h.writeSection(t.cursor, t.section, true, true)
// wait for the result
s = <-t.result
if t.span == nil {
t.span = LengthToSpan(h.size)
}
span := t.span
// release the tree resource back to the pool
h.releaseTree()
// b + sha3(span + BMT(pure_chunk))
if len(span) == 0 {
return append(b, s...)
}
return doSum(h.pool.hasher(), b, span, s)
}

// methods needed to implement the SwarmHash and the io.Writer interfaces

// Write calls sequentially add to the buffer to be hashed,
// with every full segment calls writeSection in a go routine
// Implements hash.Hash and param.SectionWriter
func (h *Hasher) Write(b []byte) (int, error) {
l := len(b)
if l == 0 || l > h.pool.Size {
return 0, nil
}
h.size += len(b)
t := h.getTree()
secsize := 2 * h.pool.SegmentSize
// calculate length of missing bit to complete current open section
Expand Down Expand Up @@ -358,21 +415,13 @@ func (h *Hasher) Write(b []byte) (int, error) {
return l, nil
}

// Reset needs to be called before writing to the hasher
// Reset implements hash.Hash and param.SectionWriter
func (h *Hasher) Reset() {
h.cursor = 0
h.size = 0
h.releaseTree()
}

// methods needed to implement the SwarmHash interface

// ResetWithLength needs to be called before writing to the hasher
// the argument is supposed to be the byte slice binary representation of
// the length of the data subsumed under the hash, i.e., span
func (h *Hasher) ResetWithLength(span []byte) {
h.Reset()
h.getTree().span = span
}

// releaseTree gives back the Tree to the pool whereby it unlocks
// it resets tree, segment and index
func (h *Hasher) releaseTree() {
Expand All @@ -395,33 +444,33 @@ func (h *Hasher) releaseTree() {
}

// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes
// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters
func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher {
secsize := h.pool.SegmentSize
if double {
secsize *= 2
}
seccount := h.pool.SegmentCount
if double {
seccount /= 2
}
write := func(i int, section []byte, final bool) {
h.writeSection(i, section, double, final)
}
return &AsyncHasher{
Hasher: h,
double: double,
secsize: secsize,
write: write,
Hasher: h,
double: double,
secsize: secsize,
seccount: seccount,
write: write,
jobSize: 0,
}
}

// SectionWriter is an asynchronous segment/section writer interface
type SectionWriter interface {
Reset() // standard init to be called before reuse
Write(index int, data []byte) // write into section of index
Sum(b []byte, length int, span []byte) []byte // returns the hash of the buffer
SectionSize() int // size of the async section unit to use
}

// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
// AsyncHasher is unsafe and does not check indexes and section data lengths
// it must be used with the right indexes and length and the right number of sections
// AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the
// right indexes and length and the right number of sections
// It is unsafe and does not check indexes and section data lengths
//
// behaviour is undefined if
// * non-final sections are shorter or longer than secsize
Expand All @@ -434,33 +483,84 @@ type SectionWriter interface {
// * it will not leak processes if not all sections are written but it blocks
// and keeps the resource which can be released calling Reset()
type AsyncHasher struct {
*Hasher // extends the Hasher
mtx sync.Mutex // to lock the cursor access
double bool // whether to use double segments (call Hasher.writeSection)
secsize int // size of base section (size of hash or double)
write func(i int, section []byte, final bool)
*Hasher // extends the Hasher
mtx sync.Mutex // to lock the cursor access
double bool // whether to use double segments (call Hasher.writeSection)
secsize int // size of base section (size of hash or double)
seccount int // base section count
write func(i int, section []byte, final bool)
errFunc func(error)
all bool // if all written in one go, temporary workaround
jobSize int
}

// Reset implements param.SectionWriter
func (sw *AsyncHasher) Reset() {
sw.jobSize = 0
sw.all = false
sw.Hasher.Reset()
}

// methods needed to implement AsyncWriter
// SetLength implements param.SectionWriter
func (sw *AsyncHasher) SetLength(length int) {
sw.jobSize = length
}

// SetWriter implements param.SectionWriter
func (sw *AsyncHasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter {
sw.errFunc(errors.New("Asynchasher does not currently support SectionWriter chaining"))
nolash marked this conversation as resolved.
Show resolved Hide resolved
nolash marked this conversation as resolved.
Show resolved Hide resolved
return sw
}

// SectionSize returns the size of async section unit to use
// SectionSize implements param.SectionWriter
func (sw *AsyncHasher) SectionSize() int {
return sw.secsize
}

// Write writes the i-th section of the BMT base
// this function can and is meant to be called concurrently
// it sets max segment threadsafely
func (sw *AsyncHasher) Write(i int, section []byte) {
// Branches implements param.SectionWriter
func (sw *AsyncHasher) Branches() int {
return sw.seccount
}

// SeekSection locks the cursor until Write() is called; if no Write() is called, it will hang.
// Implements param.SectionWriter
func (sw *AsyncHasher) SeekSection(offset int) {
sw.mtx.Lock()
sw.Hasher.SeekSection(offset)
}

// Write writes to the current position cursor of the Hasher
// The cursor must first be manually set with SeekSection()
// The method will NOT advance the cursor.
// Implements param.SectionWriter
func (sw *AsyncHasher) Write(section []byte) (int, error) {
defer sw.mtx.Unlock()
sw.Hasher.size += len(section)
return sw.WriteSection(sw.Hasher.cursor, section)
}

// WriteSection writes the i-th section of the BMT base
// this function can and is meant to be called concurrently
// it sets max segment threadsafely
func (sw *AsyncHasher) WriteSection(i int, section []byte) (int, error) {
// TODO: Temporary workaround for chunkwise write
if i < 0 {
sw.Hasher.cursor = 0
sw.Hasher.Reset()
sw.Hasher.SetLength(len(section))
sw.Hasher.Write(section)
sw.all = true
return len(section), nil
}
//sw.mtx.Lock() // this lock is now set in SeekSection
// defer sw.mtk.Unlock() // this unlock is still left in Write()
t := sw.getTree()
// cursor keeps track of the rightmost section written so far
// if index is lower than cursor then just write non-final section as is
if i < t.cursor {
// if index is not the rightmost, safe to write section
go sw.write(i, section, false)
return
return len(section), nil
}
// if there is a previous rightmost section safe to write section
if t.offset > 0 {
Expand All @@ -470,7 +570,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
t.section = make([]byte, sw.secsize)
copy(t.section, section)
go sw.write(i, t.section, true)
return
return len(section), nil
}
// the rightmost section just changed, so we write the previous one as non-final
go sw.write(t.cursor, t.section, false)
Expand All @@ -481,6 +581,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
t.offset = i*sw.secsize + 1
t.section = make([]byte, sw.secsize)
copy(t.section, section)
return len(section), nil
}

// Sum can be called any time once the length and the span is known
Expand All @@ -492,12 +593,20 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
// length: known length of the input (unsafe; undefined if out of range)
// meta: metadata to hash together with BMT root for the final digest
// e.g., span for protection against existential forgery
func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) {
//
// Implements param.SectionWriter
func (sw *AsyncHasher) Sum(b []byte) (s []byte) {
if sw.all {
return sw.Hasher.Sum(nil)
}
sw.mtx.Lock()
t := sw.getTree()
length := sw.jobSize
if length == 0 {
sw.releaseTree()
sw.mtx.Unlock()
s = sw.pool.zerohashes[sw.pool.Depth]
return
} else {
// for non-zero input the rightmost section is written to the tree asynchronously
// if the actual last section has been written (t.cursor == length/t.secsize)
Expand All @@ -515,15 +624,13 @@ func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) {
}
// relesase the tree back to the pool
sw.releaseTree()
// if no meta is given just append digest to b
if len(meta) == 0 {
return append(b, s...)
}
meta := t.span
// hash together meta and BMT root hash using the pools
return doSum(sw.pool.hasher(), b, meta, s)
}

// writeSection writes the hash of i-th section into level 1 node of the BMT tree
// TODO: h.size increases even on multiple writes to the same section of a section
func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) {
// select the leaf node for the section
var n *node
Expand Down Expand Up @@ -688,3 +795,11 @@ func calculateDepthFor(n int) (d int) {
}
return d + 1
}

// LengthToSpan creates a binary data span size representation
// It is required for calculating the BMT hash
func LengthToSpan(length int) []byte {
nolash marked this conversation as resolved.
Show resolved Hide resolved
spanBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(spanBytes, uint64(length))
return spanBytes
}
Loading