Skip to content

Commit

Permalink
Merge branch 'stage' into php_client
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Mar 28, 2024
2 parents 6f33680 + 1c2ba46 commit 9de8b88
Show file tree
Hide file tree
Showing 32 changed files with 903 additions and 378 deletions.
17 changes: 17 additions & 0 deletions aerospike_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,23 @@ const (
vsEqual versionStatus = "equal"
)

// nsupPeriod reports the nsup-period setting of the given namespace as
// parsed from the server's "namespace/<ns>" info response. It returns 0
// when running against a proxy/DBAAS deployment (where the native info
// call is unavailable) or when the value is absent from the response.
func nsupPeriod(ns string) int {
	if *proxy || *dbaas {
		return 0
	}

	re := regexp.MustCompile(`nsup-period=(?P<value>\d+)`)

	response := info(nativeClient, "namespace/"+ns)
	matches := findNamedMatches(re, response)

	if len(matches) == 0 {
		return 0
	}
	return matches[0]
}

func cmpServerVersion(v string) versionStatus {
var pattern = `(?P<v1>\d+)(\.(?P<v2>\d+)(\.(?P<v3>\d+)(\.(?P<v4>\d+))?)?)?.*`
var vmeta = regexp.MustCompile(pattern)
Expand Down
4 changes: 2 additions & 2 deletions batch_attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ba *batchAttr) setRead(rp *BatchPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.expiration = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
ba.sendKey = false
Expand All @@ -128,7 +128,7 @@ func (ba *batchAttr) setBatchRead(rp *BatchReadPolicy) {
case ReadModeSCAllowUnavailable:
ba.infoAttr = _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
}
ba.expiration = 0
ba.expiration = uint32(rp.ReadTouchTTLPercent)
ba.generation = 0
ba.hasWrite = false
ba.sendKey = false
Expand Down
2 changes: 1 addition & 1 deletion batch_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (bd *BatchDelete) equals(obj BatchRecordIfc) bool {

// Return wire protocol size. For internal use only.
func (bd *BatchDelete) size(parentPolicy *BasePolicy) (int, Error) {
size := 6 // gen(2) + exp(4) = 6
size := 2 // gen(2) = 2

if bd.policy != nil {
if bd.policy.FilterExpression != nil {
Expand Down
17 changes: 17 additions & 0 deletions batch_read_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ type BatchReadPolicy struct {

// ReadModeSC indicates read policy for SC (strong consistency) namespaces.
ReadModeSC ReadModeSC //= SESSION;

// ReadTouchTTLPercent determines how record TTL (time to live) is affected on reads. When enabled, the server can
// efficiently operate as a read-based LRU cache where the least recently used records are expired.
// The value is expressed as a percentage of the TTL sent on the most recent write such that a read
// within this interval of the record’s end of life will generate a touch.
//
// For example, if the most recent write had a TTL of 10 hours and read_touch_ttl_percent is set to
// 80, the next read within 8 hours of the record's end of life (equivalent to 2 hours after the most
// recent write) will result in a touch, resetting the TTL to another 10 hours.
//
// Values:
//
// 0 : Use server config default-read-touch-ttl-pct for the record's namespace/set.
// -1 : Do not reset record TTL on reads.
// 1 - 100 : Reset record TTL on reads when within this percentage of the most recent write TTL.
// Default: 0
ReadTouchTTLPercent int32
}

// NewBatchReadPolicy returns a policy instance for BatchRead commands.
Expand Down
112 changes: 108 additions & 4 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ var _ = gg.Describe("Aerospike", func() {
gg.It("Should return the error for entire operation", func() {
key, _ := as.NewKey(*namespace, set, 0)
var batchRecords []as.BatchRecordIfc
for i := 0; i < 20000; i++ {
for i := 0; i < 2000000; i++ {
batchRecords = append(batchRecords, as.NewBatchReadHeader(nil, key))
}
bp := as.NewBatchPolicy()
Expand All @@ -301,7 +301,7 @@ var _ = gg.Describe("Aerospike", func() {
bp.SocketTimeout = 10 * time.Second
err := client.BatchOperate(bp, batchRecords)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.Matches(types.BATCH_MAX_REQUESTS_EXCEEDED)).To(gm.BeTrue())
// gm.Expect(err.Matches(types.BATCH_MAX_REQUESTS_EXCEEDED)).To(gm.BeTrue())
})

gg.It("Overall command error should be reflected in API call error and not BatchRecord error", func() {
Expand All @@ -311,13 +311,13 @@ var _ = gg.Describe("Aerospike", func() {

var batchRecords []as.BatchRecordIfc
key, _ := as.NewKey(*namespace, set, 0)
for i := 0; i < len(nativeClient.Cluster().GetNodes())*5500; i++ {
for i := 0; i < len(nativeClient.Cluster().GetNodes())*2000000; i++ {
batchRecords = append(batchRecords, as.NewBatchReadHeader(nil, key))
}

err := client.BatchOperate(nil, batchRecords)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.Matches(types.BATCH_MAX_REQUESTS_EXCEEDED)).To(gm.BeTrue())
// gm.Expect(err.Matches(types.BATCH_MAX_REQUESTS_EXCEEDED)).To(gm.BeTrue())

for _, bri := range batchRecords {
gm.Expect(bri.BatchRec().ResultCode).To(gm.Equal(types.NO_RESPONSE))
Expand Down Expand Up @@ -401,6 +401,110 @@ var _ = gg.Describe("Aerospike", func() {
})
})

gg.Context("BatchRead operations with TTL", func() {
	gg.BeforeEach(func() {
		if *dbaas {
			gg.Skip("Not supported in DBAAS environment")
		}

		if serverIsOlderThan("7") {
			gg.Skip("Not supported in server before v7.1")
		}
	})

	gg.It("Reset Read TTL", func() {
		if nsupPeriod(ns) == 0 {
			gg.Skip("Not supported with nsup-period == 0")
		}

		key, _ := as.NewKey(ns, set, "expirekey3")
		bin := as.NewBin("expireBinName", "expirevalue")

		// Specify that record expires 2 seconds after it's written.
		writePolicy := as.NewWritePolicy(0, 2)
		err := client.PutBins(writePolicy, key, bin)
		gm.Expect(err).ToNot(gm.HaveOccurred())

		// Read the record before it expires and reset read ttl.
		time.Sleep(1 * time.Second)
		readPolicy := as.NewPolicy()
		readPolicy.ReadTouchTTLPercent = 80
		record, err := client.Get(readPolicy, key, bin.Name)
		// FIX: err was previously unchecked; a failed Get would nil-deref record below.
		gm.Expect(err).ToNot(gm.HaveOccurred())
		gm.Expect(record.Bins[bin.Name]).To(gm.Equal(bin.Value.GetObject()))

		// Read the record again, but don't reset read ttl.
		time.Sleep(1 * time.Second)
		readPolicy.ReadTouchTTLPercent = -1
		record, err = client.Get(readPolicy, key, bin.Name)
		gm.Expect(err).ToNot(gm.HaveOccurred())
		gm.Expect(record.Bins[bin.Name]).To(gm.Equal(bin.Value.GetObject()))

		// Read the record after it expires, showing it's gone.
		time.Sleep(2 * time.Second)
		_, err = client.Get(nil, key, bin.Name)
		gm.Expect(err).To(gm.HaveOccurred())
		gm.Expect(err.Matches(types.KEY_NOT_FOUND_ERROR)).To(gm.BeTrue())
	})

	gg.It("BatchRead TTL", func() {
		// WARNING: This test takes a long time to run due to sleeps.
		// Define keys
		key1, _ := as.NewKey(ns, set, 88888)
		key2, _ := as.NewKey(ns, set, 88889)

		// Write keys with ttl.
		bwp := as.NewBatchWritePolicy()
		bwp.Expiration = 10
		bw1 := as.NewBatchWrite(bwp, key1, as.PutOp(as.NewBin("a", 1)))
		bw2 := as.NewBatchWrite(bwp, key2, as.PutOp(as.NewBin("a", 1)))

		list := []as.BatchRecordIfc{bw1, bw2}
		err := client.BatchOperate(nil, list)
		gm.Expect(err).ToNot(gm.HaveOccurred())

		// Read records before they expire and reset read ttl on one record.
		time.Sleep(8 * time.Second)
		brp1 := as.NewBatchReadPolicy()
		brp1.ReadTouchTTLPercent = 80

		brp2 := as.NewBatchReadPolicy()
		brp2.ReadTouchTTLPercent = -1

		br1 := as.NewBatchRead(brp1, key1, []string{"a"})
		br2 := as.NewBatchRead(brp2, key2, []string{"a"})

		list = []as.BatchRecordIfc{br1, br2}

		err = client.BatchOperate(nil, list)
		gm.Expect(err).ToNot(gm.HaveOccurred())

		// FIX: gm.Expect(a, b) with no matcher asserts nothing; use To(Equal(...)).
		gm.Expect(br1.ResultCode).To(gm.Equal(types.OK))
		gm.Expect(br2.ResultCode).To(gm.Equal(types.OK))

		// Read records again, but don't reset read ttl.
		time.Sleep(3 * time.Second)
		brp1.ReadTouchTTLPercent = -1
		brp2.ReadTouchTTLPercent = -1

		br1 = as.NewBatchRead(brp1, key1, []string{"a"})
		br2 = as.NewBatchRead(brp2, key2, []string{"a"})

		list = []as.BatchRecordIfc{br1, br2}

		err = client.BatchOperate(nil, list)
		gm.Expect(err).ToNot(gm.HaveOccurred())

		// Key 2 should have expired (its read ttl was never reset).
		gm.Expect(br1.ResultCode).To(gm.Equal(types.OK))
		gm.Expect(br2.ResultCode).To(gm.Equal(types.KEY_NOT_FOUND_ERROR))

		// Read record after it expires, showing it's gone.
		time.Sleep(8 * time.Second)
		err = client.BatchOperate(nil, list)
		gm.Expect(br1.ResultCode).To(gm.Equal(types.KEY_NOT_FOUND_ERROR))
		gm.Expect(br2.ResultCode).To(gm.Equal(types.KEY_NOT_FOUND_ERROR))
	})
})

gg.Context("BatchUDF operations", func() {
gg.BeforeEach(func() {
if *dbaas {
Expand Down
2 changes: 1 addition & 1 deletion batch_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (bu *BatchUDF) equals(obj BatchRecordIfc) bool {

// Return wire protocol size. For internal use only.
func (bu *BatchUDF) size(parentPolicy *BasePolicy) (int, Error) {
size := 6 // gen(2) + exp(4) = 6
size := 2 // gen(2) = 2

if bu.policy != nil {
if bu.policy.FilterExpression != nil {
Expand Down
3 changes: 1 addition & 2 deletions batch_udf_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type BatchUDFPolicy struct {
DurableDelete bool

// SendKey determines to whether send user defined key in addition to hash digest on both reads and writes.
// If the key is sent on a write, the key will be stored with the record on
// the server.
// If true and the UDF writes a record, the key will be stored with the record on the server.
// The default is to not send the user defined key.
SendKey bool // = false
}
Expand Down
2 changes: 1 addition & 1 deletion batch_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (bw *BatchWrite) equals(obj BatchRecordIfc) bool {

// Return wire protocol size. For internal use only.
func (bw *BatchWrite) size(parentPolicy *BasePolicy) (int, Error) {
size := 6 // gen(2) + exp(4) = 6
size := 2 // gen(2) = 2

if bw.policy != nil {
if bw.policy.FilterExpression != nil {
Expand Down
43 changes: 36 additions & 7 deletions cdt_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package aerospike

// List operations support negative indexing. If the index is negative, the
// List operations support negative indexing. If the index is negative, the
// resolved index starts backwards from end of list. If an index is out of bounds,
// a parameter error will be returned. If a range is partially out of bounds, the
// valid part of the range will be returned. Index/Range examples:
Expand All @@ -29,7 +29,7 @@ package aerospike
// Index -3 Count 3: Last three items in list.
// Index -5 Count 4: Range between fifth to last item to second to last item inclusive.
//
// Nested CDT operations are supported by optional Ctx context arguments. Examples:
// Nested CDT operations are supported by optional Ctx context arguments. Examples:
//
// bin = [[7,9,5],[1,2,3],[6,5,4,1]]
// Append 11 to last list.
Expand Down Expand Up @@ -151,10 +151,10 @@ const (
// ListReturnTypeExists returns true if count > 0.
ListReturnTypeExists ListReturnType = 13

// ListReturnTypeInverted will invert meaning of list command and return values. For example:
// ListReturnTypeInverted will invert meaning of list command and return values. For example:
// ListOperation.getByIndexRange(binName, index, count, ListReturnType.INDEX | ListReturnType.INVERTED)
// With the INVERTED flag enabled, the items outside of the specified index range will be returned.
// The meaning of the list command can also be inverted. For example:
// The meaning of the list command can also be inverted. For example:
// ListOperation.removeByIndexRange(binName, index, count, ListReturnType.INDEX | ListReturnType.INVERTED);
// With the INVERTED flag enabled, the items outside of the specified index range will be removed and returned.
ListReturnTypeInverted ListReturnType = 0x10000
Expand All @@ -180,7 +180,7 @@ const (
ListWriteFlagsDefault = 0
// ListWriteFlagsAddUnique means: Only add unique values.
ListWriteFlagsAddUnique = 1
// ListWriteFlagsInsertBounded means: Enforce list boundaries when inserting. Do not allow values to be inserted
// ListWriteFlagsInsertBounded means: Enforce list boundaries when inserting. Do not allow values to be inserted
// at index outside current list boundaries.
ListWriteFlagsInsertBounded = 2
// ListWriteFlagsNoFail means: do not raise error if a list item fails due to write flag constraints.
Expand Down Expand Up @@ -425,7 +425,7 @@ func cdtListOrderFlag(order ListOrderType, pad bool) int {

// ListCreateOp creates list create operation.
// Server creates list at given context level. The context is allowed to be beyond list
// boundaries only if pad is set to true. In that case, nil list entries will be inserted to
// boundaries only if pad is set to true. In that case, nil list entries will be inserted to
// satisfy the context position.
func ListCreateOp(binName string, listOrder ListOrderType, pad bool, ctx ...*CDTContext) *Operation {
// If context not defined, the set order for top-level bin list.
Expand All @@ -435,12 +435,41 @@ func ListCreateOp(binName string, listOrder ListOrderType, pad bool, ctx ...*CDT
return &Operation{opType: _CDT_MODIFY, ctx: ctx, binName: binName, binValue: ListValue{_CDT_LIST_SET_TYPE, cdtListOrderFlag(listOrder, pad), IntegerValue(listOrder)}, encoder: cdtCreateOpEncoder}
}

// ListCreateOp creates list create operation with a persisted index.
// A list index improves lookup performance, but requires more storage.
// A list index can be created for a top-level ordered list only.
// Nested and unordered list indexes are not supported.
//
// Server creates list at given context level. The context is allowed to be beyond list
// boundaries only if pad is set to true. In that case, nil list entries will be inserted to
// satisfy the context position.
// ListCreateWithIndexOp creates a list create operation with a persisted index.
// A list index improves lookup performance, but requires more storage.
// A list index can be created for a top-level ordered list only.
// Nested and unordered list indexes are not supported.
//
// Server creates list at given context level. The context is allowed to be beyond list
// boundaries only if pad is set to true. In that case, nil list entries will be inserted to
// satisfy the context position.
func ListCreateWithIndexOp(binName string, listOrder ListOrderType, pad bool, ctx ...*CDTContext) *Operation {
// If context not defined, the set order for top-level bin list.
if len(ctx) == 0 {
return ListSetOrderWithIndexOp(binName, listOrder)
}

// Create nested list. persistIndex does not apply here, so ignore it.
return &Operation{opType: _CDT_MODIFY, ctx: ctx, binName: binName, binValue: ListValue{_CDT_LIST_SET_TYPE, cdtListOrderFlag(listOrder, pad), IntegerValue(listOrder)}, encoder: cdtCreateOpEncoder}
}

// ListSetOrderOp creates a set list order operation.
// Server sets list order. Server returns nil.
func ListSetOrderOp(binName string, listOrder ListOrderType, ctx ...*CDTContext) *Operation {
	op := &Operation{
		opType:   _CDT_MODIFY,
		ctx:      ctx,
		binName:  binName,
		binValue: ListValue{_CDT_LIST_SET_TYPE, IntegerValue(listOrder)},
		encoder:  listGenericOpEncoder,
	}
	return op
}

// ListSetOrderWithIndexOp creates a set list order operation with a persisted index.
// A list index improves lookup performance, but requires more storage.
// A list index can be created for a top-level ordered list only.
// Nested and unordered list indexes are not supported.
//
// Server sets list order. Server returns nil.
func ListSetOrderWithIndexOp(binName string, listOrder ListOrderType, ctx ...*CDTContext) *Operation {
// NOTE(review): 0x10 appears to be the wire-protocol flag bit requesting a
// persisted index alongside the list order — TODO confirm against protocol docs.
listOrder |= 0x10
return &Operation{opType: _CDT_MODIFY, ctx: ctx, binName: binName, binValue: ListValue{_CDT_LIST_SET_TYPE, IntegerValue(listOrder)}, encoder: listGenericOpEncoder}
}

// ListAppendOp creates a list append operation.
// Server appends values to end of list bin.
// Server returns list size on bin name.
Expand Down
13 changes: 13 additions & 0 deletions cdt_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ var _ = gg.Describe("CDT List Test", func() {
gm.Expect(sz.Bins[cdtBinName]).To(gm.Equal(100 * 2))
})

gg.It("should create a valid CDT List with persisted index", func() {
cdtBinName := "indexedList"
cdtList, err := client.Operate(wpolicy, key,
as.ListCreateWithIndexOp(cdtBinName, as.ListOrderOrdered, false),
as.ListAppendWithPolicyOp(as.DefaultListPolicy(), cdtBinName, 1, 2, 3, 4),
as.GetBinOp(cdtBinName),
)
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(cdtList).ToNot(gm.BeNil())
gm.Expect(cdtList.Bins).ToNot(gm.BeNil())
gm.Expect(cdtList.Bins[cdtBinName]).To(gm.Equal([]interface{}{4, []interface{}{1, 2, 3, 4}}))
})

gg.Describe("CDT List Operations", func() {

const listSize = 10
Expand Down
7 changes: 7 additions & 0 deletions cdt_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ package aerospike
// Unique key map bin operations. Create map operations used by the client operate command.
// The default unique key map is unordered.
//
// The default unique key map is unordered. Valid map key types are:
// String
// Integer
// []byte
//
// The server will validate map key types in an upcoming release.
//
// All maps maintain an index and a rank. The index is the item offset from the start of the map,
// for both unordered and ordered maps. The rank is the sorted index of the value component.
// Map supports negative indexing for index and rank.
Expand Down
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,11 @@ func (clnt *Client) Stats() (map[string]interface{}, Error) {
}

res["open-connections"] = clusterStats.ConnectionsOpen.Get()
res["total-nodes"] = len(clnt.cluster.GetNodes())

aggstats := res["cluster-aggregated-stats"].(map[string]interface{})
aggstats["exceeded-max-retries"] = clnt.cluster.maxRetriesExceededCount.Get()
aggstats["exceeded-total-timeout"] = clnt.cluster.totalTimeoutExceededCount.Get()

return res, nil
}
Expand Down
Loading

0 comments on commit 9de8b88

Please sign in to comment.