Skip to content

Commit

Permalink
hrpc: enable ScanMetrics
Browse files Browse the repository at this point in the history
Enable tracking scan metrics in the ScanResponse. Clients can access
the metrics via calls to scanner.GetScanMetrics()
Support for  HBase versions < 2.6.0 where ScanMetrics provides
ROWS_SCANNED and ROWS_FILTERED metrics.
  • Loading branch information
ciacono committed Jun 13, 2024
1 parent faec3bd commit 875e7ef
Show file tree
Hide file tree
Showing 22 changed files with 713 additions and 221 deletions.
4 changes: 2 additions & 2 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ type Result struct {
Cells []*Cell
Stale bool
Partial bool
// Exists is only set if existance_only was set in the request query.
// Exists is only set if existence_only was set in the request query.
Exists *bool
}

func (c *Result) String() string {
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v ",
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v",
c.Cells, c.Stale, c.Partial, c.Exists)
}

Expand Down
30 changes: 30 additions & 0 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -317,6 +318,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: nil,
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand Down Expand Up @@ -350,6 +352,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: proto.Uint32(89),
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specificed
Expand All @@ -374,6 +377,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -393,6 +397,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -418,6 +423,7 @@ func TestScanToProto(t *testing.T) {
{Name: proto.String("key2"), Value: []byte("value2")},
},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -438,6 +444,7 @@ func TestScanToProto(t *testing.T) {
StartRow: startRow,
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -459,6 +466,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
}
}(),
},
Expand All @@ -478,8 +486,30 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
// set TrackScanMetrics
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", TrackScanMetrics())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
}
}(),
},
}

for i, tcase := range tests {
Expand Down
35 changes: 30 additions & 5 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ type Scanner interface {

// Close should be called if it is desired to stop scanning before getting all of results.
// If you call Next() after calling Close() you might still get buffered results.
// Othwerwise, in case all results have been delivered or in case of an error, the Scanner
// Otherwise, in case all results have been delivered or in case of an error, the Scanner
// will be closed automatically. It's okay to close an already closed scanner.
Close() error
// GetScanMetrics returns the scan metrics for the scanner at the time of call.
// The scan metrics are non-nil only if the Scan has TrackScanMetrics() enabled
// The scan metrics for a Scan are only accurate and complete if the scanner has
// been closed with an io.EOF.
GetScanMetrics() map[string]int64
}

// Scan represents a scanner on an HBase table.
Expand All @@ -64,10 +69,11 @@ type Scan struct {

scannerID uint64

maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
trackScanMetrics bool

closeScanner bool
allowPartialResults bool
Expand Down Expand Up @@ -178,6 +184,11 @@ func (s *Scan) NumberOfRows() uint32 {
return s.numberOfRows
}

// TrackScanMetrics returns true if the client is requesting to track scan metrics.
func (s *Scan) TrackScanMetrics() bool {
return s.trackScanMetrics
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -189,6 +200,7 @@ func (s *Scan) ToProto() proto.Message {
// tell server that we "handle" heartbeats by ignoring them
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -334,6 +346,19 @@ func AllowPartialResults() func(Call) error {
}
}

// TrackScanMetrics is an option for scan requests.
// Enables tracking scan metrics from HBase, which will be returned in the scan response.
func TrackScanMetrics() func(Call) error {
return func(g Call) error {
scan, ok := g.(*Scan)
if !ok {
return errors.New("'TrackScanMetrics' option can only be used with Scan queries")
}
scan.trackScanMetrics = true
return nil
}
}

// Reversed is a Scan-only option which allows you to scan in reverse key order
// To use it the startKey would be greater than the end key
func Reversed() func(Call) error {
Expand Down
111 changes: 107 additions & 4 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"flag"
"fmt"
"io"
"math"
"os"
"os/exec"
"reflect"
Expand All @@ -25,8 +26,6 @@ import (
"testing"
"time"

"math"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
Expand Down Expand Up @@ -1022,6 +1021,110 @@ func TestScanTimeRangeVersions(t *testing.T) {
}
}

func TestScanWithScanMetrics(t *testing.T) {
var (
key = "TestScanWithScanMetrics"
now = time.Now()
r1 = fmt.Sprintf("%s_%d", key, 1)
r2 = fmt.Sprintf("%s_%d", key, 2)
r3 = fmt.Sprintf("%s_%d", key, 3)
val = []byte("1")
family = "cf"
ctx = context.Background()
rowsScanned = "ROWS_SCANNED"
rowsFiltered = "ROWS_FILTERED"
)

c := gohbase.NewClient(*host)
defer c.Close()

for _, r := range []string{r1, r2, r3} {
err := insertKeyValue(c, r, family, val, hrpc.Timestamp(now))
if err != nil {
t.Fatalf("Put failed: %s", err)
}
}

tcases := []struct {
description string
filters func(call hrpc.Call) error
expectedRowsScanned int64
expectedRowsFiltered int64
noScanMetrics bool
}{
{
description: "scan metrics not enabled",
expectedRowsScanned: 0,
expectedRowsFiltered: 0,
noScanMetrics: true,
},
{
description: "2 rows scanned",
expectedRowsScanned: 2,
expectedRowsFiltered: 0,
},
{
description: "1 row scanned 1 row filtered",
filters: hrpc.Filters(filter.NewPrefixFilter([]byte(r1))),
expectedRowsScanned: 1,
expectedRowsFiltered: 1,
},
}

for _, tc := range tcases {
t.Run(tc.description, func(t *testing.T) {
var (
scan *hrpc.Scan
err error
)
if tc.noScanMetrics {
scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3)
} else if tc.filters == nil {
scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3, hrpc.TrackScanMetrics())
} else {
scan, err = hrpc.NewScanRangeStr(ctx, table, r1, r3, hrpc.TrackScanMetrics(),
tc.filters)
}
if err != nil {
t.Fatalf("Scan req failed: %s", err)
}

var results []*hrpc.Result
scanner := c.Scan(scan)
for {
var r *hrpc.Result
r, err = scanner.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
results = append(results, r)
}

actualMetrics := scanner.GetScanMetrics()

if tc.noScanMetrics && actualMetrics != nil {
t.Fatalf("Expected nil scan metrics, got %v", actualMetrics)
}

scanned := actualMetrics[rowsScanned]
if tc.expectedRowsScanned != scanned {
t.Errorf("Did not get expected rows scanned - expected: %d, actual %d",
tc.expectedRowsScanned, scanned)
}

filtered := actualMetrics[rowsFiltered]
if tc.expectedRowsFiltered != filtered {
t.Errorf("Did not get expected rows filtered - expected: %d, actual %d",
tc.expectedRowsFiltered, filtered)
}
})
}

}

func TestPutTTL(t *testing.T) {
key := "TestPutTTL"
c := gohbase.NewClient(*host)
Expand All @@ -1034,13 +1137,13 @@ func TestPutTTL(t *testing.T) {
t.Fatalf("Put failed: %s", err)
}

//Wait ttl duration and try to get the value
// Wait ttl duration and try to get the value
time.Sleep(ttl)

get, err := hrpc.NewGetStr(context.Background(), table, key,
hrpc.Families(map[string][]string{"cf": nil}))

//Make sure we dont get a result back
// Make sure we don't get a result back
res, err := c.Get(get)
if err != nil {
t.Fatalf("Get failed: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions pb/Cell.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 875e7ef

Please sign in to comment.