Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hrpc: enable ScanMetrics #254

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ type Result struct {
Cells []*Cell
Stale bool
Partial bool
// Exists is only set if existance_only was set in the request query.
// Exists is only set if existence_only was set in the request query.
Exists *bool
}

func (c *Result) String() string {
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v ",
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v",
c.Cells, c.Stale, c.Partial, c.Exists)
}

Expand Down
30 changes: 30 additions & 0 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -317,6 +318,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: nil,
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand Down Expand Up @@ -350,6 +352,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: proto.Uint32(89),
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specified
Expand All @@ -374,6 +377,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -393,6 +397,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -418,6 +423,7 @@ func TestScanToProto(t *testing.T) {
{Name: proto.String("key2"), Value: []byte("value2")},
},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -438,6 +444,7 @@ func TestScanToProto(t *testing.T) {
StartRow: startRow,
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -459,6 +466,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
}
}(),
},
Expand All @@ -478,8 +486,30 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
// set TrackScanMetrics
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", TrackScanMetrics())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
}
}(),
},
}

for i, tcase := range tests {
Expand Down
35 changes: 30 additions & 5 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ type Scanner interface {

// Close should be called if it is desired to stop scanning before getting all of results.
// If you call Next() after calling Close() you might still get buffered results.
// Othwerwise, in case all results have been delivered or in case of an error, the Scanner
// Otherwise, in case all results have been delivered or in case of an error, the Scanner
// will be closed automatically. It's okay to close an already closed scanner.
Close() error
// GetScanMetrics returns the scan metrics for the scanner.
// The scan metrics are non-nil only if the Scan has TrackScanMetrics() enabled.
// GetScanMetrics should only be called after the scanner has been closed with an io.EOF
// (i.e. there are no more rows left to be returned by calls to Next()).
GetScanMetrics() map[string]int64
}

// Scan represents a scanner on an HBase table.
Expand All @@ -64,10 +69,11 @@ type Scan struct {

scannerID uint64

maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
trackScanMetrics bool

closeScanner bool
allowPartialResults bool
Expand Down Expand Up @@ -178,6 +184,11 @@ func (s *Scan) NumberOfRows() uint32 {
return s.numberOfRows
}

// TrackScanMetrics reports whether this Scan asks for scan metrics to be
// tracked. It returns the trackScanMetrics field, which the TrackScanMetrics()
// option sets to true.
func (s *Scan) TrackScanMetrics() bool {
	return s.trackScanMetrics
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -189,6 +200,7 @@ func (s *Scan) ToProto() proto.Message {
// tell server that we "handle" heartbeats by ignoring them
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -334,6 +346,19 @@ func AllowPartialResults() func(Call) error {
}
}

// TrackScanMetrics is a Scan-only option that turns on scan metrics tracking
// for the request. Per the original description, the metrics are tracked by
// HBase and returned in the scan response.
//
// The returned option marks the Scan as tracking metrics and returns nil; when
// applied to any Call that is not a *Scan it returns an error and changes
// nothing.
func TrackScanMetrics() func(Call) error {
	return func(c Call) error {
		if s, isScan := c.(*Scan); isScan {
			s.trackScanMetrics = true
			return nil
		}
		return errors.New("'TrackScanMetrics' option can only be used with Scan queries")
	}
}

// Reversed is a Scan-only option which allows you to scan in reverse key order
// To use it the startKey would be greater than the end key
func Reversed() func(Call) error {
Expand Down
111 changes: 107 additions & 4 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"flag"
"fmt"
"io"
"math"
"os"
"os/exec"
"reflect"
Expand All @@ -25,8 +26,6 @@ import (
"testing"
"time"

"math"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
Expand Down Expand Up @@ -1022,6 +1021,110 @@ func TestScanTimeRangeVersions(t *testing.T) {
}
}

// TestScanWithScanMetrics inserts three rows and then scans from the first row
// to the third one, checking the ROWS_SCANNED and ROWS_FILTERED values reported
// by the scanner's GetScanMetrics. It covers three situations: metrics not
// requested (metrics must be nil), metrics requested without filters, and
// metrics requested together with a prefix filter on the first row.
func TestScanWithScanMetrics(t *testing.T) {
	const (
		key          = "TestScanWithScanMetrics"
		family       = "cf"
		rowsScanned  = "ROWS_SCANNED"
		rowsFiltered = "ROWS_FILTERED"
	)
	var (
		ctx = context.Background()
		now = time.Now()
		val = []byte("1")
	)

	// Row keys are "<key>_1", "<key>_2" and "<key>_3".
	rows := make([]string, 3)
	for i := range rows {
		rows[i] = fmt.Sprintf("%s_%d", key, i+1)
	}
	startRow, stopRow := rows[0], rows[2]

	client := gohbase.NewClient(*host)
	defer client.Close()

	for _, row := range rows {
		if err := insertKeyValue(client, row, family, val, hrpc.Timestamp(now)); err != nil {
			t.Fatalf("Put failed: %s", err)
		}
	}

	tcases := []struct {
		name            string
		filters         func(call hrpc.Call) error
		wantScanned     int64
		wantFiltered    int64
		metricsDisabled bool
	}{
		{
			name:            "scan metrics not enabled",
			wantScanned:     0,
			wantFiltered:    0,
			metricsDisabled: true,
		},
		{
			name:         "2 rows scanned",
			wantScanned:  2,
			wantFiltered: 0,
		},
		{
			name:         "1 row scanned 1 row filtered",
			filters:      hrpc.Filters(filter.NewPrefixFilter([]byte(startRow))),
			wantScanned:  1,
			wantFiltered: 1,
		},
	}

	for _, tc := range tcases {
		t.Run(tc.name, func(t *testing.T) {
			// Assemble the scan options: nothing when metrics are disabled,
			// otherwise metrics tracking plus the optional filter.
			var opts []func(hrpc.Call) error
			if !tc.metricsDisabled {
				opts = append(opts, hrpc.TrackScanMetrics())
				if tc.filters != nil {
					opts = append(opts, tc.filters)
				}
			}

			scan, err := hrpc.NewScanRangeStr(ctx, table, startRow, stopRow, opts...)
			if err != nil {
				t.Fatalf("Scan req failed: %s", err)
			}

			// Drain the scanner until io.EOF; the metrics are read only
			// after that point.
			scanner := client.Scan(scan)
			var results []*hrpc.Result
			for {
				res, nextErr := scanner.Next()
				if nextErr == io.EOF {
					break
				}
				if nextErr != nil {
					t.Fatal(nextErr)
				}
				results = append(results, res)
			}

			metrics := scanner.GetScanMetrics()
			if metrics != nil && tc.metricsDisabled {
				t.Fatalf("Expected nil scan metrics, got %v", metrics)
			}

			// Indexing a nil map yields zero, which is what the
			// metrics-disabled case expects.
			if got := metrics[rowsScanned]; got != tc.wantScanned {
				t.Errorf("Did not get expected rows scanned - expected: %d, actual %d",
					tc.wantScanned, got)
			}
			if got := metrics[rowsFiltered]; got != tc.wantFiltered {
				t.Errorf("Did not get expected rows filtered - expected: %d, actual %d",
					tc.wantFiltered, got)
			}
		})
	}
}

func TestPutTTL(t *testing.T) {
key := "TestPutTTL"
c := gohbase.NewClient(*host)
Expand All @@ -1034,13 +1137,13 @@ func TestPutTTL(t *testing.T) {
t.Fatalf("Put failed: %s", err)
}

//Wait ttl duration and try to get the value
// Wait ttl duration and try to get the value
time.Sleep(ttl)

get, err := hrpc.NewGetStr(context.Background(), table, key,
hrpc.Families(map[string][]string{"cf": nil}))

//Make sure we dont get a result back
// Make sure we don't get a result back
res, err := c.Get(get)
if err != nil {
t.Fatalf("Get failed: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions pb/Cell.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading