From 8136725720c108485ee2df51b7df783a02dd0f47 Mon Sep 17 00:00:00 2001
From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Date: Thu, 18 Jan 2024 20:48:43 +0800
Subject: [PATCH 1/2] Check time spent on attempting RPC to avoid spending too
 much time on retrying (#1117)

* Check time spent on attempting RPC to avoid spending too much time on retrying

Signed-off-by: MyonKeminta

* Handle refreshRegionStore

Signed-off-by: MyonKeminta

* Add test

Signed-off-by: MyonKeminta

* Address comments

Signed-off-by: MyonKeminta

---------

Signed-off-by: MyonKeminta
Co-authored-by: MyonKeminta
---
 internal/locate/main_test.go            |   3 +
 internal/locate/region_request.go       |  68 +++++++++++----
 internal/locate/region_request3_test.go | 105 ++++++++++++++++++++++++
 3 files changed, 158 insertions(+), 18 deletions(-)

diff --git a/internal/locate/main_test.go b/internal/locate/main_test.go
index e60db8d2e..d160281fc 100644
--- a/internal/locate/main_test.go
+++ b/internal/locate/main_test.go
@@ -17,10 +17,13 @@ package locate
 import (
 	"testing"
 
+	"github.com/tikv/client-go/v2/util"
 	"go.uber.org/goleak"
 )
 
 func TestMain(m *testing.M) {
+	util.EnableFailpoints()
+
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
 	}
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 6687a5db0..62857c2fd 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -236,18 +236,19 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *tikvrpc.Request,
 }
 
 type replica struct {
-	store    *Store
-	peer     *metapb.Peer
-	epoch    uint32
-	attempts int
+	store         *Store
+	peer          *metapb.Peer
+	epoch         uint32
+	attempts      int
+	attemptedTime time.Duration
 }
 
 func (r *replica) isEpochStale() bool {
 	return r.epoch != atomic.LoadUint32(&r.store.epoch)
 }
 
-func (r *replica) isExhausted(maxAttempt int) bool {
-	return r.attempts >= maxAttempt
+func (r *replica) isExhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
+	return r.attempts >= maxAttempt || (maxAttemptTime > 0 && r.attemptedTime >= maxAttemptTime)
 }
 
 type replicaSelector struct {
@@ -336,7 +337,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// will not be wakened up and re-elect the leader until the follower receives
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
-	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
+	if liveness != reachable || leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
 		return nil, stateChanged{}
 	}
@@ -351,7 +352,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
 		return
 	}
-	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
+	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
 	}
 	if liveness != reachable {
@@ -408,7 +409,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if selector.targetIdx < 0 {
 		// Search replica that is not attempted from the last accessed replica
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
-			return !selectReplica.isExhausted(1)
+			return !selectReplica.isExhausted(1, 0)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
@@ -536,7 +537,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
 
 func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool {
 	// Try each peer only once
-	return idx != state.leaderIdx && !replica.isExhausted(1)
+	return idx != state.leaderIdx && !replica.isExhausted(1, 0)
 }
 
 func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) {
@@ -668,9 +669,9 @@ func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
 	// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
 	// 4. The leader peer should be retried again using snapshot read.
 	if state.isStaleRead && state.option.leaderOnly {
-		return leader.isExhausted(2)
+		return leader.isExhausted(2, 0)
 	}
-	return leader.isExhausted(1)
+	return leader.isExhausted(1, 0)
 }
 
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -680,7 +681,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
-	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
+	if replica.isEpochStale() || replica.isExhausted(1, 0) || replica.store.getLivenessState() == unreachable {
 		return false
 	}
 	if state.option.leaderOnly && idx == state.leaderIdx {
@@ -751,6 +752,16 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 		}
 	}
 
+	if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
+		attemptedTime, err := time.ParseDuration(val.(string))
+		if err != nil {
+			panic(err)
+		}
+		for _, r := range replicas {
+			r.attemptedTime = attemptedTime
+		}
+	}
+
 	return &replicaSelector{
 		regionCache,
 		cachedRegion,
@@ -763,7 +774,13 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
 	}, nil
 }
 
-const maxReplicaAttempt = 10
+const (
+	maxReplicaAttempt = 10
+	// The maximum time to allow retrying sending requests after an RPC failure. In case an RPC request fails by
+	// timing out (due to a network issue or a stuck TiKV node), this avoids retrying up to 10 times, which may cost
+	// too much time. For a request using `client.ReadTimeoutShort` (30s), at most two attempts are made, costing 1min in total.
+	maxReplicaAttemptTime = time.Second * 50
+)
 
 // next creates the RPCContext of the current candidate replica.
 // It returns a SendError if runs out of all replicas or the cached region is invalidated.
@@ -826,8 +843,9 @@ func (s *replicaSelector) refreshRegionStore() {
 			// request is sent to the leader.
 			newLeaderIdx := newRegionStore.workTiKVIdx
 			s.state = &accessKnownLeader{leaderIdx: newLeaderIdx}
-			if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt {
-				s.replicas[newLeaderIdx].attempts--
+			if s.replicas[newLeaderIdx].isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
+				s.replicas[newLeaderIdx].attempts = maxReplicaAttempt - 1
+				s.replicas[newLeaderIdx].attemptedTime = 0
 			}
 		}
 	}
@@ -939,10 +957,11 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 			if replica.store.getLivenessState() != reachable {
 				return
 			}
-			if replica.isExhausted(maxReplicaAttempt) {
+			if replica.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 				// Give the replica one more chance and because each follower is tried only once,
 				// it won't result in infinite retry.
 				replica.attempts = maxReplicaAttempt - 1
+				replica.attemptedTime = 0
 			}
 			s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
 			// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
@@ -1419,8 +1438,12 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
 	if !injectFailOnSend {
 		start := time.Now()
 		resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
+		rpcDuration := time.Since(start)
+		if s.replicaSelector != nil {
+			s.replicaSelector.recordAttemptedTime(rpcDuration)
+		}
 		if s.Stats != nil {
-			RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
+			RecordRegionRequestRuntimeStats(s.Stats, req.Type, rpcDuration)
 			if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
 				if val.(bool) {
 					if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 {
@@ -1973,3 +1996,12 @@ func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCCo
 	}
 	sb.WriteString(req.ReadType)
 }
+
+func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
+	if targetReplica := s.targetReplica(); targetReplica != nil {
+		targetReplica.attemptedTime += duration
+	}
+	if proxyReplica := s.proxyReplica(); proxyReplica != nil {
+		proxyReplica.attemptedTime += duration
+	}
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 38bd4ae4b..e47b5b47b 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -37,11 +37,13 @@ package locate
 import (
 	"context"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 	"unsafe"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -1313,3 +1315,106 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
 		}
 	}
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
+	key := []byte("key")
+	value := []byte("value1")
+
+	s.NoError(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/injectLiveness"))
+	}()
+
+	region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false)
+	s.Nil(err)
+	regionStore := region.getStore()
+	oldLeader, oldLeaderPeer, _, _ := region.WorkStorePeer(regionStore)
+	// The follower will become the new leader later
+	follower, followerPeer, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
+
+	currLeader := struct {
+		sync.Mutex
+		addr string
+		peer *metapb.Peer
+	}{
+		addr: oldLeader.addr,
+		peer: oldLeaderPeer,
+	}
+
+	requestHandled := false
+
+	s.regionRequestSender.client = &fnClient{
+		fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+			if addr == oldLeader.addr {
+				time.Sleep(timeout)
+				return nil, context.DeadlineExceeded
+			}
+
+			currLeader.Lock()
+			leaderAddr := currLeader.addr
+			leaderPeer := currLeader.peer
+			currLeader.Unlock()
+
+			if addr != leaderAddr {
+				return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{
+					RegionId: region.GetID(),
+					Leader:   leaderPeer,
+				}}}}, nil
+			}
+
+			requestHandled = true
+			return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil
+		},
+	}
+
+	// Simulate that the attempted time has almost reached the limit, so that the test doesn't take too long to run.
+	// However, the request sender's `replicaSelector` is not initialized until a request is sent,
+	// so control the initial attempted time with a failpoint.
+	s.NoError(failpoint.Enable("tikvclient/newReplicaSelectorInitialAttemptedTime", fmt.Sprintf(`return("%s")`, (maxReplicaAttemptTime-time.Second).String())))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/newReplicaSelectorInitialAttemptedTime"))
+	}()
+
+	resCh := make(chan struct {
+		resp *tikvrpc.Response
+		err  error
+	})
+	startTime := time.Now()
+	go func() {
+		bo := retry.NewBackoffer(context.Background(), -1)
+		req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{
+			Mutations: []*kvrpcpb.Mutation{{
+				Op:    kvrpcpb.Op_Put,
+				Key:   key,
+				Value: value,
+			}},
+			StartVersion: 100,
+		})
+		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV)
+		resCh <- struct {
+			resp *tikvrpc.Response
+			err  error
+		}{resp: resp, err: err}
+	}()
+
+	select {
+	case res := <-resCh:
+		s.Fail("request finished too early", fmt.Sprintf("resp: %s, error: %+q", res.resp, res.err))
+	case <-time.After(time.Millisecond * 200):
+	}
+
+	s.cluster.ChangeLeader(region.GetID(), followerPeer.GetId())
+	currLeader.Lock()
+	currLeader.addr = follower.addr
+	currLeader.peer = followerPeer
+	currLeader.Unlock()
+
+	res := <-resCh
+	elapsed := time.Since(startTime)
+
+	s.NoError(res.err)
+	s.Nil(res.resp.GetRegionError())
+	s.IsType(&kvrpcpb.PrewriteResponse{}, res.resp.Resp)
+	s.Less(elapsed, time.Millisecond*2500)
+	s.True(requestHandled)
+}

From 58c13741225ae426467da512ab3f399a757644a4 Mon Sep 17 00:00:00 2001
From: MyonKeminta
Date: Wed, 24 Jan 2024 14:11:40 +0800
Subject: [PATCH 2/2] fix build

Signed-off-by: MyonKeminta
---
 internal/locate/region_request3_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index e47b5b47b..66a15c8e8 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -36,6 +36,7 @@ package locate
 
 import (
 	"context"
+	"fmt"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -1390,7 +1391,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
 			}},
 			StartVersion: 100,
 		})
-		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV)
 		resCh <- struct {
 			resp *tikvrpc.Response
 			err  error
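
Note (illustration, not part of the patch series): both commits hinge on the extended exhaustion check, which now trips on either an attempt-count budget or an accumulated attempt-time budget, so a leader whose RPCs keep timing out is given up on after roughly maxReplicaAttemptTime (50s) rather than after 10 full attempts. Below is a minimal self-contained Go sketch of that check; the type, fields, and method mirror the patch, while the main function is a made-up simulation for demonstration only.

package main

import (
	"fmt"
	"time"
)

// replica mirrors the two budget-related fields from the patch.
type replica struct {
	attempts      int
	attemptedTime time.Duration // total wall time spent on RPC attempts to this replica
}

// isExhausted reports whether the replica has used up its retry budget:
// too many attempts, or (when maxAttemptTime > 0) too much accumulated time.
// Passing 0 disables the time check, matching call sites in the patch such as isExhausted(1, 0).
func (r *replica) isExhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
	return r.attempts >= maxAttempt || (maxAttemptTime > 0 && r.attemptedTime >= maxAttemptTime)
}

func main() {
	r := &replica{}
	// Simulate two RPC attempts that each time out after 30s (client.ReadTimeoutShort in the patch's comment).
	for i := 0; i < 2; i++ {
		r.attempts++
		r.attemptedTime += 30 * time.Second
	}
	// 60s spent >= the 50s budget, so the replica is exhausted after only 2 of 10 allowed attempts.
	fmt.Println(r.isExhausted(10, 50*time.Second)) // true
}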