Skip to content

Commit

Permalink
hrpc: enable ScanMetrics
Browse files Browse the repository at this point in the history
Enable tracking scan metrics in the ScanResponse. Clients can access
the metrics via calls to scanner.Next()
Support for  HBase versions < 2.6.0 where ScanMetrics provides
ROWS_SCANNED and ROWS_FILTERED metrics.
  • Loading branch information
ciacono committed Jun 4, 2024
1 parent ce0b353 commit 74ea11a
Show file tree
Hide file tree
Showing 22 changed files with 715 additions and 230 deletions.
8 changes: 5 additions & 3 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,15 @@ type Result struct {
Cells []*Cell
Stale bool
Partial bool
// Exists is only set if existance_only was set in the request query.
// Exists is only set if existence_only was set in the request query.
Exists *bool
// ScanMetrics is only non-nil if track_scan_metrics was set in the request query.
ScanMetrics *ScanMetrics
}

func (c *Result) String() string {
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v ",
c.Cells, c.Stale, c.Partial, c.Exists)
return fmt.Sprintf("cells:%v stale:%v partial:%v exists:%v scanmetrics:%v",
c.Cells, c.Stale, c.Partial, c.Exists, c.ScanMetrics)
}

func extractBool(v *bool) bool {
Expand Down
30 changes: 30 additions & 0 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -317,6 +318,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: nil,
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand Down Expand Up @@ -350,6 +352,7 @@ func TestScanToProto(t *testing.T) {
MaxVersions: proto.Uint32(89),
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specificed
Expand All @@ -374,6 +377,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -393,6 +397,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -418,6 +423,7 @@ func TestScanToProto(t *testing.T) {
{Name: proto.String("key2"), Value: []byte("value2")},
},
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -438,6 +444,7 @@ func TestScanToProto(t *testing.T) {
StartRow: startRow,
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -459,6 +466,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
}
}(),
},
Expand All @@ -478,8 +486,30 @@ func TestScanToProto(t *testing.T) {
Column: []*pb.Column{},
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
},
},
// set TrackScanMetrics
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", TrackScanMetrics())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
}
}(),
},
}

for i, tcase := range tests {
Expand Down
38 changes: 34 additions & 4 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,26 @@ type Scan struct {

scannerID uint64

maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
maxResultSize uint64
numberOfRows uint32
reversed bool
attribute []*pb.NameBytesPair
trackScanMetrics bool

closeScanner bool
allowPartialResults bool
}

type ScanMetrics struct {
RowsScanned int64
RowsFiltered int64
}

func (sm *ScanMetrics) String() string {
return fmt.Sprintf("Rows scanned: %d Rows filtered %d",
sm.RowsScanned, sm.RowsFiltered)
}

// baseScan returns a Scan struct with default values set.
func baseScan(ctx context.Context, table []byte,
options ...func(Call) error) (*Scan, error) {
Expand Down Expand Up @@ -178,6 +189,11 @@ func (s *Scan) NumberOfRows() uint32 {
return s.numberOfRows
}

// TrackScanMetrics returns true if the client is requesting to track scan metrics.
func (s *Scan) TrackScanMetrics() bool {
return s.trackScanMetrics
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -189,6 +205,7 @@ func (s *Scan) ToProto() proto.Message {
// tell server that we "handle" heartbeats by ignoring them
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -334,6 +351,19 @@ func AllowPartialResults() func(Call) error {
}
}

// TrackScanMetrics is an option for scan requests.
// Enables tracking scan metrics from HBase, which will be returned in the scan response.
func TrackScanMetrics() func(Call) error {
return func(g Call) error {
scan, ok := g.(*Scan)
if !ok {
return errors.New("'TrackScanMetrics' option can only be used with Scan queries")
}
scan.trackScanMetrics = true
return nil
}
}

// Reversed is a Scan-only option which allows you to scan in reverse key order
// To use it the startKey would be greater than the end key
func Reversed() func(Call) error {
Expand Down
112 changes: 110 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"flag"
"fmt"
"io"
"math"
"os"
"os/exec"
"reflect"
Expand All @@ -25,8 +26,6 @@ import (
"testing"
"time"

"math"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/tsuna/gohbase"
Expand Down Expand Up @@ -1020,6 +1019,115 @@ func TestScanTimeRangeVersions(t *testing.T) {
if len(rsp[1].Cells) != 1 {
t.Fatalf("Expected versions: %d, Got versions: %d", 2, len(rsp[0].Cells))
}

if rsp[0].ScanMetrics != nil && rsp[1].ScanMetrics != nil {
t.Fatal("Result had ScanMetrics when they were not enabled")
}
}

func TestScanWithScanMetrics(t *testing.T) {
var (
key = "TestScanWithScanMetrics"
from = time.Now().UnixMilli()
to = from + int64(1)
r1 = fmt.Sprintf("%s_%d", key, 1)
r2 = fmt.Sprintf("%s_%d", key, 2)
f1 = filter.NewPrefixFilter([]byte(r1))
f2 = filter.NewPrefixFilter([]byte("other"))
val = []byte("1")
family = "cf"
ctx = context.Background()
)

c := gohbase.NewClient(*host)
defer c.Close()

err := insertKeyValue(c, r1, family, val,
hrpc.Timestamp(time.Unix(0, from)))
if err != nil {
t.Fatalf("Put failed: %s", err)
}
err = insertKeyValue(c, r2, family, val,
hrpc.Timestamp(time.Unix(0, from)))
if err != nil {
t.Fatalf("Put failed: %s", err)
}

tcases := []struct {
description string
filters func(call hrpc.Call) error
expectedRowsScanned int64
expectedRowsFiltered int64
noScanMetrics bool
}{
{
description: "scan metrics not enabled",
expectedRowsScanned: 0,
expectedRowsFiltered: 0,
noScanMetrics: true,
},
{
description: "2 rows scanned",
expectedRowsScanned: 2,
},
{
description: "1 row scanned 1 row filtered",
filters: hrpc.Filters(f1),
expectedRowsScanned: 1,
expectedRowsFiltered: 1,
},
{
description: "0 rows scanned 2 rows filtered",
filters: hrpc.Filters(f2),
expectedRowsScanned: 0,
expectedRowsFiltered: 2,
},
}

for _, tc := range tcases {
t.Run(tc.description, func(t *testing.T) {
var scan *hrpc.Scan
if tc.noScanMetrics {
scan, err = hrpc.NewScanStr(ctx, table,
hrpc.TimeRange(time.UnixMilli(from), time.UnixMilli(to)))
} else if tc.filters == nil {
scan, err = hrpc.NewScanStr(ctx, table, hrpc.TrackScanMetrics(),
hrpc.TimeRange(time.UnixMilli(from), time.UnixMilli(to)))
} else {
scan, err = hrpc.NewScanStr(ctx, table, hrpc.TrackScanMetrics(),
hrpc.TimeRange(time.UnixMilli(from), time.UnixMilli(to)), tc.filters)
}
if err != nil {
t.Fatalf("Scan req failed: %s", err)
}

var results []*hrpc.Result
scanner := c.Scan(scan)
for {
var r *hrpc.Result
r, err = scanner.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
results = append(results, r)
}

for _, res := range results {
if res.ScanMetrics.RowsScanned != tc.expectedRowsScanned {
t.Fatalf("Expected %d rows scanned, got %d",
tc.expectedRowsScanned, res.ScanMetrics.RowsScanned)
}
if res.ScanMetrics.RowsFiltered != tc.expectedRowsFiltered {
t.Fatalf("Expected %d rows filtered, got %d",
tc.expectedRowsFiltered, res.ScanMetrics.RowsFiltered)
}
}
})
}

}

func TestPutTTL(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pb/Cell.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 74ea11a

Please sign in to comment.