Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context in state component #1708

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions state/aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package aerospike

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (aspike *Aerospike) Features() []state.Feature {
}

// Set stores value for a key to Aerospike. It honors ETag (for concurrency) and consistency settings.
func (aspike *Aerospike) Set(req *state.SetRequest) error {
func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -162,7 +163,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
}

// Get retrieves state from Aerospike with a key.
func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (aspike *Aerospike) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
asKey, err := as.NewKey(aspike.namespace, aspike.set, req.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -196,7 +197,7 @@ func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error)
}

// Delete performs a delete operation.
func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -238,7 +239,7 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
return nil
}

func (aspike *Aerospike) Ping() error {
func (aspike *Aerospike) Ping(ctx context.Context) error {
return nil
}

Expand Down
15 changes: 8 additions & 7 deletions state/alicloud/tablestore/tablestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"encoding/json"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *AliCloudTableStore) Features() []state.Feature {
return s.features
}

func (s *AliCloudTableStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (s *AliCloudTableStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
criteria := &tablestore.SingleRowQueryCriteria{
PrimaryKey: s.primaryKey(req.Key),
TableName: s.metadata.TableName,
Expand Down Expand Up @@ -103,7 +104,7 @@ func (s *AliCloudTableStore) getResp(columns []*tablestore.AttributeColumn) *sta
return getResp
}

func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (s *AliCloudTableStore) BulkGet(ctx context.Context, reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// "len == 0": empty request, directly return empty response
if len(reqs) == 0 {
return true, []state.BulkGetResponse{}, nil
Expand Down Expand Up @@ -139,7 +140,7 @@ func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.Bul
return true, responseList, nil
}

func (s *AliCloudTableStore) Set(req *state.SetRequest) error {
func (s *AliCloudTableStore) Set(ctx context.Context, req *state.SetRequest) error {
change := s.updateRowChange(req)

request := &tablestore.UpdateRowRequest{
Expand Down Expand Up @@ -183,7 +184,7 @@ func unmarshal(val interface{}) []byte {
return []byte(output)
}

func (s *AliCloudTableStore) Delete(req *state.DeleteRequest) error {
func (s *AliCloudTableStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
change := s.deleteRowChange(req)

deleteRowReq := &tablestore.DeleteRowRequest{
Expand All @@ -205,11 +206,11 @@ func (s *AliCloudTableStore) deleteRowChange(req *state.DeleteRequest) *tablesto
return change
}

func (s *AliCloudTableStore) BulkSet(reqs []state.SetRequest) error {
func (s *AliCloudTableStore) BulkSet(ctx context.Context, reqs []state.SetRequest) error {
return s.batchWrite(reqs, nil)
}

func (s *AliCloudTableStore) BulkDelete(reqs []state.DeleteRequest) error {
func (s *AliCloudTableStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest) error {
return s.batchWrite(nil, reqs)
}

Expand All @@ -234,7 +235,7 @@ func (s *AliCloudTableStore) batchWrite(setReqs []state.SetRequest, deleteReqs [
return nil
}

func (s *AliCloudTableStore) Ping() error {
func (s *AliCloudTableStore) Ping(ctx context.Context) error {
return nil
}

Expand Down
17 changes: 9 additions & 8 deletions state/alicloud/tablestore/tablestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"testing"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -63,15 +64,15 @@ func TestReadAndWrite(t *testing.T) {
Value: "value of key",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.Background(), setReq)
assert.Nil(t, err)
})

t.Run("test get 1", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theFirstKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.Background(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "value of key", string(resp.Data))
Expand All @@ -83,22 +84,22 @@ func TestReadAndWrite(t *testing.T) {
Value: "1234",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.Background(), setReq)
assert.Nil(t, err)
})

t.Run("test get 2", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theSecondKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.Background(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "1234", string(resp.Data))
})

t.Run("test BulkSet", func(t *testing.T) {
err := store.BulkSet([]state.SetRequest{{
err := store.BulkSet(context.Background(), []state.SetRequest{{
Key: "theFirstKey",
Value: "666",
}, {
Expand All @@ -110,7 +111,7 @@ func TestReadAndWrite(t *testing.T) {
})

t.Run("test BulkGet", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.Background(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand All @@ -126,12 +127,12 @@ func TestReadAndWrite(t *testing.T) {
req := &state.DeleteRequest{
Key: "theFirstKey",
}
err := store.Delete(req)
err := store.Delete(context.Background(), req)
assert.Nil(t, err)
})

t.Run("test BulkGet2", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.Background(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand Down
15 changes: 8 additions & 7 deletions state/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package dynamodb

import (
"context"
"encoding/json"
"fmt"
"strconv"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (d *StateStore) Init(metadata state.Metadata) error {
return nil
}

func (d *StateStore) Ping() error {
func (d *StateStore) Ping(ctx context.Context) error {
return nil
}

Expand All @@ -80,7 +81,7 @@ func (d *StateStore) Features() []state.Feature {
}

// Get retrieves a dynamoDB item.
func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
input := &dynamodb.GetItemInput{
ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong),
TableName: aws.String(d.table),
Expand Down Expand Up @@ -124,13 +125,13 @@ func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
}

// BulkGet performs a bulk get operations.
func (d *StateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (d *StateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// TODO: replace with dynamodb.BatchGetItem for performance
return false, nil, nil
}

// Set saves a dynamoDB item.
func (d *StateStore) Set(req *state.SetRequest) error {
func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
value, err := d.marshalToString(req.Value)
if err != nil {
return fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
Expand Down Expand Up @@ -176,7 +177,7 @@ func (d *StateStore) Set(req *state.SetRequest) error {
}

// BulkSet performs a bulk set operation.
func (d *StateStore) BulkSet(req []state.SetRequest) error {
func (d *StateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, r := range req {
Expand Down Expand Up @@ -234,7 +235,7 @@ func (d *StateStore) BulkSet(req []state.SetRequest) error {
}

// Delete performs a delete operation.
func (d *StateStore) Delete(req *state.DeleteRequest) error {
func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"key": {
Expand All @@ -249,7 +250,7 @@ func (d *StateStore) Delete(req *state.DeleteRequest) error {
}

// BulkDelete performs a bulk delete operation.
func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, r := range req {
Expand Down
Loading