Skip to content
This repository has been archived by the owner on Nov 23, 2018. It is now read-only.

Adding support for context.Context #9

Merged
merged 13 commits into from
Oct 26, 2015
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
*.sw?
.DS_Store

255 changes: 168 additions & 87 deletions chainstore.go
Original file line number Diff line number Diff line change
@@ -1,147 +1,228 @@
package chainstore

import (
"errors"
"io/ioutil"
"fmt"
"regexp"
"sync"
"time"

"golang.org/x/net/context"
)

type storeFn func(s Store) error

var (
ErrInvalidKey = errors.New("Invalid key")
keyInvalidator = regexp.MustCompile(`(i?)[^a-z0-9\/_\-:\.]`)
)

KeyInvalidator = regexp.MustCompile(`(i?)[^a-z0-9\/_\-:\.]`)
var (
DefaultTimeout = time.Millisecond * 3500
)

const (
MaxKeyLen = 256
maxKeyLen = 256
)

type Store interface {
Open() error
Close() error
Put(key string, val []byte) error
Get(key string) ([]byte, error)
Del(key string) error
Put(ctx context.Context, key string, val []byte) error
Get(ctx context.Context, key string) ([]byte, error)
Del(ctx context.Context, key string) error
}

// TODO: how can we check if a store has been opened...?
type storeWrapper struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a "opened" property here, if required.

Store
errE error
errMu sync.RWMutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this mutex? Is it even possible that anything would be writing error to one store at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it happened a couple of times while testing, I found it with -race.

}

func (s *storeWrapper) err() error {
s.errMu.RLock()
defer s.errMu.RUnlock()
return s.errE
}

func (s *storeWrapper) setErr(err error) {
s.errMu.Lock()
defer s.errMu.Unlock()
s.errE = err
}

// Chain represents a store chain.
type Chain struct {
stores []Store
async bool
stores []*storeWrapper
}

// New creates a new store chain backed by the passed stores.
func New(stores ...Store) Store {
return &Chain{stores, false}
// TODO: make the chain..
// call Open(), but in case of error..?
c := &Chain{
stores: make([]*storeWrapper, 0, len(stores)),
}
for _, s := range stores {
c.stores = append(c.stores, &storeWrapper{Store: s})
}
return c
}

func Async(stores ...Store) Store {
return &Chain{stores, true}
}
// Open opens all the stores.
func (c *Chain) Open() error {

func (c *Chain) Open() (err error) {
for _, s := range c.stores {
err = s.Open()
if err != nil {
return // return first error that comes up
}
if err := c.firstErr(); err != nil {
return fmt.Errorf("Open failed due to a previous error: %q", err)
}
return

var wg sync.WaitGroup

for i := range c.stores {
wg.Add(1)
go func(s *storeWrapper) {
defer wg.Done()
s.setErr(s.Open())
}(c.stores[i])
}

wg.Wait()

return c.firstErr()
}

func (c *Chain) Close() (err error) {
for _, s := range c.stores {
err = s.Close()
// TODO: we shouldn't stop on first error.. should keep trying to close
// and record errors separately
if err != nil {
return
}
// Close closes all the stores.
func (c *Chain) Close() error {
var wg sync.WaitGroup

for i := range c.stores {
wg.Add(1)
go func(s *storeWrapper) {
defer wg.Done()
s.setErr(s.Close())
}(c.stores[i])
}
return

wg.Wait()

return c.firstErr()
}

func (c *Chain) Put(key string, val []byte) (err error) {
if !IsValidKey(key) {
func (c *Chain) Put(ctx context.Context, key string, val []byte) (err error) {
if !isValidKey(key) {
return ErrInvalidKey
}

fn := func() (err error) {
for _, s := range c.stores {
err = s.Put(key, val)
if err != nil {
return
}
}
return
if err := c.firstErr(); err != nil {
return fmt.Errorf("Open failed due to a previous error: %q", err)
}
if c.async {
go fn()
} else {
err = fn()

fn := func(s Store) error {
return s.Put(ctx, key, val)
}
return

return c.doWithContext(ctx, fn)
}

func (c *Chain) Get(key string) (val []byte, err error) {
if !IsValidKey(key) {
func (c *Chain) Get(ctx context.Context, key string) (val []byte, err error) {
if !isValidKey(key) {
return nil, ErrInvalidKey
}

for i, s := range c.stores {
val, err = s.Get(key)
if err != nil {
return
}
if err := c.firstErr(); err != nil {
return nil, fmt.Errorf("Open failed due to a previous error: %q", err)
}

if len(val) > 0 {
if i > 0 {
// put the value in all other stores up the chain
fn := func() {
for n := i - 1; n >= 0; n-- {
c.stores[n].Put(key, val) // errors..?
}
nextStore := make(chan Store, len(c.stores))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @VojtechVitek,

I had to modify your snippet a bit, what do you think of the new location of the range over putBack? Otherwise it just stays blocked waiting for a value to arrive.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did it block because the channel was unbuffered?
putBack := make(chan Store, len(c.stores)) shouldn't block, I can't imagine how.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking that back ... obviously not, let me see the code again :) I was throwing it of top of my head. But this looks good already :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I have a working snippet for you... without firstVal channel and the extra goroutine:

package main

import (
    "errors"
    "fmt"
    "time"

    "golang.org/x/net/context"
)

func Get(ctx context.Context) (string, error) {
    nextStore := make(chan string, 3)
    for _, store := range []string{"first", "second", "third"} {
        nextStore <- store
    }
    close(nextStore)

    putBack := make(chan string, 3)

    for {
        select {
        case <-ctx.Done():
            return "", ctx.Err()
        case store, ok := <-nextStore:
            if !ok {
                return "", errors.New("not found")
            }

            if store != "fourth" { // change this --- simulating what store returns FOUND
                putBack <- store
                break
            }
            close(putBack)

            for store := range putBack {
                go fmt.Printf("putting back to %v store\n", store)
            }

            return fmt.Sprintf("found in %v", store), nil
        }
    }

    panic("unreachable")
}

func main() {
    ctx, _ := context.WithTimeout(context.Background(), time.Second)
    val, err := Get(ctx)
    fmt.Println(val, err)

    // wait for fmt.Printf() from goroutines
    time.Sleep(time.Second)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! makes sense, the close(putBack) is what makes the range not to lock. I like it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried closing the channel and everything worked fine :).

for _, store := range c.stores {
nextStore <- store
}
close(nextStore)

putBack := make(chan Store, len(c.stores))
firstVal := make(chan []byte)

go func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this goroutine and errCh helps with anything. It will be fetching from all the stores sequentially anyways - so why in background? And how do you close this goroutine on ctx.Done()?

How about

nextStore := make(chan Store, 0, len(c.stores))
for _, store := range c.stores {
      nextStore <- store
}
close(nextStore) // this will trigger !ok condition after all stores are received

putBack := make(chan Store, 0, len(c.stores))

for {
    select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case store, ok := <-nextStore:
            if !ok {
                return nil, ErrNotFound
            }
            val, err := store.Get(ctx, key)
            if err != nil { // not found in this store
                putBack <- store
                continue // next store
            }
            for _, store := range putBack {
                go store.Put(ctx, key, val)
            }
            return val, nil
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you create a buffered channel and write to it even before something is reading from it and at the end we get rid of the WaitGroup, that's cool!. I need to test it tho, it seems like the channel will be closed by the time we try to read it so maybe we'll need to add another goroutine somewhere.

Also, what happens with putBack if the value is never found and what happens with it if the stores reply with something like [not found, found, not found, not found]?

Good point on the sequential loop, I also thought about it not being sequential but I realized that that would ping all the stores indistinctly, for this particular case I think we actually want them to be sequential and we'll always stop when the first value is found, in that case we will send the value to all stores that don't have it (which are all the stores that we already passed without finding the value). This sequential is not perfect either because we can end up with something like [found, not found, not found, not found].

Maybe we should think a bit on this Get again, what do think @pkieltyka?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like the channel will be closed by the time we try to read it so maybe we'll need to add another goroutine somewhere

that's expected, you can still read values from closed channel :) I didn't know about it either until I read http://www.golang.to/spotlight/curious-channels-dave-cheney-8818 :)

Also, what happens with putBack if the value is never found and what happens with it if the stores reply with something like [not found, found, not found, not found]?

interesting, this is exactly what we should discuss at #10 -- a broader topic

Good point on the sequential loop, I also thought about it not being sequential but I realized that that would ping all the stores indistinctly, for this particular case I think we actually want them to be sequential and we'll always stop when the first value is found, in that case we will send the value to all stores that don't have it (which are all the stores that we already passed without finding the value).

Sequential is correct, you want to GET from faster stores first (and don't even touch slower ones, like s3) -- we should document it better that chain order matters (imho, it's obvious).

This sequential is not perfect either because we can end up with something like [found, not found, not found, not found].

Agreed -- #10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's talk about that on the other issue.

val := <-firstVal
for store := range putBack {
go store.Put(ctx, key, val)
}
}()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case store, ok := <-nextStore:
if !ok {
return nil, ErrNoSuchKey
}
val, err := store.Get(ctx, key)
if err != nil {
if err == ErrTimeout {
return nil, err
}
// if c.async { } else { } ....?
go fn()
putBack <- store
continue
}

// return the first value found on the chain
return
firstVal <- val
close(putBack)

return val, nil
}
}
return

panic("reached")
}

func (c *Chain) Del(key string) (err error) {
if !IsValidKey(key) {
func (c *Chain) Del(ctx context.Context, key string) (err error) {
if !isValidKey(key) {
return ErrInvalidKey
}

fn := func() (err error) {
for _, s := range c.stores {
err = s.Del(key)
if err != nil {
return
}
}
return
if err := c.firstErr(); err != nil {
return fmt.Errorf("Delete failed due to a previous error: %q", err)
}
if c.async {
go fn()
} else {
err = fn()

fn := func(s Store) error {
return s.Del(ctx, key)
}
return

return c.doWithContext(ctx, fn)
}

func IsValidKey(key string) bool {
return len(key) <= MaxKeyLen && !KeyInvalidator.MatchString(key)
func (c *Chain) doWithContext(ctx context.Context, fn storeFn) error {
var wg sync.WaitGroup

for i := range c.stores {
wg.Add(1)

go func(s *storeWrapper) {
defer wg.Done()
s.setErr(fn(s))
}(c.stores[i])
}

wg.Wait()

return c.firstErr()
}

func (c *Chain) firstErr() error {
var rerr error
for i := range c.stores {
if err := c.stores[i].err(); err != nil {
rerr = err
if err == ErrTimeout {
// We can recover from this kind of error, so we return it and try
// again.
c.stores[i].setErr(nil)
return err
} else {
break
}
}
}
return rerr
}

func TempDir() string {
path, _ := ioutil.TempDir("", "chainstore-")
return path
func isValidKey(key string) bool {
return len(key) <= maxKeyLen && !keyInvalidator.MatchString(key)
}
12 changes: 12 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package chainstore

import (
"errors"
)

var (
ErrInvalidKey = errors.New("Invalid key")
ErrMissingStores = errors.New("No stores provided")
ErrNoSuchKey = errors.New("No such key")
ErrTimeout = errors.New("Timed out")
)
Loading