
Commit ecb0752

protected t.conns

mh0lt committed May 31, 2024
1 parent e49875d
Showing 3 changed files with 124 additions and 29 deletions.
t.go (8 changes: 4 additions & 4 deletions)

@@ -266,8 +266,8 @@ func (t *Torrent) Piece(i pieceIndex) *Piece {
 }
 
 func (t *Torrent) PeerConns() []*PeerConn {
-	t.cl.rLock()
-	defer t.cl.rUnlock()
+	t.mu.RLock()
+	defer t.mu.RUnlock()
 	ret := make([]*PeerConn, 0, len(t.conns))
 	for c := range t.conns {
 		ret = append(ret, c)
@@ -276,8 +276,8 @@ func (t *Torrent) PeerConns() []*PeerConn {
 }
 
 func (t *Torrent) WebseedPeerConns() []*Peer {
-	t.cl.rLock()
-	defer t.cl.rUnlock()
+	t.mu.RLock()
+	defer t.mu.RUnlock()
 	ret := make([]*Peer, 0, len(t.conns))
 	for _, c := range t.webSeeds {
 		ret = append(ret, c)
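
The t.go hunks show the shape of the whole commit: read-side accessors stop taking the client-wide lock (t.cl.rLock()) and take a per-torrent read lock instead. The diff never shows the declaration of t.mu itself, so the following is a minimal sketch assuming a mu sync.RWMutex field on Torrent guarding the conns map (the struct is abbreviated to just the fields used here):

package torrent

import "sync"

type PeerConn struct{ /* fields elided */ }

// Assumed shape: mu guards conns, replacing the client-wide lock for
// reads of the connection set. The field is not shown in this commit.
type Torrent struct {
	mu    sync.RWMutex
	conns map[*PeerConn]struct{}
}

// PeerConns copies the conns map into a slice under the read lock, so
// callers can iterate the result without holding t.mu.
func (t *Torrent) PeerConns() []*PeerConn {
	t.mu.RLock()
	defer t.mu.RUnlock()
	ret := make([]*PeerConn, 0, len(t.conns))
	for c := range t.conns {
		ret = append(ret, c)
	}
	return ret
}

Returning a copy rather than the map itself is what lets the other call sites in this commit drop the lock before iterating.
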
torrent.go (141 changes: 118 additions & 23 deletions)

@@ -192,14 +192,18 @@ func (t *Torrent) length() int64 {
 
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
 	// This could be done with roaring.BitSliceIndexing.
-	t.iterPeers(func(peer *Peer) {
-		if _, ok := t.connsWithAllPieces[peer]; ok {
+	for _, peer := range t.peersAsSlice() {
+		t.mu.RLock()
+		_, ok := t.connsWithAllPieces[peer]
+		t.mu.RUnlock()
+
+		if ok {
 			return
 		}
 		if peer.peerHasPiece(i) {
 			count++
 		}
-	})
+	}
 	return
 }

@@ -257,8 +261,8 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
 	}
 
 	// Add active peers to the list
-	t.cl.rLock()
-	defer t.cl.rUnlock()
+	t.mu.RLock()
+	defer t.mu.RUnlock()
 	for conn := range t.conns {
 		ks = append(ks, PeerInfo{
 			Id: conn.PeerID,
@@ -302,6 +306,9 @@ func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
 
 // There's a connection to that address already.
 func (t *Torrent) addrActive(addr string) bool {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	if _, ok := t.halfOpen[addr]; ok {
 		return true
 	}
@@ -321,7 +328,9 @@ func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
 }
 
 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
-	for c := range t.conns {
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+	for _, c := range conns {
 		if f(c) {
 			ret = append(ret, c)
 		}
@@ -574,7 +583,10 @@ func (t *Torrent) setMetadataSize(size int) (err error) {
 	t.metadataBytes = make([]byte, size)
 	t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
 	t.metadataChanged.Broadcast()
-	for c := range t.conns {
+
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+	for _, c := range conns {
 		c.requestPendingMetadata()
 	}
 	return
@@ -709,6 +721,9 @@ func (psr PieceStateRun) String() (ret string) {
 }
 
 func (t *Torrent) writeStatus(w io.Writer) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
 	fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
 	if !t.haveInfo() {
@@ -794,7 +809,9 @@ func (t *Torrent) writeStatus(w io.Writer) {
 	fmt.Fprintf(w, "webseeds:\n")
 	t.writePeerStatuses(w, maps.Values(t.webSeeds))
 
-	peerConns := maps.Keys(t.conns)
+	peerConns := t.peerConnsAsSlice()
+	defer peerConns.free()
+
 	// Peers without priorities first, then those with. I'm undecided about how to order peers
 	// without priorities.
 	sort.Slice(peerConns, func(li, ri int) bool {
@@ -1141,7 +1158,25 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
 // conns (which is a map).
 var peerConnSlices sync.Pool
 
-func getPeerConnSlice(cap int) []*PeerConn {
+type conns []*PeerConn
+
+func (c conns) free() {
+	peerConnSlices.Put(c)
+}
+
+func (t *Torrent) peerConnsAsSlice() conns {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	conns := getPeerConnSlice(len(t.conns))
+	for k := range t.conns {
+		conns = append(conns, k)
+	}
+
+	return conns
+}
+
+func getPeerConnSlice(cap int) conns {
 	getInterface := peerConnSlices.Get()
 	if getInterface == nil {
 		return make([]*PeerConn, 0, cap)
@@ -1153,9 +1188,13 @@ func getPeerConnSlice(cap int) []*PeerConn {
 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
 // this is a frequent occurrence.
 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
-	sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
+	t.mu.RLock()
+	conns := getPeerConnSlice(len(t.conns))
+	t.mu.RUnlock()
+
+	sl := t.appendUnclosedConns(conns)
 	f(sl)
-	peerConnSlices.Put(sl)
+	conns.free()
 }
 
 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
@@ -1395,6 +1434,9 @@ func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
 }
 
 func (t *Torrent) numReceivedConns() (ret int) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	for c := range t.conns {
 		if c.Discovery == PeerSourceIncoming {
 			ret++
@@ -1404,6 +1446,9 @@ func (t *Torrent) numReceivedConns() (ret int) {
 }
 
 func (t *Torrent) numOutgoingConns() (ret int) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	for c := range t.conns {
 		if c.outgoing {
 			ret++
@@ -1415,7 +1460,12 @@ func (t *Torrent) maxHalfOpen() int {
 func (t *Torrent) maxHalfOpen() int {
 	// Note that if we somehow exceed the maximum established conns, we want
 	// the negative value to have an effect.
-	establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
+
+	t.mu.RLock()
+	lenConns := len(t.conns)
+	t.mu.RUnlock()
+
+	establishedHeadroom := int64(t.maxEstablishedConns - lenConns)
 	extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
 	// We want to allow some experimentation with new peers, and to try to
 	// upset an oversupply of received connections.
@@ -1605,6 +1655,9 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) {
 
 // Returns true if connection is removed from torrent.Conns.
 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	if !c.closed.IsSet() {
 		panic("connection is not closed")
 		// There are behaviours prevented by the closed state that will fail
@@ -1984,8 +2037,8 @@ func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
 // The returned TorrentStats may require alignment in memory. See
 // https://github.com/anacrolix/torrent/issues/383.
 func (t *Torrent) Stats() TorrentStats {
-	t.cl.rLock()
-	defer t.cl.rUnlock()
+	t.mu.RLock()
+	defer t.mu.RUnlock()
 	return t.statsLocked()
 }
 
@@ -2007,6 +2060,9 @@ func (t *Torrent) statsLocked() (ret TorrentStats) {
 
 // The total number of peers in the torrent.
 func (t *Torrent) numTotalPeers() int {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	peers := make(map[string]struct{})
 	for conn := range t.conns {
 		ra := conn.conn.RemoteAddr()
@@ -2044,6 +2100,9 @@ func (t *Torrent) reconcileHandshakeStats(c *Peer) {
 
 // Returns true if the connection is added.
 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	defer func() {
 		if err == nil {
 			torrent.Add("added connections", 1)
@@ -2107,6 +2166,9 @@ func (t *Torrent) newConnsAllowed() bool {
 }
 
 func (t *Torrent) wantAnyConns() bool {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	if !t.networkingEnabled.Bool() {
 		return false
 	}
@@ -2120,13 +2182,18 @@ func (t *Torrent) wantAnyConns() bool {
 }
 
 func (t *Torrent) wantOutgoingConns() bool {
+
 	if !t.newConnsAllowed() {
 		return false
 	}
+
+	t.mu.RLock()
 	if len(t.conns) < t.maxEstablishedConns {
 		return true
 	}
+	numIncomingConns := len(t.conns) - t.numOutgoingConns()
+	t.mu.RUnlock()
+
 	return t.worstBadConn(worseConnLensOpts{
+		incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
 		outgoingIsBad: false,
@@ -2137,30 +2204,39 @@ func (t *Torrent) wantIncomingConns() bool {
 	if !t.newConnsAllowed() {
 		return false
 	}
+
+	t.mu.RLock()
 	if len(t.conns) < t.maxEstablishedConns {
 		return true
 	}
+	numIncomingConns := len(t.conns) - t.numOutgoingConns()
+	t.mu.RUnlock()
+
 	return t.worstBadConn(worseConnLensOpts{
 		incomingIsBad: false,
+		outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
 	}) != nil
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
-	t.cl.lock()
-	defer t.cl.unlock()
+	t.mu.Lock()
+	lenConns := len(t.conns)
 	oldMax = t.maxEstablishedConns
 	t.maxEstablishedConns = max
+	t.mu.Unlock()
+
 	wcs := worseConnSlice{
 		conns: t.appendConns(nil, func(*PeerConn) bool {
 			return true
 		}),
 	}
 	wcs.initKeys(worseConnLensOpts{})
 	heap.Init(&wcs)
-	for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
+	for lenConns > t.maxEstablishedConns && wcs.Len() > 0 {
 		t.dropConnection(heap.Pop(&wcs).(*PeerConn))
+		t.mu.RLock()
+		lenConns = len(t.conns)
+		t.mu.RUnlock()
 	}
 	t.openNewConns()
 	return oldMax
@@ -2305,7 +2381,9 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) {
 	t.pendAllChunkSpecs(piece)
 	t.cancelRequestsForPiece(piece)
 	t.piece(piece).readerCond.Broadcast()
-	for conn := range t.conns {
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+	for _, conn := range conns {
 		conn.have(piece)
 		t.maybeDropMutuallyCompletePeer(conn)
 	}
@@ -2535,6 +2613,9 @@ func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
 }
 
 func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
 	addrStr := x.String()
 	for c := range t.conns {
 		ra := c.RemoteAddr
@@ -2686,7 +2767,9 @@ func (t *Torrent) DisallowDataUpload() {
 	t.cl.lock()
 	defer t.cl.unlock()
 	t.dataUploadDisallowed = true
-	for c := range t.conns {
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+	for _, c := range conns {
 		// TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
 		c.updateRequests("disallow data upload")
 	}
@@ -2695,13 +2778,16 @@ func (t *Torrent) DisallowDataUpload() {
 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
 // or if nil, a critical message is logged, and data download is disabled.
 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
-	t.cl.lock()
-	defer t.cl.unlock()
+	t.mu.Lock()
+	defer t.mu.Unlock()
 	t.userOnWriteChunkErr = f
 }
 
 func (t *Torrent) iterPeers(f func(p *Peer)) {
-	for pc := range t.conns {
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+
+	for _, pc := range conns {
 		f(&pc.Peer)
 	}
 	for _, ws := range t.webSeeds {
@@ -2851,13 +2937,19 @@ func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
 }
 
 func (t *Torrent) addConnWithAllPieces(p *Peer) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	if t.connsWithAllPieces == nil {
 		t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
 	}
 	t.connsWithAllPieces[p] = struct{}{}
 }
 
 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	_, ok := t.connsWithAllPieces[p]
 	delete(t.connsWithAllPieces, p)
 	return ok
@@ -3061,7 +3153,10 @@ func addrPortProtocolStr(addrPort netip.AddrPort) string {
 
 func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
 	rzsSent := 0
-	for pc := range t.conns {
+	conns := t.peerConnsAsSlice()
+	defer conns.free()
+
+	for _, pc := range conns {
 		if !pc.supportsExtension(utHolepunch.ExtensionName) {
 			continue
 		}
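
The comment above withUnclosedConns explains why the slices are pooled: snapshots of t.conns are taken frequently, so recycling the backing arrays avoids a steady stream of allocations. Below is a standalone sketch of the snapshot-then-free pattern this commit applies in appendConns, onPieceCompleted, iterPeers, and trySendHolepunchRendezvous; all names here are illustrative, not from the repository:

package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

// connSlices recycles snapshot slices, mirroring peerConnSlices above.
var connSlices sync.Pool

type conns []*conn

// free returns the slice to the pool with its length reset for reuse.
func (c conns) free() { connSlices.Put(c[:0]) }

func getConnSlice(capacity int) conns {
	if v := connSlices.Get(); v != nil {
		return v.(conns)
	}
	return make(conns, 0, capacity)
}

type swarm struct {
	mu    sync.RWMutex
	conns map[*conn]struct{}
}

// snapshot copies the map under the read lock; iteration then happens
// with no lock held, which is the point of the commit.
func (s *swarm) snapshot() conns {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := getConnSlice(len(s.conns))
	for c := range s.conns {
		out = append(out, c)
	}
	return out
}

func main() {
	s := &swarm{conns: map[*conn]struct{}{{id: 1}: {}, {id: 2}: {}}}
	cs := s.snapshot()
	defer cs.free()
	for _, c := range cs {
		fmt.Println(c.id)
	}
}

One consequence callers must tolerate: a connection can close after the snapshot is taken, so the slice may briefly reference conns that are no longer in the live map.
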
ut-holepunching_test.go (4 changes: 2 additions & 2 deletions)

@@ -133,8 +133,8 @@ func TestHolepunchConnect(t *testing.T) {
 }
 
 func waitForConns(t *Torrent) {
-	t.cl.lock()
-	defer t.cl.unlock()
+	t.mu.RLock()
+	defer t.mu.RUnlock()
 	for {
 		for range t.conns {
 			return
