Skip to content

Commit

Permalink
Simplify region life cycle
Browse files Browse the repository at this point in the history
* Removed RegionInfo.SetClient. The client is now set to nil by
MarkUnavailable, and then set again with MarkAvailable. This removes
bugs and races where a region info may have its Client set when it
shouldn't be. A region now has 3 distinct states it may be in:

1. Unavailable: AvailabilityChan() is non-nil and Client() is nil
2. Available: AvailabilityChan() is nil and Client() is non-nil
3. Stale: AvailabilityChan() is nil and Client() is nil

* Documented RegionInfo type with information about those 3 states and
the lifecycle.

* Documented establishRegion with what it does and what its
requirements are.

* Removed calls to SetClient(nil)/MarkUnavailable in clientRegionCache
methods. That work is the responsibility of the caller.

* getRegionAndClientForRPC no longer marks a region as unavailable if
Client is nil after waiting on the AvailabilityChan. If Client is nil,
that indicates the region is stale and a lookup has to be performed.
Marking it as unavailable and trying to reestablish it is wasted
effort.

* The above change broke the use of the meta and admin regions, which
are created when creating the gohbase Client/AdminClient, but don't
have a Client set. These regions were relying on a missing Client to
cause the region to be established. Now the meta and admin regions are
established as part of NewClient and NewAdminClient, respectively.
  • Loading branch information
aaronbee committed Jul 29, 2023
1 parent 4bda353 commit 1489640
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 80 deletions.
5 changes: 5 additions & 0 deletions admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient {
option(c)
}
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)

// Get client connection for admin region
c.adminRegionInfo.MarkUnavailable()
go c.reestablishRegion(c.adminRegionInfo)

return c
}

Expand Down
7 changes: 1 addition & 6 deletions caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func (rcc *clientRegionCache) del(r hrpc.RegionInfo) {
rcc.m.Lock()
c := r.Client()
if c != nil {
r.SetClient(nil)
regions := rcc.regions[c]
delete(regions, r)
}
Expand All @@ -71,11 +70,7 @@ func (rcc *clientRegionCache) del(r hrpc.RegionInfo) {

func (rcc *clientRegionCache) closeAll() {
rcc.m.Lock()
for client, regions := range rcc.regions {
for region := range regions {
region.MarkUnavailable()
region.SetClient(nil)
}
for client := range rcc.regions {
client.Close()
}
rcc.m.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func newClient(zkquorum string, options ...Option) *client {
//since the zkTimeout could be changed as an option
c.zkClient = zk.NewClient(zkquorum, c.zkTimeout)

// Get client connection for meta region
c.metaRegionInfo.MarkUnavailable()
go c.reestablishRegion(c.metaRegionInfo)

return c
}

Expand Down
6 changes: 3 additions & 3 deletions debug_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestDebugStateSanity(t *testing.T) {
} else if len(os) != 0 {
t.Errorf("Didn't expect any overlaps, got: %v", os)
}
region1.SetClient(regClient)
setRegionClient(region1, regClient)
client.clients.put("regionserver:1", region1, newClientFn)

region2 := region.NewInfo(
Expand All @@ -60,7 +60,7 @@ func TestDebugStateSanity(t *testing.T) {
} else if len(os) != 0 {
t.Errorf("Didn't expect any overlaps, got: %v", os)
}
region2.SetClient(regClient)
setRegionClient(region2, regClient)
client.clients.put("regionserver:1", region2, newClientFn)

region3 := region.NewInfo(
Expand All @@ -76,7 +76,7 @@ func TestDebugStateSanity(t *testing.T) {
} else if len(os) != 0 {
t.Errorf("Didn't expect any overlaps, got: %v", os)
}
region3.SetClient(regClient)
setRegionClient(region3, regClient)
client.clients.put("regionserver:1", region3, newClientFn)

jsonVal, err := DebugState(client)
Expand Down
21 changes: 19 additions & 2 deletions hrpc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,29 @@ import (
)

// RegionInfo represents HBase region.
//
// A RegionInfo may be in one of three states:
//
// 1. Unavailable: AvailabilityChan() is non-nil and Client() is nil
// 2. Available: AvailabilityChan() is nil and Client() is non-nil
// 3. Stale: AvailabilityChan() is nil and Client() is nil
//
// RegionInfo lifecycle:
//
// On creation and when seeing certain errors a region should have
// MarkUnavailable called to initialize the AvailabilityChan. If
// MarkUnavailable returns true then (re)establishRegion will need to
// be called to find a client for the region. The user of the
// RegionInfo can watch the AvailabilityChan to know when the region
// has been established. After the AvailabilityChan is closed Client()
// may return an initialized RegionClient or it may return nil, in
// which case the region is stale and the region lookup should be
// performed again.
type RegionInfo interface {
IsUnavailable() bool
AvailabilityChan() <-chan struct{}
MarkUnavailable() bool
MarkAvailable()
MarkAvailable(RegionClient)
MarkDead()
Context() context.Context
String() string
Expand All @@ -31,7 +49,6 @@ type RegionInfo interface {
StopKey() []byte
Namespace() []byte
Table() []byte
SetClient(RegionClient)
Client() RegionClient
}

Expand Down
3 changes: 1 addition & 2 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (ri mockRegionInfo) Name() []byte {
func (ri mockRegionInfo) IsUnavailable() bool { return true }
func (ri mockRegionInfo) AvailabilityChan() <-chan struct{} { return nil }
func (ri mockRegionInfo) MarkUnavailable() bool { return true }
func (ri mockRegionInfo) MarkAvailable() {}
func (ri mockRegionInfo) MarkAvailable(RegionClient) {}
func (ri mockRegionInfo) MarkDead() {}
func (ri mockRegionInfo) Context() context.Context { return nil }
func (ri mockRegionInfo) String() string { return "" }
Expand All @@ -491,7 +491,6 @@ func (ri mockRegionInfo) StartKey() []byte { return nil }
func (ri mockRegionInfo) StopKey() []byte { return nil }
func (ri mockRegionInfo) Namespace() []byte { return nil }
func (ri mockRegionInfo) Table() []byte { return nil }
func (ri mockRegionInfo) SetClient(RegionClient) {}
func (ri mockRegionInfo) Client() RegionClient { return nil }

type byFamily []*pb.MutationProto_ColumnValue
Expand Down
6 changes: 4 additions & 2 deletions region/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (i *info) MarkUnavailable() bool {
created := false
i.m.Lock()
if i.available == nil {
i.client = nil
i.available = make(chan struct{})
created = true
}
Expand All @@ -186,10 +187,11 @@ func (i *info) MarkUnavailable() bool {

// MarkAvailable will mark this region as available again, by closing the
// channel returned by AvailabilityChan
func (i *info) MarkAvailable() {
func (i *info) MarkAvailable(c hrpc.RegionClient) {
i.m.Lock()
ch := i.available
i.available = nil
i.client = c
close(ch)
i.m.Unlock()
}
Expand Down Expand Up @@ -254,7 +256,7 @@ func (i *info) Client() hrpc.RegionClient {
return c
}

// SetClient sets region client
// SetClient sets region client. Used in tests only
func (i *info) SetClient(c hrpc.RegionClient) {
i.m.Lock()
i.client = c
Expand Down
77 changes: 34 additions & 43 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,31 +128,9 @@ func (c *client) getRegionAndClientForRPC(ctx context.Context, rpc hrpc.Call) (

client := reg.Client()
if client == nil {
// There was an error getting the region client. Mark the
// region as unavailable.
if reg.MarkUnavailable() {
// If this was the first goroutine to mark the region as
// unavailable, start a goroutine to reestablish a connection
go c.reestablishRegion(reg)
}
if ch := reg.AvailabilityChan(); ch != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-c.done:
return nil, ErrClientClosed
case <-ch:
}
}
if reg.Context().Err() != nil {
// region is dead because it was split or merged,
// retry lookup
continue
}
client = reg.Client()
if client == nil {
continue
}
// region client is nil after marked available, likely
// means the region is dead. Let's retry
continue
}
rpc.SetRegion(reg)
return client, nil
Expand Down Expand Up @@ -399,7 +377,6 @@ func (c *client) clientDown(client hrpc.RegionClient) {
downregions := c.clients.clientDown(client)
for downreg := range downregions {
if downreg.MarkUnavailable() {
downreg.SetClient(nil)
go c.reestablishRegion(downreg)
}
}
Expand Down Expand Up @@ -656,14 +633,28 @@ func isRegionEstablished(rc hrpc.RegionClient, reg hrpc.RegionInfo) error {
}
}

// establishRegion will attempt to find a region client for reg and
// create a connection to that region client.
//
// reg must have had MarkUnavailable called on it before being passed
// to establishRegion to have its AvailabilityChan created.
// establishRegion will call MarkAvailable on the region before
// returning, closing the AvailabilityChan.
//
// When successfully connected to a region server, the region will
// have its Client set. On error it will retry. If the region is found
// to be stale, e.g. it has been replaced with a newer region due to a
// split, or otherwise is not found in the meta region then the region
// will have a nil Client associated with it. Callers will need to
// retry lookup.
func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
var backoff time.Duration
var err error
for {
backoff, err = sleepAndIncreaseBackoff(reg.Context(), backoff)
if err != nil {
// region is dead
reg.MarkAvailable()
reg.MarkAvailable(nil) // unblock waiters
return
}
if addr == "" {
Expand All @@ -677,7 +668,7 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
// region doesn't exist, delete it from caches
c.regions.del(originalReg)
c.clients.del(originalReg)
originalReg.MarkAvailable()
originalReg.MarkAvailable(nil) // unblock waiters

log.WithFields(log.Fields{
"region": originalReg.String(),
Expand All @@ -686,9 +677,10 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
}).Info("region does not exist anymore")

return
} else if originalReg.Context().Err() != nil {
}
if originalReg.Context().Err() != nil {
// region is dead
originalReg.MarkAvailable()
originalReg.MarkAvailable(nil) // unblock waiters

log.WithFields(log.Fields{
"region": originalReg.String(),
Expand All @@ -697,10 +689,12 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
}).Info("region became dead while establishing client for it")

return
} else if err == ErrClientClosed {
// client has been closed
}
if err == ErrClientClosed {
// gohbase client has been closed
return
} else if err != nil {
}
if err != nil {
log.WithFields(log.Fields{
"region": originalReg.String(),
"err": err,
Expand All @@ -714,8 +708,9 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
overlaps, replaced := c.regions.put(reg)
if !replaced {
// a region that is the same or younger is already in cache
reg.MarkAvailable()
originalReg.MarkAvailable()
// TODO: Could the younger region be
reg.MarkAvailable(nil)
originalReg.MarkAvailable(nil)
return
}
// otherwise delete the overlapped regions in cache
Expand All @@ -724,7 +719,7 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
}
// let rpcs know that they can retry and either get the newly
// added region from cache or lookup the one they need
originalReg.MarkAvailable()
originalReg.MarkAvailable(nil)
} else {
// same region, discard the looked up one
reg = originalReg
Expand Down Expand Up @@ -754,24 +749,20 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {

if err == nil {
if reg == c.adminRegionInfo {
reg.SetClient(client)
reg.MarkAvailable()
reg.MarkAvailable(client)
return
}

if err = isRegionEstablished(client, reg); err == nil {
// set region client so that as soon as we mark it available,
// concurrent readers are able to find the client
reg.SetClient(client)
reg.MarkAvailable()
reg.MarkAvailable(client)
return
} else if _, ok := err.(region.ServerError); ok {
// the client we got died
c.clientDown(client)
}
} else if err == context.Canceled {
// region is dead
reg.MarkAvailable()
reg.MarkAvailable(nil)
return
} else {
// otherwise Dial failed, purge the client and retry.
Expand Down
Loading

0 comments on commit 1489640

Please sign in to comment.