diff --git a/peerconn.go b/peerconn.go index 27fab546c9..58b7c41184 100644 --- a/peerconn.go +++ b/peerconn.go @@ -550,11 +550,6 @@ func (cn *PeerConn) onPeerHasAllPiecesNoTriggers(lock bool, lockTorrent bool) { } func (cn *PeerConn) onPeerHasAllPieces(lock bool, lockTorrent bool) { - cn.onPeerHasAllPiecesNoTriggers(lock, lockTorrent) - cn.peerHasAllPiecesTriggers(lock, lockTorrent) -} - -func (cn *PeerConn) peerHasAllPiecesTriggers(lock bool, lockTorrent bool) { if lockTorrent { cn.t.mu.Lock() defer cn.t.mu.Unlock() @@ -565,6 +560,8 @@ func (cn *PeerConn) peerHasAllPiecesTriggers(lock bool, lockTorrent bool) { defer cn.mu.Unlock() } + cn.onPeerHasAllPiecesNoTriggers(false, false) + isEmpty := cn.t._pendingPieces.IsEmpty() if !isEmpty { @@ -1006,10 +1003,10 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentHaveNone(true, true) case pp.Reject: req := newRequestFromMessage(&msg) - if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) { - err = fmt.Errorf("received invalid reject for request %v", req) - c.logger.Levelf(log.Debug, "%v", err) - } + if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) { + err = fmt.Errorf("received invalid reject for request %v", req) + c.logger.Levelf(log.Debug, "%v", err) + } case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger) diff --git a/requesting.go b/requesting.go index d5eb7184eb..9b39517c2a 100644 --- a/requesting.go +++ b/requesting.go @@ -321,6 +321,21 @@ func (p *Peer) maybeUpdateActualRequestState(lock bool, lockTorrent bool) { if p.closed.IsSet() { return } + + // hold the torrent/peer locks across the whole state request process + // so that its remains atomic for both peer and torrent - this implies + // that only one peer can update the torrent state at a time + + if lockTorrent { + p.mu.RLock() + defer p.mu.RUnlock() + } + + if lock { + p.mu.RLock() + defer p.mu.RUnlock() + } + if p.needRequestUpdate == "" { return } @@ -334,9 +349,9 @@ func (p *Peer) maybeUpdateActualRequestState(lock bool, lockTorrent bool) { context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { - next := p.getDesiredRequestState(false, lock, lockTorrent) - p.applyRequestState(next, lock, lockTorrent) - p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes, lockTorrent) + next := p.getDesiredRequestState(false, false, false) + p.applyRequestState(next, false, false) + p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes, false) }, ) }