Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix races and panics around connections #17

Merged
merged 21 commits into from
Jul 3, 2024
Merged
45 changes: 24 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
connString: regularNetConnPeerConnConnString(nc),
})
defer func() {
c.close(true, true)
c.close(true)
}()
c.Discovery = PeerSourceIncoming
cl.runReceivedConn(c)
Expand Down Expand Up @@ -898,7 +898,7 @@ func (cl *Client) outgoingConnection(
cl.unlock()
return
}
defer c.close(true, true)
defer c.close(true)
c.Discovery = opts.peerInfo.Source
c.trusted = opts.peerInfo.Trusted
// runHandshookConn will unlock the connection before calling the
Expand Down Expand Up @@ -1117,7 +1117,7 @@ func (t *Torrent) runHandshookConn(pc *PeerConn, lockClient bool) error {
return err
}

defer t.dropConnection(pc, true, true)
defer t.dropConnection(pc, true)
pc.initUpdateRequestsTimer(true)

if err := pc.mainReadLoop(); err != nil {
Expand Down Expand Up @@ -1148,23 +1148,26 @@ func (c *Peer) updateRequestsTimerFunc() {
c.t.mu.Lock()
defer c.t.mu.Unlock()

c.mu.Lock()
defer c.mu.Unlock()
func() {
c.mu.Lock()
defer c.mu.Unlock()

if c.closed.IsSet() {
return
}
if c.isLowOnRequests(false, false) {
// If there are no outstanding requests, then a request update should have already run.
return
}
if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
// These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
// already been fired.
torrent.Add("spurious timer requests updates", 1)
return
}
c.updateRequests(peerUpdateRequestsTimerReason, false, false)
if c.closed.IsSet() {
return
}
if c.isLowOnRequests(true, false) {
// If there are no outstanding requests, then a request update should have already run.
return
}
if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
// These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
// already been fired.
torrent.Add("spurious timer requests updates", 1)
return
}
}()

c.updateRequests(peerUpdateRequestsTimerReason, false)
}

// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
Expand Down Expand Up @@ -1205,7 +1208,7 @@ func (pc *PeerConn) sendInitialMessages(lockTorrent bool) {
}, true)
}

if pc.fastEnabled() {
if pc.fastEnabled(true) {
if t.haveAllPieces(lockTorrent, true) {
pc.write(pp.Message{Type: pp.HaveAll}, true)
pc.sentHaves.AddRange(0, bitmap.BitRange(pc.t.NumPieces()))
Expand Down Expand Up @@ -1639,7 +1642,7 @@ func (cl *Client) banPeerIP(ip net.IP) {
if p.remoteIp().Equal(ip) {
t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
// Should this be a close?
p.drop(true, false)
p.drop(false)
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion peer-conn-msg-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (pc *PeerConn) startMessageWriter() {
}

func (pc *PeerConn) messageWriterRunner() {
defer pc.close(true, true)
defer pc.close(true)
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}

Expand Down
9 changes: 5 additions & 4 deletions peer-impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
// legacy PeerConn methods.
type peerImpl interface {
// Trigger the actual request state to get updated
handleUpdateRequests(lock bool, lockTorrent bool)
// Trigger the actual request state to get updated. This call needs control of the
// peer lock and assumes it unlocked on entry
handleUpdateRequests(lockTorrent bool)
writeInterested(interested bool, lock bool) bool

// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
Expand All @@ -20,9 +21,9 @@ type peerImpl interface {
_request(r Request, lock bool) bool
connectionFlags() string
onClose(lockTorrent bool)
onGotInfo(info *metainfo.Info, lock bool, lockTorrent bool)
onGotInfo(info *metainfo.Info, lockTorrent bool)
// Drop connection. This may be a no-op if there is no connection.
drop(lock bool, lockTorrent bool)
drop(lockTorrent bool)
// Rebuke the peer
ban()
String() string
Expand Down
76 changes: 35 additions & 41 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
Peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
mu sync.RWMutex
mu sync.RWMutex

t *Torrent

Expand Down Expand Up @@ -364,16 +364,14 @@ func (cn *Peer) writeStatus(w io.Writer, lock bool, lockTorrent bool) {
fmt.Fprintf(w, "\n")
}

func (p *Peer) close(lock bool, lockTorrent bool) {
func (p *Peer) close(lockTorrent bool) {
if lockTorrent && p.t != nil {
p.t.mu.Lock()
defer p.t.mu.Unlock()
}

if lock {
p.mu.RLock()
defer p.mu.RUnlock()
}
p.mu.Lock()
defer p.mu.Unlock()

if !p.closed.Set() {
return
Expand Down Expand Up @@ -555,7 +553,7 @@ func (cn *Peer) request(r RequestIndex, maxRequests int, lock bool, lockTorrent
return cn.peerImpl._request(ppReq, false), nil
}

func (me *Peer) cancel(r RequestIndex, updateRequests bool, lock bool, lockTorrent bool) {
func (me *Peer) cancel(r RequestIndex, updateRequests bool, lockTorrent bool) {
func() {
// keep the torrent and peer locked across the whole
// cancel operation to avoid part processing holes
Expand All @@ -565,10 +563,8 @@ func (me *Peer) cancel(r RequestIndex, updateRequests bool, lock bool, lockTorre
defer me.t.mu.Unlock()
}

if lock {
me.mu.Lock()
defer me.mu.Unlock()
}
me.mu.Lock()
defer me.mu.Unlock()

if !me.deleteRequest(r, false, false) {
panic(fmt.Sprintf("request %d not existing: should have been guarded", r))
Expand All @@ -582,18 +578,18 @@ func (me *Peer) cancel(r RequestIndex, updateRequests bool, lock bool, lockTorre
me.decPeakRequests(false)
}()

if updateRequests && me.isLowOnRequests(lock, lockTorrent) {
me.updateRequests("Peer.cancel", lock, lockTorrent)
if updateRequests && me.isLowOnRequests(true, lockTorrent) {
me.updateRequests("Peer.cancel", lockTorrent)
}
}

// Sets a reason to update requests, and if there wasn't already one, handle it.
func (cn *Peer) updateRequests(reason string, lock bool, lockTorrent bool) {
// This method need control of the peer lock as handleUpdateRequests will iterate
// all peers - and having this peer locked risks deadlocks for those iterations
func (cn *Peer) updateRequests(reason string, lockTorrent bool) {
needUpdate := func() bool {
if lock {
cn.mu.Lock()
defer cn.mu.Unlock()
}
cn.mu.Lock()
defer cn.mu.Unlock()

if cn.needRequestUpdate != "" {
return false
Expand All @@ -604,7 +600,7 @@ func (cn *Peer) updateRequests(reason string, lock bool, lockTorrent bool) {
}()

if needUpdate {
cn.handleUpdateRequests(lock, lockTorrent)
cn.handleUpdateRequests(lockTorrent)
}
}

Expand Down Expand Up @@ -716,7 +712,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
}

if c.isLowOnRequests(true, true) {
c.updateRequests("Peer.remoteRejectedRequest", true, true)
c.updateRequests("Peer.remoteRejectedRequest", true)
}
c.decExpectedChunkReceive(r, true)
return true
Expand Down Expand Up @@ -859,7 +855,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
if p == c {
panic("should not be pending request from conn that just received it")
}
p.cancel(req, true, true, false)
p.cancel(req, true, false)
}

return piece, intended, err
Expand Down Expand Up @@ -893,7 +889,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
// request update runs while we're writing the chunk that just failed. Then we never do a
// fresh update after pending the failed request.
c.updateRequests("Peer.receiveChunk error writing chunk", true, false)
c.updateRequests("Peer.receiveChunk error writing chunk", false)
t.onWriteChunkErr(err)
return nil
}
Expand All @@ -916,7 +912,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// this is moved after all processing to avoid request rehere because as we no longer have a
if intended {
if c.isLowOnRequests(true, false) {
c.updateRequests("Peer.receiveChunk deleted request", true, false)
c.updateRequests("Peer.receiveChunk deleted request", false)
}
}

Expand Down Expand Up @@ -1007,30 +1003,31 @@ func (c *Peer) deleteRequest(r RequestIndex, lock bool, lockTorrent bool) bool {
return true
}

func (c *Peer) deleteAllRequests(reason string, lock bool, lockTorrent bool) {
func (c *Peer) deleteAllRequests(reason string, lockTorrent bool) {
if lockTorrent {
c.t.mu.Lock()
defer c.t.mu.Unlock()
}

if lock {
func() {
c.mu.Lock()
defer c.mu.Unlock()
}

if c.requestState.Requests.IsEmpty() {
return
}
c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
if !c.deleteRequest(x, false, false) {
panic("request should exist")
if c.requestState.Requests.IsEmpty() {
return
}
return true
})
c.assertNoRequests()
c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
if !c.deleteRequest(x, false, false) {
panic("request should exist")
}
return true
})
c.assertNoRequests()
}()

c.t.iterPeers(func(p *Peer) {
if p.isLowOnRequests(false, false) {
p.updateRequests(reason, false, false)
if p.isLowOnRequests(true, false) {
p.updateRequests(reason, false)
}
}, false)
}
Expand All @@ -1047,11 +1044,8 @@ func (c *Peer) cancelAllRequests(lockTorrent bool) {
defer c.t.mu.Unlock()
}

c.mu.Lock()
defer c.mu.Unlock()

c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
c.cancel(x, false, false, false)
c.cancel(x, false, false)
return true
})
c.assertNoRequests()
Expand Down
Loading
Loading