
Commit

use region pessimistic lock rollback and clean for write-write conflict processing

Signed-off-by: cfzjywxk <[email protected]>
cfzjywxk committed Jan 22, 2024
1 parent b8a6587 commit faf5fbc
Showing 5 changed files with 200 additions and 20 deletions.
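
In short: this commit adds a PessimisticRegionResolve option to txnlock.ResolveLocksOptions. When it is set and an expired pessimistic lock is resolved, the PessimisticRollback request is sent once per (transaction, region) pair without an explicit key list, so TiKV scans the region and rolls back all of that transaction's pessimistic locks there, rather than rolling back one key per request. A minimal caller-side sketch, modeled on the new integration test below (bo, locks, and callerStartTS are assumed to be prepared as in that test):

// Sketch: opting into region-level pessimistic rollback (new in this commit).
_, err := store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
	CallerStartTS:            callerStartTS,
	Locks:                    locks,
	Lite:                     false,
	ForRead:                  false,
	PessimisticRegionResolve: true, // roll back whole regions, not single keys
})
if err != nil {
	// handle resolve failure
}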
146 changes: 146 additions & 0 deletions integration_tests/lock_test.go
@@ -56,6 +56,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
@@ -1028,6 +1029,7 @@ type testLockWithTiKVSuite struct {
func (s *testLockWithTiKVSuite) SetupTest() {
if *withTiKV {
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
s.cleanupLocks()
} else {
s.store = tikv.StoreProbe{KVStore: NewTestUniStore(s.T())}
}
@@ -1037,6 +1039,19 @@ func (s *testLockWithTiKVSuite) TearDownTest() {
s.store.Close()
}

func (s *testLockWithTiKVSuite) cleanupLocks() {
// Clean up possible leftover locks.
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)
if len(remainingLocks) > 0 {
s.mustResolve(ctx, bo, remainingLocks, currentTS, []byte("k"), []byte("l"))
}
}

// TODO: Migrate FairLocking-related tests here.

func withRetry[T any](f func() (T, error), limit int, delay time.Duration) (T, error) {
@@ -1559,3 +1574,134 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
s.NoError(err)
s.Equal(v3, v)
}

func (s *testLockWithTiKVSuite) makeLock(startTS uint64, forUpdateTS uint64, key []byte, primary []byte) *txnlock.Lock {
return &txnlock.Lock{
Key: key,
Primary: primary,
TxnID: startTS,
TTL: 10,
TxnSize: 1024,
LockType: kvrpcpb.Op_PessimisticLock,
UseAsyncCommit: false,
LockForUpdateTS: forUpdateTS,
MinCommitTS: forUpdateTS,
}
}

func (s *testLockWithTiKVSuite) mustLockNum(ctx context.Context, expectedNum int, scanTS uint64, startKey []byte, endKey []byte) {
remainingLocks, err := s.store.ScanLocks(ctx, startKey, endKey, scanTS)
s.NoError(err)
s.Len(remainingLocks, expectedNum)
}

func (s *testLockWithTiKVSuite) mustResolve(ctx context.Context, bo *retry.Backoffer, remainingLocks []*txnlock.Lock, callerTS uint64, startKey []byte, endKey []byte) {
if len(remainingLocks) > 0 {
_, err := s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
CallerStartTS: callerTS,
Locks: remainingLocks,
Lite: false,
ForRead: false,
Detail: nil,
PessimisticRegionResolve: true,
})
s.NoError(err)

lockAfterResolve, err := s.store.ScanLocks(ctx, startKey, endKey, callerTS)
s.NoError(err)
s.Len(lockAfterResolve, 0)
}
}

func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() {
// The test relies on the pessimistic rollback read phase implementation in TiKV
// (https://github.com/tikv/tikv/pull/16185), which is not implemented in mockstore yet.
if !*withTiKV {
return
}

s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
s.NoError(failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL"))
}()
test := func(inMemoryLock bool) {
recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", inMemoryLock)
defer recoverFunc()

// Init: clean up possible leftover locks.
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
s.cleanupLocks()

// Basic case: three keys can be rolled back within one pessimistic rollback request.
k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
txn1, err := s.store.Begin()
s.NoError(err)
startTS := txn1.StartTS()
txn1.SetPessimistic(true)
lockCtx := kv.NewLockCtx(startTS, 200, time.Now())
err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3)
s.NoError(err)
txn1.GetCommitter().CloseTTLManager()

time.Sleep(time.Millisecond * 100)
s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l"))
locks := []*txnlock.Lock{
s.makeLock(startTS, startTS, k3, k1),
}
s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l"))

time.Sleep(time.Millisecond * 100)
s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l"))

// Acquire pessimistic locks for more than 256 (RESOLVE_LOCK_BATCH_SIZE) keys.
formatKey := func(prefix rune, i int) []byte {
return []byte(fmt.Sprintf("%c%04d", prefix, i))
}
numKeys := 1000
prewriteKeys := make([][]byte, 0, numKeys/2)
pessimisticLockKeys := make([][]byte, 0, numKeys/2)
for i := 0; i < numKeys; i++ {
key := formatKey('k', i)
if i%2 == 0 {
err = txn1.LockKeys(ctx, lockCtx, key)
pessimisticLockKeys = append(pessimisticLockKeys, key)
} else {
err = txn1.Set(key, []byte("val"))
s.NoError(err)
prewriteKeys = append(prewriteKeys, key)
}
s.NoError(err)
}
committer, err := txn1.NewCommitter(1)
s.NoError(err)
mutations := committer.MutationsOfKeys(prewriteKeys)
err = committer.PrewriteMutations(ctx, mutations)
s.NoError(err)

// All the pessimistic locks belonging to the same transaction are pessimistically
// rolled back within one request.
time.Sleep(time.Millisecond * 100)
pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0])
_, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
CallerStartTS: startTS + 1,
Locks: []*txnlock.Lock{pessimisticLock},
Lite: false,
ForRead: false,
Detail: nil,
PessimisticRegionResolve: true,
})
s.NoError(err)

time.Sleep(time.Millisecond * 100)
s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l"))

// Cleanup.
err = txn1.Rollback()
s.NoError(err)
}
test(false)
test(true)
}
10 changes: 6 additions & 4 deletions txnkv/transaction/pessimistic.go
@@ -356,8 +356,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: 0,
Locks: locks,
CallerStartTS: 0,
Locks: locks,
PessimisticRegionResolve: true,
}
if action.LockCtx.Stats != nil {
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
@@ -484,8 +485,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: 0,
Locks: locks,
CallerStartTS: 0,
Locks: locks,
PessimisticRegionResolve: true,
}
if action.LockCtx.Stats != nil {
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
7 changes: 4 additions & 3 deletions txnkv/transaction/prewrite.go
@@ -471,9 +471,10 @@ func (action actionPrewrite) handleSingleBatch(
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: c.startTS,
Locks: locks,
Detail: &c.getDetail().ResolveLock,
CallerStartTS: c.startTS,
Locks: locks,
Detail: &c.getDetail().ResolveLock,
PessimisticRegionResolve: true,
}
resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
if err != nil {
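
Both write-write conflict paths now opt in: pessimistic lock acquisition (the normal and force-lock response handlers above) and prewrite here. A condensed sketch of the shared pattern, not literal code from either file; only CallerStartTS differs between the call sites:

// How the conflict paths enable region-level rollback before resolving.
resolveLockOpts := txnlock.ResolveLocksOptions{
	CallerStartTS:            callerStartTS, // 0 in pessimistic.go, c.startTS in prewrite.go
	Locks:                    locks,
	PessimisticRegionResolve: true,
}
_, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
if err != nil {
	return err
}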
55 changes: 43 additions & 12 deletions txnkv/txnlock/lock_resolver.go
@@ -160,6 +160,11 @@ func (s TxnStatus) StatusCacheable() bool {
return false
}

func (s TxnStatus) String() string {
// TODO: print primary lock after redact is introduced.
return fmt.Sprintf("ttl:%v commit_ts:%v action: %v", s.ttl, s.commitTS, s.action)
}

// Lock represents a lock from tikv server.
type Lock struct {
Key []byte
@@ -261,7 +266,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
//
// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
// See: https://github.com/pingcap/tidb/issues/45134
err := lr.resolvePessimisticLock(bo, l)
err := lr.resolvePessimisticLock(bo, l, false, nil)
if err != nil {
return false, err
}
@@ -350,11 +355,12 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo

// ResolveLocksOptions is the options struct for calling resolving lock.
type ResolveLocksOptions struct {
CallerStartTS uint64
Locks []*Lock
Lite bool
ForRead bool
Detail *util.ResolveLockDetail
CallerStartTS uint64
Locks []*Lock
Lite bool
ForRead bool
Detail *util.ResolveLockDetail
PessimisticRegionResolve bool
}

// ResolveLockResult is the result struct for resolving lock.
@@ -441,7 +447,7 @@ func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
}

func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) {
callerStartTS, locks, forRead, lite, detail := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail
callerStartTS, locks, forRead, lite, detail, pessimisticRegionResolve := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail, opts.PessimisticRegionResolve
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
}
@@ -464,6 +470,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
pessimisticCleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
var resolve func(*Lock, bool) (TxnStatus, error)
resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail)
@@ -482,7 +489,8 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
return status, nil
}

// If the lock is committed or rollbacked, resolve lock.
// If the lock is committed or rolled back, resolve the lock.
// If the lock is regarded as an expired pessimistic lock, pessimistically roll it back.
metrics.LockResolverCountWithExpired.Inc()
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
@@ -504,7 +512,16 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
}
if l.LockType == kvrpcpb.Op_PessimisticLock {
// Pessimistic locks don't block reads, so resolving them needn't be async.
err = lr.resolvePessimisticLock(bo, l)
if pessimisticRegionResolve {
pessimisticCleanRegions, exists := pessimisticCleanTxns[l.TxnID]
if !exists {
pessimisticCleanRegions = make(map[locate.RegionVerID]struct{})
pessimisticCleanTxns[l.TxnID] = pessimisticCleanRegions
}
err = lr.resolvePessimisticLock(bo, l, true, pessimisticCleanRegions)
} else {
err = lr.resolvePessimisticLock(bo, l, false, nil)
}
} else {
if forRead {
asyncCtx := context.WithValue(lr.asyncResolveCtx, util.RequestSourceKey, bo.GetCtx().Value(util.RequestSourceKey))
@@ -669,7 +686,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 {
logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired",
zap.Uint64("CallerStartTs", callerStartTS),
zap.Stringer("lock str", l))
zap.Stringer("lock str", l),
zap.Stringer("status", status))
if l.LockType == kvrpcpb.Op_PessimisticLock {
if _, err := util.EvalFailpoint("txnExpireRetTTL"); err == nil {
return TxnStatus{action: kvrpcpb.Action_LockNotExistDoNothing},
@@ -1178,7 +1196,10 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
// When "pessimisticRegionResolve" is set to false, only pessimistic rollback input lock. Otherwise, the corresponding
// region will be scanned, and all relevant pessimistic locks that are read will be rolled back at the same time,
// similar to the `resolveLock` function.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, pessimisticRegionResolve bool, pessimisticCleanRegions map[locate.RegionVerID]struct{}) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
if bytes.Equal(l.Key, l.Primary) {
@@ -1189,14 +1210,21 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
if err != nil {
return err
}
if pessimisticRegionResolve && pessimisticCleanRegions != nil {
if _, ok := pessimisticCleanRegions[loc.Region]; ok {
return nil
}
}
forUpdateTS := l.LockForUpdateTS
if forUpdateTS == 0 {
forUpdateTS = math.MaxUint64
}
pessimisticRollbackReq := &kvrpcpb.PessimisticRollbackRequest{
StartVersion: l.TxnID,
ForUpdateTs: forUpdateTS,
Keys: [][]byte{l.Key},
}
if !pessimisticRegionResolve {
pessimisticRollbackReq.Keys = [][]byte{l.Key}
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{
ResourceControlContext: &kvrpcpb.ResourceControlContext{
@@ -1228,6 +1256,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
logutil.Logger(bo.GetCtx()).Error("resolveLock error", zap.Error(err))
return err
}
if pessimisticRegionResolve && pessimisticCleanRegions != nil {
pessimisticCleanRegions[loc.Region] = struct{}{}
}
return nil
}
}
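
One detail worth noting: resolveLocks now keeps a second dedup map, pessimisticCleanTxns, mirroring the existing cleanTxns map, so that after one region-level PessimisticRollback succeeds for a (transaction, region) pair, later locks from that transaction in the same region are skipped. A self-contained sketch of that bookkeeping under stated assumptions: locateKey and rollbackRegion are hypothetical stand-ins for the region-cache lookup and the PessimisticRollback RPC above (sent without Keys, which, per the doc comment above, makes TiKV scan the region):

// Per-(txn, region) dedup, as in resolveLocks/resolvePessimisticLock.
pessimisticCleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})

resolvePessimistic := func(bo *retry.Backoffer, l *txnlock.Lock) error {
	cleanRegions, ok := pessimisticCleanTxns[l.TxnID]
	if !ok {
		cleanRegions = make(map[locate.RegionVerID]struct{})
		pessimisticCleanTxns[l.TxnID] = cleanRegions
	}
	loc, err := locateKey(bo, l.Key) // hypothetical region lookup
	if err != nil {
		return err
	}
	if _, done := cleanRegions[loc.Region]; done {
		return nil // this txn's locks in this region were already rolled back
	}
	if err := rollbackRegion(bo, l, loc); err != nil { // empty Keys => region scan
		return err
	}
	cleanRegions[loc.Region] = struct{}{}
	return nil
}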
2 changes: 1 addition & 1 deletion txnkv/txnlock/test_probe.go
@@ -58,7 +58,7 @@ func (l LockResolverProbe) ResolveLock(bo *retry.Backoffer, lock *Lock) error {

// ResolvePessimisticLock resolves single pessimistic lock.
func (l LockResolverProbe) ResolvePessimisticLock(bo *retry.Backoffer, lock *Lock) error {
return l.resolvePessimisticLock(bo, lock)
return l.resolvePessimisticLock(bo, lock, false, make(map[locate.RegionVerID]struct{}))
}

// GetTxnStatus sends the CheckTxnStatus request to the TiKV server.
