Skip to content

Commit

Permalink
cleanup dbg data
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 10, 2024
1 parent 5e8da21 commit ae6ace8
Showing 1 changed file with 29 additions and 34 deletions.
63 changes: 29 additions & 34 deletions distmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ type DistMem struct {
Name string

op *Op
nodes []uint64 // 0=local, ...=spillover
meta map[uint64]*metaT // per-node metadata
nodes []uint64 // 0=local, 1..n=network
meta map[uint64]*metaT // per-node metadata, key=node
mlimit uint64 // mem limit
dlimit uint64 // disk limit
hasher hashT // for node id
data map[uint64]*memT // mem data
data map[uint64]*memT // mem data , key=node
dlocs []int // disk offsets
mlock *sync.Mutex // local mem lock
dlock *sync.Mutex // local file lock
Expand Down Expand Up @@ -194,7 +194,6 @@ func (w *Writer) start() {
atomic.AddUint64(&w.dm.meta[node].msize, uint64(len(data)))
default:
if msize < mlimit {
// Use local memory.
memCount++
if !mlock {
w.dm.mlock.Lock()
Expand All @@ -212,7 +211,6 @@ func (w *Writer) start() {
w.dm.data[node].mlocs = append(w.dm.data[node].mlocs, len(data))
atomic.AddUint64(&w.dm.meta[node].msize, uint64(len(data)))
} else {
// Use local disk.
diskCount++
if !dlock {
w.dm.dlock.Lock()
Expand Down Expand Up @@ -344,7 +342,6 @@ func (r *Reader) Read(out chan []byte) {
continue
}

var n int
for {
in, err := r.dm.meta[node].reader.Recv()
if err == io.EOF {
Expand All @@ -359,50 +356,48 @@ func (r *Reader) Read(out chan []byte) {
}

out <- in.Data
n++
}
default:
func() {
r.dm.mlock.Lock()
defer r.dm.mlock.Unlock()
var n, count int
var n int
for _, off := range r.dm.data[node].mlocs {
out <- r.dm.data[node].data[n : n+off]
n += off
count++
}
}()

if len(r.dm.dlocs) > 0 {
func() {
r.dm.dlock.Lock()
defer r.dm.dlock.Unlock()
ra, err := mmap.Open(r.dm.localFile())
func() {
r.dm.dlock.Lock()
defer r.dm.dlock.Unlock()
if len(r.dm.dlocs) == 0 {
return
}

ra, err := mmap.Open(r.dm.localFile())
if err != nil {
r.Lock()
r.err = fmt.Errorf("Open failed: %v", err)
r.Unlock()
return
}

defer ra.Close()
var off int64
for _, loc := range r.dm.dlocs {
buf := make([]byte, loc)
n, err := ra.ReadAt(buf, off)
if err != nil {
r.Lock()
r.err = fmt.Errorf("Open failed: %v", err)
r.err = fmt.Errorf("ReadAt failed: %v", err)
r.Unlock()
return
}

defer ra.Close()
var off int64
var count int
for _, loc := range r.dm.dlocs {
b := make([]byte, loc)
n, err := ra.ReadAt(b, off)
if err != nil {
r.Lock()
r.err = fmt.Errorf("ReadAt failed: %v", err)
r.Unlock()
}

out <- b
off = off + int64(n)
count++
}
}()
}
out <- buf
off = off + int64(n)
}
}()
}
}

Expand Down

0 comments on commit ae6ace8

Please sign in to comment.