Skip to content

Commit

Permalink
hrpc, region: Allow setting priority on Scan and Get requests
Browse files Browse the repository at this point in the history
Provide an option for setting the priority on a Scan or Get request.
The priority is set on the RequestHeader, so must be set by the region
code when serializing the request.
  • Loading branch information
aaronbee authored and dethi committed Aug 14, 2024
1 parent 731f0bd commit e08f807
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
1 change: 1 addition & 0 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type hasQueryOptions interface {
setResultOffset(offset uint32)
setCacheBlocks(cacheBlocks bool)
setConsistency(consistency ConsistencyType)
setPriority(priority uint32)
}

// RPCResult is struct that will contain both the resulting message from an RPC
Expand Down
20 changes: 20 additions & 0 deletions hrpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type baseQuery struct {
maxVersions uint32
storeLimit uint32
storeOffset uint32
priority uint32
cacheBlocks bool
consistency ConsistencyType
}
Expand Down Expand Up @@ -97,6 +98,15 @@ func (bq *baseQuery) setCacheBlocks(cacheBlocks bool) {
func (bq *baseQuery) setConsistency(consistency ConsistencyType) {
bq.consistency = consistency
}
func (bq *baseQuery) setPriority(priority uint32) {
bq.priority = priority
}
func (bq *baseQuery) Priority() *uint32 {
if bq.priority == 0 {
return nil
}
return &bq.priority
}

// Families option adds families constraint to a Scan or Get request.
func Families(f map[string][]string) func(Call) error {
Expand Down Expand Up @@ -218,3 +228,13 @@ func Consistency(consistency ConsistencyType) func(Call) error {
return errors.New("'Consistency' option can only be used with Get or Scan requests")
}
}

func Priority(priority uint32) func(Call) error {
return func(hc Call) error {
if c, ok := hc.(hasQueryOptions); ok {
c.setPriority(priority)
return nil
}
return errors.New("'Priority' option can only be used with Get or Scan requests")
}
}
46 changes: 46 additions & 0 deletions hrpc/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,49 @@ func TestCacheBlocks(t *testing.T) {
t.Error(err)
}
}

func TestPriority(t *testing.T) {
get, err := NewGet(nil, nil, nil)
if err != nil {
t.Fatal(err)
}
if got := get.Priority(); got != nil {
t.Errorf("expected nil, got %v", got)
}
get, err = NewGet(nil, nil, nil, Priority(0))
if err != nil {
t.Fatal(err)
}
if got := get.Priority(); got != nil {
t.Errorf("expected nil, got %v", got)
}
get, err = NewGet(nil, nil, nil, Priority(5))
if err != nil {
t.Fatal(err)
}
if got := get.Priority(); *got != 5 {
t.Errorf("expected priority 5, got %v", got)
}

scan, err := NewScan(nil, nil)
if err != nil {
t.Fatal(err)
}
if got := scan.Priority(); got != nil {
t.Errorf("expected nil, got %v", got)
}
scan, err = NewScan(nil, nil, Priority(0))
if err != nil {
t.Fatal(err)
}
if got := scan.Priority(); got != nil {
t.Errorf("expected nil, got %v", got)
}
scan, err = NewScan(nil, nil, Priority(5))
if err != nil {
t.Fatal(err)
}
if got := scan.Priority(); *got != 5 {
t.Errorf("expected priority 5, got %v", got)
}
}
5 changes: 4 additions & 1 deletion region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,10 +683,13 @@ var pbTrue = proto.Bool(true)
func marshalProto(rpc hrpc.Call, callID uint32, request proto.Message,
cellblocksLen uint32) ([]byte, error) {
header := getHeader()
defer returnHeader(header)
header.MethodName = proto.String(rpc.Name())
header.RequestParam = pbTrue
header.CallId = &callID
defer returnHeader(header)
if p, ok := rpc.(interface{ Priority() *uint32 }); ok {
header.Priority = p.Priority()
}

if cellblocksLen > 0 {
header.CellBlockMeta = &pb.CellBlockMeta{
Expand Down

0 comments on commit e08f807

Please sign in to comment.