Skip to content

Commit

Permalink
expose writer and reader objects
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 9, 2024
1 parent b6ecd48 commit e772fab
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
34 changes: 18 additions & 16 deletions distmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type DistMem struct {
mlock *sync.Mutex // local mem lock
dlock *sync.Mutex // local file lock
wmtx *sync.Mutex // one active writer only
writer *writer // writer object
writer *Writer // writer object
wrefs int64 // writer reference count
rrefs int64 // reader reference count
on int32
Expand All @@ -73,7 +73,7 @@ type DistMem struct {
start time.Time
}

type writer struct {
type Writer struct {
sync.Mutex
lo bool // local write only
dm *DistMem
Expand All @@ -84,17 +84,17 @@ type writer struct {
}

// Err returns the last recorded error during the write operation.
func (w *writer) Err() error {
func (w *Writer) Err() error {
w.Lock()
defer w.Unlock()
return w.err
}

// Write writes data to the underlying storage.
func (w *writer) Write(data []byte) { w.ch <- data }
func (w *Writer) Write(data []byte) { w.ch <- data }

// Close closes the writer object.
func (w *writer) Close() {
func (w *Writer) Close() {
if atomic.LoadInt32(&w.on) == 0 {
return
}
Expand All @@ -106,7 +106,7 @@ func (w *writer) Close() {
w.dm.wmtx.Unlock()
}

func (w *writer) start() {
func (w *Writer) start() {
defer func() { w.done <- struct{}{} }()
atomic.StoreInt32(&w.on, 1)
ctx := context.Background()
Expand Down Expand Up @@ -279,15 +279,16 @@ type writerOptions struct {
}

// Writer returns a writer object for writing data to DistMem. The
// caller needs to call writer.Close() after use.
func (dm *DistMem) Writer(opts ...*writerOptions) (*writer, error) {
// caller needs to call writer.Close() after use. Options is only
// used internally, not exposed to callers.
func (dm *DistMem) Writer(opts ...*writerOptions) (*Writer, error) {
dm.wmtx.Lock()
var localOnly bool
if len(opts) > 0 {
localOnly = opts[0].LocalOnly
}

dm.writer = &writer{
dm.writer = &Writer{
lo: localOnly,
dm: dm,
ch: make(chan []byte),
Expand All @@ -299,7 +300,7 @@ func (dm *DistMem) Writer(opts ...*writerOptions) (*writer, error) {
return dm.writer, nil
}

type reader struct {
type Reader struct {
sync.Mutex
lo bool // local read only
dm *DistMem
Expand All @@ -309,7 +310,7 @@ type reader struct {
}

// Read reads the underlying data and streams them to the `out` channel.
func (r *reader) Read(out chan []byte) {
func (r *Reader) Read(out chan []byte) {
eg := new(errgroup.Group)
eg.Go(func() error {
atomic.StoreInt32(&r.on, 1)
Expand Down Expand Up @@ -416,14 +417,14 @@ func (r *reader) Read(out chan []byte) {
}

// Err returns the last recorded error, if any, during the read operation.
func (r *reader) Err() error {
func (r *Reader) Err() error {
r.Lock()
defer r.Unlock()
return r.err
}

// Close closes the reader object.
func (r *reader) Close() {
func (r *Reader) Close() {
if atomic.LoadInt32(&r.on) == 0 {
return
}
Expand All @@ -438,14 +439,15 @@ type readerOptions struct {
}

// Reader returns a reader object for reading data from DistMem.
// The caller needs to call reader.Close() after use.
func (dm *DistMem) Reader(opts ...*readerOptions) (*reader, error) {
// The caller needs to call reader.Close() after use. Options is
// only used internally, not exposed to callers.
func (dm *DistMem) Reader(opts ...*readerOptions) (*Reader, error) {
var localOnly bool
if len(opts) > 0 {
localOnly = opts[0].LocalOnly
}

reader := &reader{
reader := &Reader{
lo: localOnly,
dm: dm,
done: make(chan struct{}, 1),
Expand Down
2 changes: 1 addition & 1 deletion service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *service) Broadcast(hs pb.Hedge_BroadcastServer) error {
func (s *service) DMemWrite(hs pb.Hedge_DMemWriteServer) error {
var err error
ctx := hs.Context()
var writer *writer
var writer *Writer

var count int

Expand Down

0 comments on commit e772fab

Please sign in to comment.