From 5c2545c6d78516170fb2ebd6b812b2a4cac49a92 Mon Sep 17 00:00:00 2001 From: Yutong Sean Date: Tue, 11 May 2021 18:35:01 +0800 Subject: [PATCH] Add semaphore to control the number of goroutine looking up meta table --- client.go | 17 +++++++++++++++++ rpc.go | 2 ++ 2 files changed, 19 insertions(+) diff --git a/client.go b/client.go index 982ddb27..0d340561 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" + log "github.com/sirupsen/logrus" "github.com/tsuna/gohbase/compression" "github.com/tsuna/gohbase/hrpc" @@ -27,6 +29,7 @@ const ( defaultZkRoot = "/hbase" defaultZkTimeout = 30 * time.Second defaultEffectiveUser = "root" + defaultMetaLimit = 50 ) // Client a regular HBase client @@ -90,6 +93,12 @@ type client struct { // regionReadTimeout is the maximum amount of time to wait for regionserver reply regionReadTimeout time.Duration + // The max number of the goroutines that look up meta table simultaneously. + metaLimit int64 + + // The corresponding semaphore of metaLimit + sema *semaphore.Weighted + done chan struct{} closeOnce sync.Once @@ -126,6 +135,7 @@ func newClient(zkquorum string, options ...Option) *client { zkRoot: defaultZkRoot, zkTimeout: defaultZkTimeout, effectiveUser: defaultEffectiveUser, + metaLimit: defaultMetaLimit, regionLookupTimeout: region.DefaultLookupTimeout, regionReadTimeout: region.DefaultReadTimeout, done: make(chan struct{}), @@ -139,6 +149,13 @@ func newClient(zkquorum string, options ...Option) *client { //since the zkTimeout could be changed as an option c.zkClient = zk.NewClient(zkquorum, c.zkTimeout) + if c.metaLimit > 0 { + c.sema = semaphore.NewWeighted(c.metaLimit) + } else { + c.sema = semaphore.NewWeighted(defaultMetaLimit) + c.metaLimit = defaultMetaLimit + } + return c } diff --git a/rpc.go b/rpc.go index 370e6187..ec33aedc 100644 --- a/rpc.go +++ b/rpc.go @@ -342,8 +342,10 @@ func (c *client) metaLookup(ctx context.Context, return nil, "", err } + c.sema.Acquire(context.Background(), 1) scanner := c.Scan(rpc) resp, err := scanner.Next() + c.sema.Release(1) if err == io.EOF { return nil, "", TableNotFound }