From e772fabda6f486a39a760fe4209e05d7b75de40f Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Tue, 9 Jul 2024 18:35:09 +0900 Subject: [PATCH] expose writer and reader objects --- distmem.go | 34 ++++++++++++++++++---------------- service.go | 2 +- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/distmem.go b/distmem.go index 33b6f96..3256eb9 100644 --- a/distmem.go +++ b/distmem.go @@ -64,7 +64,7 @@ type DistMem struct { mlock *sync.Mutex // local mem lock dlock *sync.Mutex // local file lock wmtx *sync.Mutex // one active writer only - writer *writer // writer object + writer *Writer // writer object wrefs int64 // writer reference count rrefs int64 // reader reference count on int32 @@ -73,7 +73,7 @@ type DistMem struct { start time.Time } -type writer struct { +type Writer struct { sync.Mutex lo bool // local write only dm *DistMem @@ -84,17 +84,17 @@ type writer struct { } // Err returns the last recorded error during the write operation. -func (w *writer) Err() error { +func (w *Writer) Err() error { w.Lock() defer w.Unlock() return w.err } // Write writes data to the underlying storage. -func (w *writer) Write(data []byte) { w.ch <- data } +func (w *Writer) Write(data []byte) { w.ch <- data } // Close closes the writer object. -func (w *writer) Close() { +func (w *Writer) Close() { if atomic.LoadInt32(&w.on) == 0 { return } @@ -106,7 +106,7 @@ func (w *writer) Close() { w.dm.wmtx.Unlock() } -func (w *writer) start() { +func (w *Writer) start() { defer func() { w.done <- struct{}{} }() atomic.StoreInt32(&w.on, 1) ctx := context.Background() @@ -279,15 +279,16 @@ type writerOptions struct { } // Writer returns a writer object for writing data to DistMem. The -// caller needs to call writer.Close() after use. -func (dm *DistMem) Writer(opts ...*writerOptions) (*writer, error) { +// caller needs to call writer.Close() after use. Options is only +// used internally, not exposed to callers. +func (dm *DistMem) Writer(opts ...*writerOptions) (*Writer, error) { dm.wmtx.Lock() var localOnly bool if len(opts) > 0 { localOnly = opts[0].LocalOnly } - dm.writer = &writer{ + dm.writer = &Writer{ lo: localOnly, dm: dm, ch: make(chan []byte), @@ -299,7 +300,7 @@ func (dm *DistMem) Writer(opts ...*writerOptions) (*writer, error) { return dm.writer, nil } -type reader struct { +type Reader struct { sync.Mutex lo bool // local read only dm *DistMem @@ -309,7 +310,7 @@ type reader struct { } // Read reads the underlying data and streams them to the `out` channel. -func (r *reader) Read(out chan []byte) { +func (r *Reader) Read(out chan []byte) { eg := new(errgroup.Group) eg.Go(func() error { atomic.StoreInt32(&r.on, 1) @@ -416,14 +417,14 @@ func (r *reader) Read(out chan []byte) { } // Err returns the last recorded error, if any, during the read operation. -func (r *reader) Err() error { +func (r *Reader) Err() error { r.Lock() defer r.Unlock() return r.err } // Close closes the reader object. -func (r *reader) Close() { +func (r *Reader) Close() { if atomic.LoadInt32(&r.on) == 0 { return } @@ -438,14 +439,15 @@ type readerOptions struct { } // Reader returns a reader object for reading data from DistMem. -// The caller needs to call reader.Close() after use. -func (dm *DistMem) Reader(opts ...*readerOptions) (*reader, error) { +// The caller needs to call reader.Close() after use. Options is +// only used internally, not exposed to callers. +func (dm *DistMem) Reader(opts ...*readerOptions) (*Reader, error) { var localOnly bool if len(opts) > 0 { localOnly = opts[0].LocalOnly } - reader := &reader{ + reader := &Reader{ lo: localOnly, dm: dm, done: make(chan struct{}, 1), diff --git a/service.go b/service.go index 83bc9fb..19e9ca3 100644 --- a/service.go +++ b/service.go @@ -112,7 +112,7 @@ func (s *service) Broadcast(hs pb.Hedge_BroadcastServer) error { func (s *service) DMemWrite(hs pb.Hedge_DMemWriteServer) error { var err error ctx := hs.Context() - var writer *writer + var writer *Writer var count int