-
Notifications
You must be signed in to change notification settings - Fork 10
Adding support for context.Context #9
Changes from 6 commits
3342ccb
8e54b1d
a42cbf8
01abb1e
78ffa54
5699eaf
37c534c
6a44f6f
a514b70
8b7207c
cf9bf16
c188d1b
5f9aede
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
*.sw? | ||
.DS_Store | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,147 +1,228 @@ | ||
package chainstore | ||
|
||
import ( | ||
"errors" | ||
"io/ioutil" | ||
"fmt" | ||
"regexp" | ||
"sync" | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
) | ||
|
||
type storeFn func(s Store) error | ||
|
||
var ( | ||
ErrInvalidKey = errors.New("Invalid key") | ||
keyInvalidator = regexp.MustCompile(`(i?)[^a-z0-9\/_\-:\.]`) | ||
) | ||
|
||
KeyInvalidator = regexp.MustCompile(`(i?)[^a-z0-9\/_\-:\.]`) | ||
var ( | ||
DefaultTimeout = time.Millisecond * 3500 | ||
) | ||
|
||
const ( | ||
MaxKeyLen = 256 | ||
maxKeyLen = 256 | ||
) | ||
|
||
type Store interface { | ||
Open() error | ||
Close() error | ||
Put(key string, val []byte) error | ||
Get(key string) ([]byte, error) | ||
Del(key string) error | ||
Put(ctx context.Context, key string, val []byte) error | ||
Get(ctx context.Context, key string) ([]byte, error) | ||
Del(ctx context.Context, key string) error | ||
} | ||
|
||
// TODO: how can we check if a store has been opened...? | ||
type storeWrapper struct { | ||
Store | ||
errE error | ||
errMu sync.RWMutex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this mutex? Is it even possible that anything would be writing error to one store at the same time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it happened a couple of times while testing, I found it with |
||
} | ||
|
||
func (s *storeWrapper) err() error { | ||
s.errMu.RLock() | ||
defer s.errMu.RUnlock() | ||
return s.errE | ||
} | ||
|
||
func (s *storeWrapper) setErr(err error) { | ||
s.errMu.Lock() | ||
defer s.errMu.Unlock() | ||
s.errE = err | ||
} | ||
|
||
// Chain represents a store chain. | ||
type Chain struct { | ||
stores []Store | ||
async bool | ||
stores []*storeWrapper | ||
} | ||
|
||
// New creates a new store chain backed by the passed stores. | ||
func New(stores ...Store) Store { | ||
return &Chain{stores, false} | ||
// TODO: make the chain.. | ||
// call Open(), but in case of error..? | ||
c := &Chain{ | ||
stores: make([]*storeWrapper, 0, len(stores)), | ||
} | ||
for _, s := range stores { | ||
c.stores = append(c.stores, &storeWrapper{Store: s}) | ||
} | ||
return c | ||
} | ||
|
||
func Async(stores ...Store) Store { | ||
return &Chain{stores, true} | ||
} | ||
// Open opens all the stores. | ||
func (c *Chain) Open() error { | ||
|
||
func (c *Chain) Open() (err error) { | ||
for _, s := range c.stores { | ||
err = s.Open() | ||
if err != nil { | ||
return // return first error that comes up | ||
} | ||
if err := c.firstErr(); err != nil { | ||
return fmt.Errorf("Open failed due to a previous error: %q", err) | ||
} | ||
return | ||
|
||
var wg sync.WaitGroup | ||
|
||
for i := range c.stores { | ||
wg.Add(1) | ||
go func(s *storeWrapper) { | ||
defer wg.Done() | ||
s.setErr(s.Open()) | ||
}(c.stores[i]) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return c.firstErr() | ||
} | ||
|
||
func (c *Chain) Close() (err error) { | ||
for _, s := range c.stores { | ||
err = s.Close() | ||
// TODO: we shouldn't stop on first error.. should keep trying to close | ||
// and record errors separately | ||
if err != nil { | ||
return | ||
} | ||
// Close closes all the stores. | ||
func (c *Chain) Close() error { | ||
var wg sync.WaitGroup | ||
|
||
for i := range c.stores { | ||
wg.Add(1) | ||
go func(s *storeWrapper) { | ||
defer wg.Done() | ||
s.setErr(s.Close()) | ||
}(c.stores[i]) | ||
} | ||
return | ||
|
||
wg.Wait() | ||
|
||
return c.firstErr() | ||
} | ||
|
||
func (c *Chain) Put(key string, val []byte) (err error) { | ||
if !IsValidKey(key) { | ||
func (c *Chain) Put(ctx context.Context, key string, val []byte) (err error) { | ||
if !isValidKey(key) { | ||
return ErrInvalidKey | ||
} | ||
|
||
fn := func() (err error) { | ||
for _, s := range c.stores { | ||
err = s.Put(key, val) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
return | ||
if err := c.firstErr(); err != nil { | ||
return fmt.Errorf("Open failed due to a previous error: %q", err) | ||
} | ||
if c.async { | ||
go fn() | ||
} else { | ||
err = fn() | ||
|
||
fn := func(s Store) error { | ||
return s.Put(ctx, key, val) | ||
} | ||
return | ||
|
||
return c.doWithContext(ctx, fn) | ||
} | ||
|
||
func (c *Chain) Get(key string) (val []byte, err error) { | ||
if !IsValidKey(key) { | ||
func (c *Chain) Get(ctx context.Context, key string) (val []byte, err error) { | ||
if !isValidKey(key) { | ||
return nil, ErrInvalidKey | ||
} | ||
|
||
for i, s := range c.stores { | ||
val, err = s.Get(key) | ||
if err != nil { | ||
return | ||
} | ||
if err := c.firstErr(); err != nil { | ||
return nil, fmt.Errorf("Open failed due to a previous error: %q", err) | ||
} | ||
|
||
if len(val) > 0 { | ||
if i > 0 { | ||
// put the value in all other stores up the chain | ||
fn := func() { | ||
for n := i - 1; n >= 0; n-- { | ||
c.stores[n].Put(key, val) // errors..? | ||
} | ||
nextStore := make(chan Store, len(c.stores)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @VojtechVitek, I had to modify your snippet a bit, what do you think of the new location of the range over There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did it block because the channel was unbuffered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking that back ... obviously not, let me see the code again :) I was throwing it of top of my head. But this looks good already :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey, I have a working snippet for you... without firstVal channel and the extra goroutine: package main
import (
"errors"
"fmt"
"time"
"golang.org/x/net/context"
)
func Get(ctx context.Context) (string, error) {
nextStore := make(chan string, 3)
for _, store := range []string{"first", "second", "third"} {
nextStore <- store
}
close(nextStore)
putBack := make(chan string, 3)
for {
select {
case <-ctx.Done():
return "", ctx.Err()
case store, ok := <-nextStore:
if !ok {
return "", errors.New("not found")
}
if store != "fourth" { // change this --- simulating what store returns FOUND
putBack <- store
break
}
close(putBack)
for store := range putBack {
go fmt.Printf("putting back to %v store\n", store)
}
return fmt.Sprintf("found in %v", store), nil
}
}
panic("unreachable")
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
val, err := Get(ctx)
fmt.Println(val, err)
// wait for fmt.Printf() from goroutines
time.Sleep(time.Second)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool! makes sense, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried closing the channel and everything worked fine :). |
||
for _, store := range c.stores { | ||
nextStore <- store | ||
} | ||
close(nextStore) | ||
|
||
putBack := make(chan Store, len(c.stores)) | ||
firstVal := make(chan []byte) | ||
|
||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this goroutine and errCh helps with anything. It will be fetching from all the stores sequentially anyways - so why in background? And how do you close this goroutine on How about nextStore := make(chan Store, 0, len(c.stores))
for _, store := range c.stores {
nextStore <- store
}
close(nextStore) // this will trigger !ok condition after all stores are received
putBack := make(chan Store, 0, len(c.stores))
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case store, ok := <-nextStore:
if !ok {
return nil, ErrNotFound
}
val, err := store.Get(ctx, key)
if err != nil { // not found in this store
putBack <- store
continue // next store
}
for _, store := range putBack {
go store.Put(ctx, key, val)
}
return val, nil
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So you create a buffered channel and write to it even before something is reading from it and at the end we get rid of the WaitGroup, that's cool!. I need to test it tho, it seems like the channel will be closed by the time we try to read it so maybe we'll need to add another goroutine somewhere. Also, what happens with Good point on the sequential loop, I also thought about it not being sequential but I realized that that would ping all the stores indistinctly, for this particular case I think we actually want them to be sequential and we'll always stop when the first value is found, in that case we will send the value to all stores that don't have it (which are all the stores that we already passed without finding the value). This sequential is not perfect either because we can end up with something like [found, not found, not found, not found]. Maybe we should think a bit on this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
that's expected, you can still read values from closed channel :) I didn't know about it either until I read http://www.golang.to/spotlight/curious-channels-dave-cheney-8818 :)
interesting, this is exactly what we should discuss at #10 -- a broader topic
Sequential is correct, you want to GET from faster stores first (and don't even touch slower ones, like s3) -- we should document it better that chain order matters (imho, it's obvious).
Agreed -- #10 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, let's talk about that on the other issue. |
||
val := <-firstVal | ||
for store := range putBack { | ||
go store.Put(ctx, key, val) | ||
} | ||
}() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case store, ok := <-nextStore: | ||
if !ok { | ||
return nil, ErrNoSuchKey | ||
} | ||
val, err := store.Get(ctx, key) | ||
if err != nil { | ||
if err == ErrTimeout { | ||
return nil, err | ||
} | ||
// if c.async { } else { } ....? | ||
go fn() | ||
putBack <- store | ||
continue | ||
} | ||
|
||
// return the first value found on the chain | ||
return | ||
firstVal <- val | ||
close(putBack) | ||
|
||
return val, nil | ||
} | ||
} | ||
return | ||
|
||
panic("reached") | ||
} | ||
|
||
func (c *Chain) Del(key string) (err error) { | ||
if !IsValidKey(key) { | ||
func (c *Chain) Del(ctx context.Context, key string) (err error) { | ||
if !isValidKey(key) { | ||
return ErrInvalidKey | ||
} | ||
|
||
fn := func() (err error) { | ||
for _, s := range c.stores { | ||
err = s.Del(key) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
return | ||
if err := c.firstErr(); err != nil { | ||
return fmt.Errorf("Delete failed due to a previous error: %q", err) | ||
} | ||
if c.async { | ||
go fn() | ||
} else { | ||
err = fn() | ||
|
||
fn := func(s Store) error { | ||
return s.Del(ctx, key) | ||
} | ||
return | ||
|
||
return c.doWithContext(ctx, fn) | ||
} | ||
|
||
func IsValidKey(key string) bool { | ||
return len(key) <= MaxKeyLen && !KeyInvalidator.MatchString(key) | ||
func (c *Chain) doWithContext(ctx context.Context, fn storeFn) error { | ||
var wg sync.WaitGroup | ||
|
||
for i := range c.stores { | ||
wg.Add(1) | ||
|
||
go func(s *storeWrapper) { | ||
defer wg.Done() | ||
s.setErr(fn(s)) | ||
}(c.stores[i]) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return c.firstErr() | ||
} | ||
|
||
func (c *Chain) firstErr() error { | ||
var rerr error | ||
for i := range c.stores { | ||
if err := c.stores[i].err(); err != nil { | ||
rerr = err | ||
if err == ErrTimeout { | ||
// We can recover from this kind of error, so we return it and try | ||
// again. | ||
c.stores[i].setErr(nil) | ||
return err | ||
} else { | ||
break | ||
} | ||
} | ||
} | ||
return rerr | ||
} | ||
|
||
func TempDir() string { | ||
path, _ := ioutil.TempDir("", "chainstore-") | ||
return path | ||
func isValidKey(key string) bool { | ||
return len(key) <= maxKeyLen && !keyInvalidator.MatchString(key) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package chainstore | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
var ( | ||
ErrInvalidKey = errors.New("Invalid key") | ||
ErrMissingStores = errors.New("No stores provided") | ||
ErrNoSuchKey = errors.New("No such key") | ||
ErrTimeout = errors.New("Timed out") | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add a "opened" property here, if required.