Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to support sparse read feature #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package zetta

import (
"context"
"log"
"time"

"github.com/zhihu/zetta-client-go/utils/retry"
Expand All @@ -41,12 +40,13 @@ type AdminClient struct {
pbCli tspb.TablestoreAdminClient
}

func NewAdminClient(addr string) (*AdminClient, error) {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
func NewAdminClient(addr string, opts ...grpc.DialOption) (*AdminClient, error) {
opts = append(opts, grpc.WithInsecure())

conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
log.Println(addr)
return &AdminClient{
conn: conn,
pbCli: tspb.NewTablestoreAdminClient(conn),
Expand Down
1 change: 0 additions & 1 deletion admin_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

// Create a table
func (ac *AdminClient) CreateTable(ctx context.Context, db string, tableMeta *tspb.TableMeta, indexMetas []*tspb.IndexMeta) error {
tableMeta.Database = db
in := &tspb.CreateTableRequest{
Database: db,
TableMeta: tableMeta,
Expand Down
67 changes: 58 additions & 9 deletions data_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
)

const (
SPARSE_READ = "sparseread"

// numChannels is the default value for NumChannels of client
numChannels = 4
)
Expand All @@ -50,6 +48,8 @@ type DataClient struct {

type DataClientConfig struct {
NumChannels int // 并发度,对应 session pool
RecvMsgSize int
DialOptions []grpc.DialOption
SessionPoolConfig
}

Expand All @@ -65,10 +65,17 @@ func NewDataClient(ctx context.Context, serverAddr, dbName string, conf DataClie
if conf.MaxBurst == 0 {
conf.MaxBurst = DefaultSessionPoolConfig.MaxBurst
}
if conf.RecvMsgSize == 0 {
conf.RecvMsgSize = 64 * 1000 * 1000
}

dc := &DataClient{database: dbName}

dialOpts := []grpc.DialOption{grpc.WithInsecure()}
dialOpts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(conf.RecvMsgSize)),
}
dialOpts = append(dialOpts, conf.DialOptions...)

rsTarget := ParseTarget(serverAddr)
if strings.ToLower(rsTarget.Scheme) == "dns" {
Expand Down Expand Up @@ -246,7 +253,7 @@ func (dc *DataClient) Read(ctx context.Context, table string, keys KeySet, index
if err != nil {
return nil, err
}
keysProto, err := keys.proto()
keysProto, err := keys.keySetProto()
if err != nil {
return nil, err
}
Expand All @@ -271,6 +278,8 @@ func (dc *DataClient) Read(ctx context.Context, table string, keys KeySet, index
Columns: columns,
Limit: limit,
}

res, err := sh.getClient().Read(ctx, req)
defer func() {
if sh != nil {
if shouldDropSession(err) {
Expand All @@ -279,7 +288,6 @@ func (dc *DataClient) Read(ctx context.Context, table string, keys KeySet, index
sh.recycle()
}
}()
res, err := sh.getClient().Read(ctx, req)
if err != nil {
return nil, err
}
Expand All @@ -288,8 +296,6 @@ func (dc *DataClient) Read(ctx context.Context, table string, keys KeySet, index
}

func (dc *DataClient) SparseRead(ctx context.Context, table, family string, rows []*SparseRow, limit int64) (*SparseResultSet, error) {
metricCount(SPARSE_READ)
t := metricStartTiming()
sh, err := dc.sp.take(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -325,12 +331,55 @@ func (dc *DataClient) SparseRead(ctx context.Context, table, family string, rows
Limit: limit,
}
res, err := sh.getClient().SparseRead(ctx, req)
defer func() {
if sh != nil {
if shouldDropSession(err) {
sh.destroy()
}
sh.recycle()
}
}()
if err != nil {
return nil, err
}

return BuildSparseResultSet(res), nil
}

func (dc *DataClient) SparseScan(ctx context.Context, table, family string, keys KeySet, limit int64) (*SparseResultSet, error) {
sh, err := dc.sp.take(ctx)
if err != nil {
return nil, err
}
keysProto, err := keys.keySetProto()
if err != nil {
return nil, err
}
session := sh.getID()

req := &tspb.SparseScanRequest{
Session: session,
Transaction: &tspb.TransactionSelector{
Selector: &tspb.TransactionSelector_SingleUse{
SingleUse: &tspb.TransactionOptions{
Mode: &tspb.TransactionOptions_ReadOnly_{
ReadOnly: &tspb.TransactionOptions_ReadOnly{
TimestampBound: &tspb.TransactionOptions_ReadOnly_Strong{Strong: true},
},
},
},
},
},
Table: table,
Family: family,
KeySet: keysProto,
Limit: limit,
}
res, err := sh.getClient().SparseScan(ctx, req)
if err != nil {
metricCountError(SPARSE_READ)
return nil, err
}
defer func() {
metricRecordTiming(t, SPARSE_READ)
if sh != nil {
if shouldDropSession(err) {
sh.destroy()
Expand Down
11 changes: 6 additions & 5 deletions examples/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func main() {
func writeTable(client *zetta.DataClient) {
cols := []string{"name", "age"}
vals := []interface{}{"user-01", "18"}
rawMS := zetta.InsertOrUpdate(TABLE_NAME, cols, vals)
pkey := zetta.Key{"user-01"}
rawMS := zetta.InsertOrUpdate(TABLE_NAME, pkey, "default", cols, vals)
err := client.Mutate(context.Background(), rawMS)
if err != nil {
panic(err)
Expand Down Expand Up @@ -103,9 +104,8 @@ func readTable(client *zetta.DataClient) {
PartitionToken: nil,
}

keys := zetta.KeySet{
Keys: []zetta.Key{[]interface{}{"user-01"}},
}
keys := zetta.Key{"user-01"}

resp, err := client.Read(context.Background(), in.Table, keys, "", in.Columns, 10)
if err != nil {
panic(err)
Expand Down Expand Up @@ -194,8 +194,9 @@ func createTable() {
func writeMutations() {
cols := []string{"id", "age"}
vs := []interface{}{1, 20}
pkey := zetta.Key{"id"}
mutations := []*zetta.Mutation{
zetta.Insert(DB_NAME, cols, vs),
zetta.InsertOrUpdate(DB_NAME, pkey, "default", cols, vs),
}
ctx := context.Background()
t, err := dataClient.Apply(ctx, mutations)
Expand Down
49 changes: 25 additions & 24 deletions extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
}
acode = t.ArrayElementType.Code
}
typeErr := errTypeMismatch(code, false, ptr)
if code == tspb.TypeCode_ARRAY {
typeErr = errTypeMismatch(acode, true, ptr)
typeErr := func() error {
if code == tspb.TypeCode_ARRAY {
return errTypeMismatch(acode, true, ptr)
}
return errTypeMismatch(code, false, ptr)
}
nullErr := errDstNotForNull(ptr)
_, isNull := v.Kind.(*tspb.Value_NullValue)

// Do the decoding based on the type of ptr.
Expand All @@ -49,7 +50,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if isNull {
return nullErr
return errDstNotForNull(ptr)
}
x, err := getStringValue(v)
if err != nil {
Expand All @@ -75,7 +76,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_STRING {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand Down Expand Up @@ -130,7 +131,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
}

if isNull {
return nullErr
return errDstNotForNull(ptr)
}
x, err := getInteger64Value(v)
if err != nil {
Expand Down Expand Up @@ -178,10 +179,10 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_BOOL {
return typeErr
return typeErr()
}
if isNull {
return nullErr
return errDstNotForNull(ptr)
}
x, err := getBoolValue(v)
if err != nil {
Expand All @@ -193,7 +194,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_BOOL {
return typeErr
return typeErr()
}
if isNull {
*p = NullBool{}
Expand All @@ -210,7 +211,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_BOOL {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand All @@ -230,10 +231,10 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_FLOAT64 {
return typeErr
return typeErr()
}
if isNull {
return nullErr
return errDstNotForNull(ptr)
}
x, err := getFloat64Value(v)
if err != nil {
Expand All @@ -245,7 +246,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_FLOAT64 {
return typeErr
return typeErr()
}
if isNull {
*p = NullFloat64{}
Expand All @@ -262,7 +263,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_FLOAT64 {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand All @@ -280,7 +281,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
case *time.Time:
var nt NullTime
if isNull {
return nullErr
return errDstNotForNull(ptr)
}
err := parseNullTime(v, &nt, code, isNull)
if err != nil {
Expand All @@ -297,7 +298,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_TIMESTAMP {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand All @@ -317,10 +318,10 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_DATE {
return typeErr
return typeErr()
}
if isNull {
return nullErr
return errDstNotForNull(ptr)
}
x, err := getStringValue(v)
if err != nil {
Expand All @@ -336,7 +337,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if code != tspb.TypeCode_DATE {
return typeErr
return typeErr()
}
if isNull {
*p = NullDate{}
Expand All @@ -357,7 +358,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_DATE {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand All @@ -377,7 +378,7 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
return errNilDst(p)
}
if acode != tspb.TypeCode_STRUCT {
return typeErr
return typeErr()
}
if isNull {
*p = nil
Expand All @@ -402,15 +403,15 @@ func decodeSparseValue(v *tspb.Value, t *tspb.Type, ptr interface{}) error {
default:
// Check if the proto encoding is for an array of structs.
if !(code == tspb.TypeCode_ARRAY && acode == tspb.TypeCode_STRUCT) {
return typeErr
return typeErr()
}
vp := reflect.ValueOf(p)
if !vp.IsValid() {
return errNilDst(p)
}
if !isPtrStructPtrSlice(vp.Type()) {
// The container is not a pointer to a struct pointer slice.
return typeErr
return typeErr()
}
// Only use reflection for nil detection on slow path.
// Also, IsNil panics on many types, so check it after the type check.
Expand Down
Loading