Skip to content

Commit

Permalink
[CLIENT-2766] Support RawBlobValue in the Go client
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Feb 25, 2024
1 parent 7022c84 commit bdaabc2
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 14 deletions.
4 changes: 3 additions & 1 deletion multi_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
type baseMultiCommand struct {
baseCommand

rawCDT bool

namespace string
recordset *Recordset

Expand Down Expand Up @@ -331,7 +333,7 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b
if err = cmd.readBytes(particleBytesSize); err != nil {
return false, newNodeError(cmd.node, err)
}
value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
value, err := bytesToParticleRaw(particleType, cmd.dataBuffer, 0, particleBytesSize, cmd.rawCDT)
if err != nil {
return false, newNodeError(cmd.node, err)
}
Expand Down
4 changes: 4 additions & 0 deletions multi_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type MultiPolicy struct {

// Indicates if bin data is retrieved. If false, only record digests are retrieved.
IncludeBinData bool //= true;

// RawCDT specifies that the value of the CDT fields (Maps and Lists) should not be unpacked/decoded.
// This is only used internally by Aerospike for Backup purposes and should not be used by 3rd parties.
RawCDT bool
}

// NewMultiPolicy initializes a MultiPolicy instance with default values.
Expand Down
1 change: 1 addition & 0 deletions proxy_query_partition_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newGrpcQueryPartitionCommand(
partitionFilter: partitionFilter,
operations: operations,
}
cmd.rawCDT = policy.RawCDT
cmd.tracker = partitionTracker
cmd.terminationErrorType = statement.terminationError()
cmd.nodePartitions = newNodePartitions(nil, _PARTITIONS)
Expand Down
1 change: 1 addition & 0 deletions proxy_scan_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func newGrpcScanPartitionCommand(
binNames: binNames,
partitionFilter: partitionFilter,
}
cmd.rawCDT = policy.RawCDT
cmd.tracker = partitionTracker
cmd.terminationErrorType = types.SCAN_TERMINATED
cmd.nodePartitions = newNodePartitions(nil, _PARTITIONS)
Expand Down
5 changes: 4 additions & 1 deletion query_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ type queryCommand struct {
}

func newQueryCommand(node *Node, policy *QueryPolicy, writePolicy *WritePolicy, statement *Statement, operations []*Operation, recordset *Recordset) *queryCommand {
return &queryCommand{
res := &queryCommand{
baseMultiCommand: *newStreamingMultiCommand(node, recordset, statement.Namespace, false),
policy: policy,
writePolicy: writePolicy,
statement: statement,
operations: operations,
}
res.rawCDT = policy.RawCDT

return res
}

func (cmd *queryCommand) getPolicy(ifc command) Policy {
Expand Down
1 change: 1 addition & 0 deletions query_partition_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newQueryPartitionCommand(
statement: statement,
operations: nil,
}
cmd.rawCDT = policy.RawCDT
cmd.terminationErrorType = statement.terminationError()
cmd.tracker = tracker
cmd.nodePartitions = nodePartitions
Expand Down
1 change: 1 addition & 0 deletions scan_partition_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func newScanPartitionCommand(
setName: setName,
binNames: binNames,
}
cmd.rawCDT = policy.RawCDT
cmd.terminationErrorType = types.SCAN_TERMINATED
cmd.tracker = tracker
cmd.nodePartitions = nodePartitions
Expand Down
56 changes: 44 additions & 12 deletions scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

as "github.com/aerospike/aerospike-client-go/v7"
ast "github.com/aerospike/aerospike-client-go/v7/types"
particleType "github.com/aerospike/aerospike-client-go/v7/types/particle_type"

gg "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"
Expand All @@ -39,11 +40,13 @@ var _ = gg.Describe("Scan operations", func() {
const ldtElemCount = 10
bin1 := as.NewBin("Aerospike1", rand.Intn(math.MaxInt16))
bin2 := as.NewBin("Aerospike2", randString(100))
bin3 := as.NewBin("map", map[string]int{"1": 1, "2": 2})
bin4 := as.NewBin("list", []int{1, 2, 3})
var keys map[string]*as.Key

// read all records from the channel and make sure all of them are returned
// if cancelCnt is set, it will cancel the scan after specified record count
var checkResults = func(recordset *as.Recordset, cancelCnt int, checkLDT bool) int {
var checkResults = func(recordset *as.Recordset, cancelCnt int, rawCDT bool) int {
counter := 0
for res := range recordset.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
Expand All @@ -52,18 +55,25 @@ var _ = gg.Describe("Scan operations", func() {

gm.Expect(exists).To(gm.Equal(true))
gm.Expect(key.Value().GetObject()).To(gm.Equal(rec.Key.Value().GetObject()))
gm.Expect(rec.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(rec.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

ldt := res.Record.Bins["LDT"]
if checkLDT {
gm.Expect(ldt).NotTo(gm.BeNil())
gm.Expect(len(ldt.([]interface{}))).To(gm.Equal(ldtElemCount))
gm.Expect(res.Record.Bins[bin3.Name]).NotTo(gm.BeNil())
gm.Expect(res.Record.Bins[bin4.Name]).NotTo(gm.BeNil())
if rawCDT {
gm.Expect(res.Record.Bins[bin3.Name].(*as.RawBlobValue).ParticleType).To(gm.Equal(particleType.MAP))
gm.Expect(res.Record.Bins[bin4.Name].(*as.RawBlobValue).ParticleType).To(gm.Equal(particleType.LIST))

// rewrite the record to the database to see if the values are correctly written
err := client.Put(nil, res.Record.Key, res.Record.Bins)
gm.Expect(err).ToNot(gm.HaveOccurred())
} else {
gm.Expect(ldt).To(gm.BeNil())
}
gm.Expect(rec.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(rec.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

delete(keys, string(rec.Key.Digest()))
gm.Expect(res.Record.Bins[bin3.Name]).To(gm.Equal(map[interface{}]interface{}{"1": 1, "2": 2}))
gm.Expect(res.Record.Bins[bin4.Name]).To(gm.Equal([]interface{}{1, 2, 3}))

delete(keys, string(rec.Key.Digest()))
}

counter++
// cancel scan abruptly
Expand All @@ -84,7 +94,7 @@ var _ = gg.Describe("Scan operations", func() {
gm.Expect(err).ToNot(gm.HaveOccurred())

keys[string(key.Digest())] = key
err = client.PutBins(wpolicy, key, bin1, bin2)
err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4)
gm.Expect(err).ToNot(gm.HaveOccurred())
}
})
Expand Down Expand Up @@ -336,7 +346,7 @@ var _ = gg.Describe("Scan operations", func() {
gm.Expect(len(keys)).To(gm.Equal(0))
})

gg.It("must Scan and get all records back from all nodes sequnetially", func() {
gg.It("must Scan and get all records back from all nodes sequentially", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

scanPolicy := as.NewScanPolicy()
Expand All @@ -350,6 +360,28 @@ var _ = gg.Describe("Scan operations", func() {
gm.Expect(len(keys)).To(gm.Equal(0))
})

gg.It("must Scan and get all records back in RawBlobValue when policy.RawCDT is set to true", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

scanPolicy := as.NewScanPolicy()
scanPolicy.MaxConcurrentNodes = 1
scanPolicy.RawCDT = true

recordset, err := client.ScanAll(scanPolicy, ns, set)
gm.Expect(err).ToNot(gm.HaveOccurred())

checkResults(recordset, 0, scanPolicy.RawCDT)

scanPolicy.RawCDT = false

recordset, err = client.ScanAll(scanPolicy, ns, set)
gm.Expect(err).ToNot(gm.HaveOccurred())

checkResults(recordset, 0, scanPolicy.RawCDT)

gm.Expect(len(keys)).To(gm.Equal(0))
})

gg.It("must Cancel Scan", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

Expand Down
63 changes: 63 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,71 @@ func (vl HLLValue) String() string {
return Buffer.BytesToHexString([]byte(vl))
}

///////////////////////////////////////////////////////////////////////////////

// RawBlobValue encapsulates a CDT BLOB value.
// Notice: Do not use this value, it is for internal aerospike use only.
type RawBlobValue struct {
	// ParticleType is the wire-protocol particle type of the encoded data
	// (e.g. MAP or LIST).
	ParticleType int
	// Data holds the still-encoded value bytes as read off the wire.
	Data []byte
}

// NewRawBlobValue generates a RawBlobValue instance for a CDT List or Map
// using a particle type. The input bytes are defensively copied, so the
// caller may reuse b afterwards.
func NewRawBlobValue(pt int, b []byte) *RawBlobValue {
	// Pre-size the destination so a single allocation holds the copy; the
	// result is a non-nil (possibly empty) slice, as with make+copy.
	cloned := append(make([]byte, 0, len(b)), b...)
	return &RawBlobValue{ParticleType: pt, Data: cloned}
}

// EstimateSize returns the size of the RawBlobValue in wire protocol.
func (vl *RawBlobValue) EstimateSize() (int, Error) {
	return len(vl.Data), nil
}

// write copies the raw, still-encoded bytes directly to the command buffer.
func (vl *RawBlobValue) write(cmd BufferEx) (int, Error) {
	return cmd.Write(vl.Data)
}

// pack is never invoked for RawBlobValue: the data is kept in its original
// wire encoding and written verbatim via write, so re-encoding it would be
// a logic error.
func (vl *RawBlobValue) pack(cmd BufferEx) (int, Error) {
	panic("UNREACHABLE")
}

// GetType returns wire protocol value type.
func (vl *RawBlobValue) GetType() int {
	return vl.ParticleType
}

// GetObject returns original value as an interface{}.
func (vl *RawBlobValue) GetObject() interface{} {
	// Data is already a []byte; the previous []byte(vl.Data) conversion was
	// a redundant no-op.
	return vl.Data
}

// String implements Stringer interface.
func (vl *RawBlobValue) String() string {
	return Buffer.BytesToHexString(vl.Data)
}

//////////////////////////////////////////////////////////////////////////////

// bytesToParticleRaw converts wire bytes to a value. When raw is true, CDT
// particles (Map and List) are wrapped in a RawBlobValue without being
// decoded; otherwise they are unpacked as usual. All other particle types
// are delegated to bytesToParticle.
func bytesToParticleRaw(ptype int, buf []byte, offset int, length int, raw bool) (interface{}, Error) {
	isCDT := ptype == ParticleType.MAP || ptype == ParticleType.LIST

	// Backup-mode fast path: hand back the encoded bytes untouched.
	if raw && isCDT {
		return NewRawBlobValue(ptype, buf[offset:offset+length]), nil
	}

	switch ptype {
	case ParticleType.MAP:
		return newUnpacker(buf, offset, length).UnpackMap()
	case ParticleType.LIST:
		return newUnpacker(buf, offset, length).UnpackList()
	}

	return bytesToParticle(ptype, buf, offset, length)
}

func bytesToParticle(ptype int, buf []byte, offset int, length int) (interface{}, Error) {

switch ptype {
Expand Down

0 comments on commit bdaabc2

Please sign in to comment.