Skip to content

Commit

Permalink
make maybeUpdateActualRequestState atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Jul 2, 2024
1 parent 0812b77 commit d891cd7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
15 changes: 6 additions & 9 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,6 @@ func (cn *PeerConn) onPeerHasAllPiecesNoTriggers(lock bool, lockTorrent bool) {
}

func (cn *PeerConn) onPeerHasAllPieces(lock bool, lockTorrent bool) {
cn.onPeerHasAllPiecesNoTriggers(lock, lockTorrent)
cn.peerHasAllPiecesTriggers(lock, lockTorrent)
}

func (cn *PeerConn) peerHasAllPiecesTriggers(lock bool, lockTorrent bool) {
if lockTorrent {
cn.t.mu.Lock()
defer cn.t.mu.Unlock()
Expand All @@ -565,6 +560,8 @@ func (cn *PeerConn) peerHasAllPiecesTriggers(lock bool, lockTorrent bool) {
defer cn.mu.Unlock()
}

cn.onPeerHasAllPiecesNoTriggers(false, false)

isEmpty := cn.t._pendingPieces.IsEmpty()

if !isEmpty {
Expand Down Expand Up @@ -1006,10 +1003,10 @@ func (c *PeerConn) mainReadLoop() (err error) {
err = c.peerSentHaveNone(true, true)
case pp.Reject:
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) {
err = fmt.Errorf("received invalid reject for request %v", req)
c.logger.Levelf(log.Debug, "%v", err)
}
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) {
err = fmt.Errorf("received invalid reject for request %v", req)
c.logger.Levelf(log.Debug, "%v", err)
}
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
Expand Down
21 changes: 18 additions & 3 deletions requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,21 @@ func (p *Peer) maybeUpdateActualRequestState(lock bool, lockTorrent bool) {
if p.closed.IsSet() {
return
}

// hold the torrent/peer locks across the whole state request process
// so that its remains atomic for both peer and torrent - this implies
// that only one peer can update the torrent state at a time

if lockTorrent {
p.mu.RLock()
defer p.mu.RUnlock()
}

if lock {
p.mu.RLock()
defer p.mu.RUnlock()
}

if p.needRequestUpdate == "" {
return
}
Expand All @@ -334,9 +349,9 @@ func (p *Peer) maybeUpdateActualRequestState(lock bool, lockTorrent bool) {
context.Background(),
pprof.Labels("update request", p.needRequestUpdate),
func(_ context.Context) {
next := p.getDesiredRequestState(false, lock, lockTorrent)
p.applyRequestState(next, lock, lockTorrent)
p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes, lockTorrent)
next := p.getDesiredRequestState(false, false, false)
p.applyRequestState(next, false, false)
p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes, false)
},
)
}
Expand Down

0 comments on commit d891cd7

Please sign in to comment.