diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index d0d315f316..959f5d3543 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -542,7 +542,11 @@ const components = { 'internal/component/sql', ], }, - 'state.etcd': { + 'state.etcd.v1': { + conformance: true, + conformanceSetup: 'docker-compose.sh etcd', + }, + 'state.etcd.v2': { conformance: true, conformanceSetup: 'docker-compose.sh etcd', }, diff --git a/go.mod b/go.mod index c6cedb8d5a..ae0d907fa8 100644 --- a/go.mod +++ b/go.mod @@ -119,6 +119,7 @@ require ( golang.org/x/oauth2 v0.8.0 google.golang.org/api v0.115.0 google.golang.org/grpc v1.54.0 + google.golang.org/protobuf v1.30.0 gopkg.in/couchbase/gocb.v1 v1.6.7 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/yaml.v3 v3.0.1 @@ -371,7 +372,6 @@ require ( golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/couchbase/gocbcore.v7 v7.1.18 // indirect gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4 // indirect diff --git a/internal/proto/state/etcd/v2/value.pb.go b/internal/proto/state/etcd/v2/value.pb.go new file mode 100644 index 0000000000..e411d91ab2 --- /dev/null +++ b/internal/proto/state/etcd/v2/value.pb.go @@ -0,0 +1,202 @@ +// +//Copyright 2021 The Dapr Authors +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +//http://www.apache.org/licenses/LICENSE-2.0 +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: internal/proto/state/etcd/v2/value.proto + +package v2 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Value is the value of the state key item. It contains the underlying data as +// well as necessary metadata. +type Value struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required. The value of the state key item. + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + // Required. The creation time of the state key item. This is an + // approximation by the components-contrib instance since ETCD does not + // provide this information natively. + Ts *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=ts,proto3" json:"ts,omitempty"` + // Optional. The Time To Live of the state key item. The duration of the TTL + // is from the creation time of the key (`ts`). If not specified, the key has + // no TTL. + Ttl *durationpb.Duration `protobuf:"bytes,3,opt,name=ttl,proto3,oneof" json:"ttl,omitempty"` +} + +func (x *Value) Reset() { + *x = Value{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_state_etcd_v2_value_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Value) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Value) ProtoMessage() {} + +func (x *Value) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_state_etcd_v2_value_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Value.ProtoReflect.Descriptor instead. +func (*Value) Descriptor() ([]byte, []int) { + return file_internal_proto_state_etcd_v2_value_proto_rawDescGZIP(), []int{0} +} + +func (x *Value) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *Value) GetTs() *timestamppb.Timestamp { + if x != nil { + return x.Ts + } + return nil +} + +func (x *Value) GetTtl() *durationpb.Duration { + if x != nil { + return x.Ttl + } + return nil +} + +var File_internal_proto_state_etcd_v2_value_proto protoreflect.FileDescriptor + +var file_internal_proto_state_etcd_v2_value_proto_rawDesc = []byte{ + 0x0a, 0x28, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x65, 0x74, 0x63, 0x64, 0x2f, 0x76, 0x32, 0x2f, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1e, 0x64, 0x61, 0x70, 0x72, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, + 0x73, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x76, 0x32, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x81, 0x01, 0x0a, 0x05, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2a, 0x0a, 0x02, 0x74, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x02, 0x74, 0x73, 0x12, 0x30, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, + 0x03, 0x74, 0x74, 0x6c, 0x88, 0x01, 0x01, 0x42, 0x06, 0x0a, 0x04, 0x5f, 0x74, 0x74, 0x6c, 0x42, + 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x61, + 0x70, 0x72, 0x2f, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x2d, 0x63, 0x6f, + 0x6e, 0x74, 0x72, 0x69, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x65, 0x74, 0x63, 0x64, 0x2f, + 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_proto_state_etcd_v2_value_proto_rawDescOnce sync.Once + file_internal_proto_state_etcd_v2_value_proto_rawDescData = file_internal_proto_state_etcd_v2_value_proto_rawDesc +) + +func file_internal_proto_state_etcd_v2_value_proto_rawDescGZIP() []byte { + file_internal_proto_state_etcd_v2_value_proto_rawDescOnce.Do(func() { + file_internal_proto_state_etcd_v2_value_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_proto_state_etcd_v2_value_proto_rawDescData) + }) + return file_internal_proto_state_etcd_v2_value_proto_rawDescData +} + +var file_internal_proto_state_etcd_v2_value_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_internal_proto_state_etcd_v2_value_proto_goTypes = []interface{}{ + (*Value)(nil), // 0: dapr.proto.components.state.v2.Value + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp + (*durationpb.Duration)(nil), // 2: google.protobuf.Duration +} +var file_internal_proto_state_etcd_v2_value_proto_depIdxs = []int32{ + 1, // 0: dapr.proto.components.state.v2.Value.ts:type_name -> google.protobuf.Timestamp + 2, // 1: dapr.proto.components.state.v2.Value.ttl:type_name -> google.protobuf.Duration + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_internal_proto_state_etcd_v2_value_proto_init() } +func file_internal_proto_state_etcd_v2_value_proto_init() { + if File_internal_proto_state_etcd_v2_value_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_proto_state_etcd_v2_value_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Value); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_internal_proto_state_etcd_v2_value_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_proto_state_etcd_v2_value_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_internal_proto_state_etcd_v2_value_proto_goTypes, + DependencyIndexes: file_internal_proto_state_etcd_v2_value_proto_depIdxs, + MessageInfos: file_internal_proto_state_etcd_v2_value_proto_msgTypes, + }.Build() + File_internal_proto_state_etcd_v2_value_proto = out.File + file_internal_proto_state_etcd_v2_value_proto_rawDesc = nil + file_internal_proto_state_etcd_v2_value_proto_goTypes = nil + file_internal_proto_state_etcd_v2_value_proto_depIdxs = nil +} diff --git a/internal/proto/state/etcd/v2/value.proto b/internal/proto/state/etcd/v2/value.proto new file mode 100644 index 0000000000..7c1c6d6c3b --- /dev/null +++ b/internal/proto/state/etcd/v2/value.proto @@ -0,0 +1,39 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; + +package dapr.proto.components.state.v2; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; + +option go_package = "github.com/dapr/components-contrib/internal/proto/state/etcd/v2"; + + +// Value is the value of the state key item. It contains the underlying data as +// well as necessary metadata. +message Value { + // Required. The value of the state key item. + bytes data = 1; + + // Required. The creation time of the state key item. This is an + // approximation by the components-contrib instance since ETCD does not + // provide this information natively. + google.protobuf.Timestamp ts = 2; + + // Optional. The Time To Live of the state key item. The duration of the TTL + // is from the creation time of the key (`ts`). If not specified, the key has + // no TTL. + optional google.protobuf.Duration ttl = 3; +} diff --git a/state/etcd/etcd.go b/state/etcd/etcd.go index 5dc68f60f8..30b5c93df1 100644 --- a/state/etcd/etcd.go +++ b/state/etcd/etcd.go @@ -17,7 +17,6 @@ import ( "context" "crypto/tls" "crypto/x509" - "encoding/json" "errors" "fmt" "reflect" @@ -43,6 +42,7 @@ type Etcd struct { keyPrefixPath string features []state.Feature logger logger.Logger + schema schemaMarshaller } type etcdConfig struct { @@ -55,9 +55,19 @@ type etcdConfig struct { Key string `json:"key"` } -// NewEtcdStateStore returns a new etcd state store. -func NewEtcdStateStore(logger logger.Logger) state.Store { +// NewEtcdStateStoreV1 returns a new etcd state store for schema V1. +func NewEtcdStateStoreV1(logger logger.Logger) state.Store { + return newETCD(logger, schemaV1{}) +} + +// NewEtcdStateStoreV2 returns a new etcd state store for schema V2. +func NewEtcdStateStoreV2(logger logger.Logger) state.Store { + return newETCD(logger, schemaV2{}) +} + +func newETCD(logger logger.Logger, schema schemaMarshaller) state.Store { s := &Etcd{ + schema: schema, logger: logger, features: []state.Feature{state.FeatureETag, state.FeatureTransactional}, } @@ -141,9 +151,15 @@ func (e *Etcd) Get(ctx context.Context, req *state.GetRequest) (*state.GetRespon return &state.GetResponse{}, nil } + data, metadata, err := e.schema.decode(resp.Kvs[0].Value) + if err != nil { + return nil, err + } + return &state.GetResponse{ - Data: resp.Kvs[0].Value, - ETag: ptr.Of(strconv.Itoa(int(resp.Kvs[0].ModRevision))), + Data: data, + ETag: ptr.Of(strconv.Itoa(int(resp.Kvs[0].ModRevision))), + Metadata: metadata, }, nil } @@ -160,20 +176,20 @@ func (e *Etcd) Set(ctx context.Context, req *state.SetRequest) error { return err } - reqVal, err := stateutils.Marshal(req.Value, json.Marshal) - if err != nil { - return err - } - - return e.doSet(ctx, keyWithPath, string(reqVal), req.ETag, ttlInSeconds) + return e.doSet(ctx, keyWithPath, req.Value, req.ETag, ttlInSeconds) } -func (e *Etcd) doSet(ctx context.Context, key, reqVal string, etag *string, ttlInSeconds int64) error { +func (e *Etcd) doSet(ctx context.Context, key string, val any, etag *string, ttlInSeconds *int64) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - if ttlInSeconds > 0 { - resp, err := e.client.Grant(ctx, ttlInSeconds) + reqVal, err := e.schema.encode(val, ttlInSeconds) + if err != nil { + return err + } + + if ttlInSeconds != nil { + resp, err := e.client.Grant(ctx, *ttlInSeconds) if err != nil { return fmt.Errorf("couldn't grant lease %s: %w", key, err) } @@ -207,22 +223,18 @@ func (e *Etcd) doSet(ctx context.Context, key, reqVal string, etag *string, ttlI return nil } -func (e *Etcd) doSetValidateParameters(req *state.SetRequest) (int64, error) { +func (e *Etcd) doSetValidateParameters(req *state.SetRequest) (*int64, error) { err := state.CheckRequestOptions(req.Options) if err != nil { - return 0, err + return nil, err } - var ttlVal int64 - ttlInSeconds, err := stateutils.ParseTTL(req.Metadata) + ttlInSeconds, err := stateutils.ParseTTL64(req.Metadata) if err != nil { - return 0, err - } - if ttlInSeconds != nil { - ttlVal = int64(*ttlInSeconds) + return nil, err } - return ttlVal, nil + return ttlInSeconds, nil } // Delete performes a Etcd KV delete operation. @@ -339,7 +351,7 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque return err } - reqVal, err := stateutils.Marshal(req.Value, json.Marshal) + reqVal, err := e.schema.encode(req.Value, ttlInSeconds) if err != nil { return err } @@ -348,19 +360,19 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque etag, _ := strconv.ParseInt(*req.ETag, 10, 64) cmp = clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag) } - if ttlInSeconds > 0 { - resp, err := e.client.Grant(ctx, ttlInSeconds) + if ttlInSeconds != nil { + resp, err := e.client.Grant(ctx, *ttlInSeconds) if err != nil { return fmt.Errorf("couldn't grant lease %s: %w", keyWithPath, err) } - put := clientv3.OpPut(keyWithPath, string(reqVal), clientv3.WithLease(resp.ID)) + put := clientv3.OpPut(keyWithPath, reqVal, clientv3.WithLease(resp.ID)) if req.HasETag() { ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil)) } else { ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil)) } } else { - put := clientv3.OpPut(keyWithPath, string(reqVal)) + put := clientv3.OpPut(keyWithPath, reqVal) if req.HasETag() { ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil)) } else { diff --git a/state/etcd/schema.go b/state/etcd/schema.go new file mode 100644 index 0000000000..6587bd6896 --- /dev/null +++ b/state/etcd/schema.go @@ -0,0 +1,91 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "encoding/json" + "time" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + + pbv2 "github.com/dapr/components-contrib/internal/proto/state/etcd/v2" + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/utils" +) + +// schemaMarshaller is an interface for encoding and decoding values which are +// written and read from ETCD. Different storage schema versions store values +// in different formats or envelopes. +type schemaMarshaller interface { + // encode the value in the correct storage schema. + encode(data any, ttlInSeconds *int64) (string, error) + + // decode the value from the correct storage schema, optionally returning + // metadata extracted from the envelope. + decode(data []byte) ([]byte, map[string]string, error) +} + +type schemaV1 struct{} + +func (schemaV1) encode(data any, _ *int64) (string, error) { + reqVal, err := utils.Marshal(data, json.Marshal) + if err != nil { + return "", err + } + return string(reqVal), nil +} + +func (schemaV1) decode(data []byte) ([]byte, map[string]string, error) { + return data, nil, nil +} + +type schemaV2 struct{} + +func (schemaV2) encode(data any, ttlInSeconds *int64) (string, error) { + dataB, err := utils.JSONStringify(data) + if err != nil { + return "", err + } + + var duration durationpb.Duration + if ttlInSeconds != nil { + duration = durationpb.Duration{Seconds: *ttlInSeconds} + } + + value, err := proto.Marshal(&pbv2.Value{ + Data: dataB, + Ts: timestamppb.New(time.Now().UTC()), + Ttl: &duration, + }) + + return string(value), err +} + +func (schemaV2) decode(data []byte) ([]byte, map[string]string, error) { + var value pbv2.Value + if err := proto.Unmarshal(data, &value); err != nil { + return nil, nil, err + } + + var metadata map[string]string + if value.Ttl != nil { + metadata = map[string]string{ + state.GetRespMetaKeyTTLExpireTime: value.Ts.AsTime().Add(value.Ttl.AsDuration()).Format(time.RFC3339), + } + } + + return value.Data, metadata, nil +} diff --git a/state/utils/ttl.go b/state/utils/ttl.go index 077be9fc6b..44e43d8b57 100644 --- a/state/utils/ttl.go +++ b/state/utils/ttl.go @@ -24,8 +24,7 @@ const MetadataTTLKey = "ttlInSeconds" // ParseTTL parses the "ttlInSeconds" metadata property. func ParseTTL(requestMetadata map[string]string) (*int, error) { - val, found := requestMetadata[MetadataTTLKey] - if found && val != "" { + if val := requestMetadata[MetadataTTLKey]; val != "" { parsedVal, err := strconv.ParseInt(val, 10, 0) if err != nil { return nil, fmt.Errorf("incorrect value for metadata '%s': %w", MetadataTTLKey, err) @@ -38,3 +37,18 @@ func ParseTTL(requestMetadata map[string]string) (*int, error) { } return nil, nil } + +// ParseTTL64 parses the "ttlInSeconds" metadata property. +func ParseTTL64(requestMetadata map[string]string) (*int64, error) { + if val := requestMetadata[MetadataTTLKey]; val != "" { + parsedVal, err := strconv.ParseInt(val, 10, 0) + if err != nil { + return nil, fmt.Errorf("incorrect value for metadata '%s': %w", MetadataTTLKey, err) + } + if parsedVal < -1 || parsedVal > math.MaxInt32 { + return nil, fmt.Errorf("incorrect value for metadata '%s': must be -1 or greater", MetadataTTLKey) + } + return &parsedVal, nil + } + return nil, nil +} diff --git a/state/utils/utils.go b/state/utils/utils.go index a80ba034da..01c0504bd0 100644 --- a/state/utils/utils.go +++ b/state/utils/utils.go @@ -13,6 +13,13 @@ limitations under the License. package utils +import ( + "bytes" + "encoding/json" + "strconv" + "strings" +) + func Marshal(val interface{}, marshaler func(interface{}) ([]byte, error)) ([]byte, error) { var err error = nil bt, ok := val.([]byte) @@ -22,3 +29,46 @@ func Marshal(val interface{}, marshaler func(interface{}) ([]byte, error)) ([]by return bt, err } + +func JSONStringify(value any) ([]byte, error) { + switch value := value.(type) { + case []byte: + return value, nil + case int: + return []byte(strconv.FormatInt(int64(value), 10)), nil + case int8: + return []byte(strconv.FormatInt(int64(value), 10)), nil + case int16: + return []byte(strconv.FormatInt(int64(value), 10)), nil + case int32: + return []byte(strconv.FormatInt(int64(value), 10)), nil + case int64: + return []byte(strconv.FormatInt(value, 10)), nil + case uint: + return []byte(strconv.FormatUint(uint64(value), 10)), nil + case uint16: + return []byte(strconv.FormatUint(uint64(value), 10)), nil + case uint32: + return []byte(strconv.FormatUint(uint64(value), 10)), nil + case uint64: + return []byte(strconv.FormatUint(value, 10)), nil + case float32: + return []byte(strconv.FormatFloat(float64(value), 'f', -1, 64)), nil + case float64: + return []byte(strconv.FormatFloat(value, 'f', -1, 64)), nil + case bool: + if value { + return []byte("true"), nil + } + return []byte("false"), nil + case string: + return []byte(`"` + strings.ReplaceAll(value, `"`, `\"`) + `"`), nil + default: + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetEscapeHTML(false) + err := enc.Encode(value) + // Trim newline. + return bytes.TrimSuffix(buf.Bytes(), []byte{0xa}), err + } +} diff --git a/tests/config/state/etcd/statestore.yaml b/tests/config/state/etcd/v1/statestore.yaml similarity index 92% rename from tests/config/state/etcd/statestore.yaml rename to tests/config/state/etcd/v1/statestore.yaml index ef8fe37da5..40690fe7b8 100644 --- a/tests/config/state/etcd/statestore.yaml +++ b/tests/config/state/etcd/v1/statestore.yaml @@ -11,4 +11,4 @@ spec: - name: keyPrefixPath value: "dapr" - name: tlsEnable - value: "false" \ No newline at end of file + value: "false" diff --git a/tests/config/state/etcd/v2/statestore.yaml b/tests/config/state/etcd/v2/statestore.yaml new file mode 100644 index 0000000000..1893b56cbd --- /dev/null +++ b/tests/config/state/etcd/v2/statestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.etcd + version: v2 + metadata: + - name: endpoints + value: "localhost:12379" + - name: keyPrefixPath + value: "dapr" + - name: tlsEnable + value: "false" diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index 50b6cae994..8c7c6bb615 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -73,9 +73,11 @@ components: operations: [ "transaction", "etag", "first-write" ] - component: aws.dynamodb.terraform operations: [ "transaction", "etag", "first-write" ] - - component: etcd + - component: etcd.v1 + operations: [ "transaction", "etag", "first-write", "ttl" ] + - component: etcd.v2 operations: [ "transaction", "etag", "first-write", "ttl" ] - component: gcp.firestore.docker operations: [] - component: gcp.firestore.cloud - operations: [] \ No newline at end of file + operations: [] diff --git a/tests/conformance/bindings/bindings.go b/tests/conformance/bindings/bindings.go index 9b22b415d9..a587740dbf 100644 --- a/tests/conformance/bindings/bindings.go +++ b/tests/conformance/bindings/bindings.go @@ -126,17 +126,17 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin // Check for an output binding specific operation before init if config.HasOperation("operations") { testLogger.Info("Init output binding ...") - err := outputBinding.Init(context.Background(), bindings.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := outputBinding.Init(context.Background(), bindings.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err, "expected no error setting up output binding") } // Check for an input binding specific operation before init if config.HasOperation("read") { testLogger.Info("Init input binding ...") - err := inputBinding.Init(context.Background(), bindings.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := inputBinding.Init(context.Background(), bindings.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err, "expected no error setting up input binding") } testLogger.Info("Init test done.") diff --git a/tests/conformance/common.go b/tests/conformance/common.go index ff0862c9b6..7c55ce600a 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -329,7 +329,6 @@ func (tc *TestConfiguration) loadComponentsAndProperties(t *testing.T, filepath require.Equal(t, 1, len(comps)) // We only expect a single component per file c := comps[0] props, err := ConvertMetadataToProperties(c.Spec.Metadata) - return props, err } @@ -577,8 +576,10 @@ func loadStateStore(tc TestComponent) state.Store { store = s_awsdynamodb.NewDynamoDBStateStore(testLogger) case "aws.dynamodb.terraform": store = s_awsdynamodb.NewDynamoDBStateStore(testLogger) - case "etcd": - store = s_etcd.NewEtcdStateStore(testLogger) + case "etcd.v1": + store = s_etcd.NewEtcdStateStoreV1(testLogger) + case "etcd.v2": + store = s_etcd.NewEtcdStateStoreV2(testLogger) case "gcp.firestore.docker": store = s_gcpfirestore.NewFirestoreStateStore(testLogger) case "gcp.firestore.cloud": diff --git a/tests/conformance/configuration/configuration.go b/tests/conformance/configuration/configuration.go index bee02c308c..4dd76bac70 100644 --- a/tests/conformance/configuration/configuration.go +++ b/tests/conformance/configuration/configuration.go @@ -159,7 +159,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration // Initializing store err = store.Init(context.Background(), configuration.Metadata{ - Base: metadata.Base{Properties: props}, + Base: metadata.Base{ + Properties: props, + }, }) require.NoError(t, err) }) diff --git a/tests/conformance/crypto/crypto.go b/tests/conformance/crypto/crypto.go index b57d047aeb..743bcbfe56 100644 --- a/tests/conformance/crypto/crypto.go +++ b/tests/conformance/crypto/crypto.go @@ -114,9 +114,9 @@ func ConformanceTests(t *testing.T, props map[string]string, component contribCr // Init t.Run("Init", func(t *testing.T) { - err := component.Init(context.Background(), contribCrypto.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := component.Init(context.Background(), contribCrypto.Metadata{Base: metadata.Base{ + Properties: props, + }}) require.NoError(t, err, "expected no error on initializing store") }) diff --git a/tests/conformance/pubsub/pubsub.go b/tests/conformance/pubsub/pubsub.go index c5e877e175..b04629e13c 100644 --- a/tests/conformance/pubsub/pubsub.go +++ b/tests/conformance/pubsub/pubsub.go @@ -105,9 +105,9 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c // Init t.Run("init", func(t *testing.T) { - err := ps.Init(context.Background(), pubsub.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := ps.Init(context.Background(), pubsub.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err, "expected no error on setting up pubsub") }) diff --git a/tests/conformance/secretstores/secretstores.go b/tests/conformance/secretstores/secretstores.go index f06ee89b50..38c585793d 100644 --- a/tests/conformance/secretstores/secretstores.go +++ b/tests/conformance/secretstores/secretstores.go @@ -49,9 +49,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store secretstores. // Init t.Run("init", func(t *testing.T) { - err := store.Init(context.Background(), secretstores.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := store.Init(context.Background(), secretstores.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err, "expected no error on initializing store") }) diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 9d8bfc9bbe..bbc2f78d0c 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -238,9 +238,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St } t.Run("init", func(t *testing.T) { - err := statestore.Init(context.Background(), state.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := statestore.Init(context.Background(), state.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err) }) diff --git a/tests/conformance/workflows/workflows.go b/tests/conformance/workflows/workflows.go index be292acbfb..9e96b9ae2f 100644 --- a/tests/conformance/workflows/workflows.go +++ b/tests/conformance/workflows/workflows.go @@ -51,9 +51,9 @@ func NewTestConfig(component string, operations []string, conf map[string]interf func ConformanceTests(t *testing.T, props map[string]string, workflowItem workflows.Workflow, config TestConfig) { // Test vars t.Run("init", func(t *testing.T) { - err := workflowItem.Init(workflows.Metadata{ - Base: metadata.Base{Properties: props}, - }) + err := workflowItem.Init(workflows.Metadata{Base: metadata.Base{ + Properties: props, + }}) assert.NoError(t, err) })