From 9fe5bca6d8c4c959d9abc87f0acc190fff4661a6 Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Tue, 16 Jul 2024 18:50:08 +0900 Subject: [PATCH] v2: use uint64 for token instead of timestamp --- go.mod | 2 +- spindle.go | 61 +++++++++++++++++++++++++++++------------------------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index bff2af0..99eb5c0 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( cloud.google.com/go/spanner v1.64.0 + github.com/cespare/xxhash/v2 v2.3.0 github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.12.5 google.golang.org/api v0.187.0 @@ -19,7 +20,6 @@ require ( cloud.google.com/go/longrunning v0.5.9 // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect github.com/envoyproxy/go-control-plane v0.12.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect diff --git a/spindle.go b/spindle.go index 736340b..d3c5b37 100644 --- a/spindle.go +++ b/spindle.go @@ -11,6 +11,7 @@ import ( "time" "cloud.google.com/go/spanner" + "github.com/cespare/xxhash/v2" "github.com/google/uuid" gaxv2 "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" @@ -52,7 +53,7 @@ type Lock struct { id string // unique id for this instance duration int64 // lock duration in ms iter atomic.Int64 - token *time.Time + ttoken *time.Time mtx *sync.Mutex logger *log.Logger active atomic.Int32 @@ -64,7 +65,10 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { var leader atomic.Int32 // for heartbeat go func() { min := (time.Millisecond * time.Duration(l.duration)) / 2 - bo := gaxv2.Backoff{Max: time.Millisecond * time.Duration(l.duration)} + bo := gaxv2.Backoff{ + Max: time.Millisecond * time.Duration(l.duration), + } + for { if ctx.Err() == context.Canceled { return // not foolproof due to delay @@ -89,14 +93,14 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { }() locked := func() bool { - // See if there is an active leased lock (could be us, could be somebody else). - tokenLocked, diff, err := l.checkLock() + // See if there is an active leased lock (could be us, could be someone else). + token, diff, err := l.checkLock() if err != nil { l.logger.Println(err) return true // err on safer side } - if l.tokenString() != "" && l.tokenString() == tokenLocked { + if l.token() == token { leader.Add(1) if leader.Load() == 1 { l.heartbeat() // only on 1 @@ -166,7 +170,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { if err == nil { l.setToken(&cts) - l.logger.Printf("%v got the lock with token [%v]", prefix, l.tokenString()) + l.logger.Printf("%v got the lock with token [%v]", prefix, l.token()) return } @@ -176,13 +180,13 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { // For the succeeding lock attempts. if initial.Load() == 0 { prefix := "[next]" - token, _, err := l.getCurrentTokenAndId() + token, _, err := l.getCurrentToken() if err != nil { - l.logger.Printf("%v getCurrentTokenAndId failed: %v", prefix, err) + l.logger.Printf("%v getCurrentToken failed: %v", prefix, err) return } - if token == "" { + if token == 0 { l.logger.Printf("%v read token failed: empty", prefix) return } @@ -230,7 +234,7 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { } l.setToken(&nts) // doesn't mean we're leader - l.logger.Printf("%v got the lock: token=%v", prefix, l.tokenString()) + l.logger.Printf("%v got the lock: token=%v", prefix, l.token()) } } } @@ -267,17 +271,17 @@ func (l *Lock) Run(ctx context.Context, done ...chan error) error { } // HasLock returns true if this instance got the lock, together with the lock token. -func (l *Lock) HasLock() (bool, string) { +func (l *Lock) HasLock() (bool, uint64) { if l.active.Load() == 0 { - return false, "" + return false, 0 } - token, _, err := l.getCurrentTokenAndId() + token, _, err := l.getCurrentToken() if err != nil { return false, token } - if token != "" && token == l.tokenString() { + if token == l.token() { return true, token } @@ -290,7 +294,7 @@ func (l *Lock) Leader() (string, error) { return "", ErrNotRunning } - _, w, err := l.getCurrentTokenAndId() + _, w, err := l.getCurrentToken() return w, err } @@ -303,20 +307,21 @@ func (l *Lock) Iterations() int64 { return l.iter.Load() } // Client returns the Spanner client. func (l *Lock) Client() *spanner.Client { return l.db } -func (l *Lock) tokenString() string { +func (l *Lock) token() uint64 { l.mtx.Lock() defer l.mtx.Unlock() - if l.token == nil { - return "" + if l.ttoken == nil { + return 0 } - return (*l.token).UTC().Format(time.RFC3339Nano) + v := (*l.ttoken).UTC().Format(time.RFC3339Nano) + return xxhash.Sum64String(v) } func (l *Lock) setToken(v *time.Time) { l.mtx.Lock() defer l.mtx.Unlock() - l.token = v + l.ttoken = v } type diffT struct { @@ -324,8 +329,8 @@ type diffT struct { Token spanner.NullTime } -func (l *Lock) checkLock() (string, int64, error) { - var tokenLocked string +func (l *Lock) checkLock() (uint64, int64, error) { + var token string var diff int64 err := func() error { @@ -361,13 +366,13 @@ func (l *Lock) checkLock() (string, int64, error) { } diff = v.Diff.Int64 - tokenLocked = v.Token.Time.UTC().Format(time.RFC3339Nano) + token = v.Token.Time.UTC().Format(time.RFC3339Nano) } return retErr }() - return tokenLocked, diff, err + return xxhash.Sum64String(token), diff, err } type tokenT struct { @@ -375,7 +380,7 @@ type tokenT struct { Writer spanner.NullString } -func (l *Lock) getCurrentTokenAndId() (string, string, error) { +func (l *Lock) getCurrentToken() (uint64, string, error) { var q strings.Builder fmt.Fprintf(&q, "select token, writer from %s ", l.table) fmt.Fprintf(&q, "where name = @name") @@ -394,13 +399,13 @@ func (l *Lock) getCurrentTokenAndId() (string, string, error) { } if err != nil { - return token, writer, err + return 0, writer, err } var v tokenT err = row.ToStruct(&v) if err != nil { - return token, writer, err + return 0, writer, err } token = v.Token.Time.UTC().Format(time.RFC3339Nano) @@ -409,7 +414,7 @@ func (l *Lock) getCurrentTokenAndId() (string, string, error) { } } - return token, writer, nil + return xxhash.Sum64String(token), writer, nil } func (l *Lock) heartbeat() {