Skip to content

Commit

Permalink
Add ctx in state
Browse files Browse the repository at this point in the history
Signed-off-by: pigletfly <[email protected]>
  • Loading branch information
pigletfly committed Jun 1, 2022
1 parent e2aced1 commit 15cb8eb
Show file tree
Hide file tree
Showing 55 changed files with 685 additions and 628 deletions.
9 changes: 5 additions & 4 deletions state/aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package aerospike

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (aspike *Aerospike) Features() []state.Feature {
}

// Set stores value for a key to Aerospike. It honors ETag (for concurrency) and consistency settings.
func (aspike *Aerospike) Set(req *state.SetRequest) error {
func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -162,7 +163,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
}

// Get retrieves state from Aerospike with a key.
func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (aspike *Aerospike) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
asKey, err := as.NewKey(aspike.namespace, aspike.set, req.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -196,7 +197,7 @@ func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error)
}

// Delete performs a delete operation.
func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -238,7 +239,7 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
return nil
}

func (aspike *Aerospike) Ping() error {
func (aspike *Aerospike) Ping(ctx context.Context) error {
return nil
}

Expand Down
15 changes: 8 additions & 7 deletions state/alicloud/tablestore/tablestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"encoding/json"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *AliCloudTableStore) Features() []state.Feature {
return s.features
}

func (s *AliCloudTableStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (s *AliCloudTableStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
criteria := &tablestore.SingleRowQueryCriteria{
PrimaryKey: s.primaryKey(req.Key),
TableName: s.metadata.TableName,
Expand Down Expand Up @@ -103,7 +104,7 @@ func (s *AliCloudTableStore) getResp(columns []*tablestore.AttributeColumn) *sta
return getResp
}

func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (s *AliCloudTableStore) BulkGet(ctx context.Context, reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// "len == 0": empty request, directly return empty response
if len(reqs) == 0 {
return true, []state.BulkGetResponse{}, nil
Expand Down Expand Up @@ -139,7 +140,7 @@ func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.Bul
return true, responseList, nil
}

func (s *AliCloudTableStore) Set(req *state.SetRequest) error {
func (s *AliCloudTableStore) Set(ctx context.Context, req *state.SetRequest) error {
change := s.updateRowChange(req)

request := &tablestore.UpdateRowRequest{
Expand Down Expand Up @@ -183,7 +184,7 @@ func unmarshal(val interface{}) []byte {
return []byte(output)
}

func (s *AliCloudTableStore) Delete(req *state.DeleteRequest) error {
func (s *AliCloudTableStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
change := s.deleteRowChange(req)

deleteRowReq := &tablestore.DeleteRowRequest{
Expand All @@ -205,11 +206,11 @@ func (s *AliCloudTableStore) deleteRowChange(req *state.DeleteRequest) *tablesto
return change
}

func (s *AliCloudTableStore) BulkSet(reqs []state.SetRequest) error {
func (s *AliCloudTableStore) BulkSet(ctx context.Context, reqs []state.SetRequest) error {
return s.batchWrite(reqs, nil)
}

func (s *AliCloudTableStore) BulkDelete(reqs []state.DeleteRequest) error {
func (s *AliCloudTableStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest) error {
return s.batchWrite(nil, reqs)
}

Expand All @@ -234,7 +235,7 @@ func (s *AliCloudTableStore) batchWrite(setReqs []state.SetRequest, deleteReqs [
return nil
}

func (s *AliCloudTableStore) Ping() error {
func (s *AliCloudTableStore) Ping(ctx context.Context) error {
return nil
}

Expand Down
17 changes: 9 additions & 8 deletions state/alicloud/tablestore/tablestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"testing"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -63,15 +64,15 @@ func TestReadAndWrite(t *testing.T) {
Value: "value of key",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.Background(), setReq)
assert.Nil(t, err)
})

t.Run("test get 1", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theFirstKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.Background(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "value of key", string(resp.Data))
Expand All @@ -83,22 +84,22 @@ func TestReadAndWrite(t *testing.T) {
Value: "1234",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.Background(), setReq)
assert.Nil(t, err)
})

t.Run("test get 2", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theSecondKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.Background(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "1234", string(resp.Data))
})

t.Run("test BulkSet", func(t *testing.T) {
err := store.BulkSet([]state.SetRequest{{
err := store.BulkSet(context.Background(), []state.SetRequest{{
Key: "theFirstKey",
Value: "666",
}, {
Expand All @@ -110,7 +111,7 @@ func TestReadAndWrite(t *testing.T) {
})

t.Run("test BulkGet", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.Background(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand All @@ -126,12 +127,12 @@ func TestReadAndWrite(t *testing.T) {
req := &state.DeleteRequest{
Key: "theFirstKey",
}
err := store.Delete(req)
err := store.Delete(context.Background(), req)
assert.Nil(t, err)
})

t.Run("test BulkGet2", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.Background(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand Down
15 changes: 8 additions & 7 deletions state/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package dynamodb

import (
"context"
"encoding/json"
"fmt"
"strconv"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (d *StateStore) Init(metadata state.Metadata) error {
return nil
}

func (d *StateStore) Ping() error {
func (d *StateStore) Ping(ctx context.Context) error {
return nil
}

Expand All @@ -80,7 +81,7 @@ func (d *StateStore) Features() []state.Feature {
}

// Get retrieves a dynamoDB item.
func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
input := &dynamodb.GetItemInput{
ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong),
TableName: aws.String(d.table),
Expand Down Expand Up @@ -124,13 +125,13 @@ func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
}

// BulkGet performs a bulk get operations.
func (d *StateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (d *StateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// TODO: replace with dynamodb.BatchGetItem for performance
return false, nil, nil
}

// Set saves a dynamoDB item.
func (d *StateStore) Set(req *state.SetRequest) error {
func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
value, err := d.marshalToString(req.Value)
if err != nil {
return fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
Expand Down Expand Up @@ -176,7 +177,7 @@ func (d *StateStore) Set(req *state.SetRequest) error {
}

// BulkSet performs a bulk set operation.
func (d *StateStore) BulkSet(req []state.SetRequest) error {
func (d *StateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, r := range req {
Expand Down Expand Up @@ -234,7 +235,7 @@ func (d *StateStore) BulkSet(req []state.SetRequest) error {
}

// Delete performs a delete operation.
func (d *StateStore) Delete(req *state.DeleteRequest) error {
func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"key": {
Expand All @@ -249,7 +250,7 @@ func (d *StateStore) Delete(req *state.DeleteRequest) error {
}

// BulkDelete performs a bulk delete operation.
func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, r := range req {
Expand Down
Loading

0 comments on commit 15cb8eb

Please sign in to comment.