From ae6ace81be2d9941bdc4aadd205892a50ecb6dba Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Thu, 11 Jul 2024 02:07:44 +0900 Subject: [PATCH] cleanup dbg data --- distmem.go | 63 +++++++++++++++++++++++++----------------------------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/distmem.go b/distmem.go index 8537074..227257f 100644 --- a/distmem.go +++ b/distmem.go @@ -53,12 +53,12 @@ type DistMem struct { Name string op *Op - nodes []uint64 // 0=local, ...=spillover - meta map[uint64]*metaT // per-node metadata + nodes []uint64 // 0=local, 1..n=network + meta map[uint64]*metaT // per-node metadata, key=node mlimit uint64 // mem limit dlimit uint64 // disk limit hasher hashT // for node id - data map[uint64]*memT // mem data + data map[uint64]*memT // mem data , key=node dlocs []int // disk offsets mlock *sync.Mutex // local mem lock dlock *sync.Mutex // local file lock @@ -194,7 +194,6 @@ func (w *Writer) start() { atomic.AddUint64(&w.dm.meta[node].msize, uint64(len(data))) default: if msize < mlimit { - // Use local memory. memCount++ if !mlock { w.dm.mlock.Lock() @@ -212,7 +211,6 @@ func (w *Writer) start() { w.dm.data[node].mlocs = append(w.dm.data[node].mlocs, len(data)) atomic.AddUint64(&w.dm.meta[node].msize, uint64(len(data))) } else { - // Use local disk. diskCount++ if !dlock { w.dm.dlock.Lock() @@ -344,7 +342,6 @@ func (r *Reader) Read(out chan []byte) { continue } - var n int for { in, err := r.dm.meta[node].reader.Recv() if err == io.EOF { @@ -359,50 +356,48 @@ func (r *Reader) Read(out chan []byte) { } out <- in.Data - n++ } default: func() { r.dm.mlock.Lock() defer r.dm.mlock.Unlock() - var n, count int + var n int for _, off := range r.dm.data[node].mlocs { out <- r.dm.data[node].data[n : n+off] n += off - count++ } }() - if len(r.dm.dlocs) > 0 { - func() { - r.dm.dlock.Lock() - defer r.dm.dlock.Unlock() - ra, err := mmap.Open(r.dm.localFile()) + func() { + r.dm.dlock.Lock() + defer r.dm.dlock.Unlock() + if len(r.dm.dlocs) == 0 { + return + } + + ra, err := mmap.Open(r.dm.localFile()) + if err != nil { + r.Lock() + r.err = fmt.Errorf("Open failed: %v", err) + r.Unlock() + return + } + + defer ra.Close() + var off int64 + for _, loc := range r.dm.dlocs { + buf := make([]byte, loc) + n, err := ra.ReadAt(buf, off) if err != nil { r.Lock() - r.err = fmt.Errorf("Open failed: %v", err) + r.err = fmt.Errorf("ReadAt failed: %v", err) r.Unlock() - return } - defer ra.Close() - var off int64 - var count int - for _, loc := range r.dm.dlocs { - b := make([]byte, loc) - n, err := ra.ReadAt(b, off) - if err != nil { - r.Lock() - r.err = fmt.Errorf("ReadAt failed: %v", err) - r.Unlock() - } - - out <- b - off = off + int64(n) - count++ - } - }() - } + out <- buf + off = off + int64(n) + } + }() } }