From 97fd6641d7e440c2df06848537117ea9a5f2b0fa Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 4 Sep 2024 17:16:48 +0100 Subject: [PATCH] moved event cond from client to torrent --- client.go | 60 +++++++++++++++++++++++------------------ peer.go | 2 +- piece.go | 4 +-- reader.go | 4 +-- torrent.go | 41 ++++++++++++++++------------ ut-holepunching_test.go | 6 ++--- 6 files changed, 66 insertions(+), 51 deletions(-) diff --git a/client.go b/client.go index e9f3e909ff..72fa9be6c4 100644 --- a/client.go +++ b/client.go @@ -60,7 +60,6 @@ type Client struct { connStats ConnStats _mu lockWithDeferreds - event sync.Cond closed chansync.SetOnce config *ClientConfig @@ -206,7 +205,6 @@ func (cl *Client) init(cfg *ClientConfig) { g.MakeMap(&cl.dopplegangerAddrs) cl.torrents = make(map[metainfo.Hash]*Torrent) cl.activeAnnounceLimiter.SlotsPerKey = 2 - cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist cl.httpClient = &http.Client{ Transport: cfg.WebTransport, @@ -468,7 +466,6 @@ func (cl *Client) Close() (errs []error) { for i := range cl.onClose { cl.onClose[len(cl.onClose)-1-i]() } - cl.event.Broadcast() return } @@ -1367,6 +1364,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { webSeeds: make(map[string]*Peer), gotMetainfoC: make(chan struct{}), } + + t.event.L = &t.mu + var salt [8]byte rand.Read(salt[:]) t.smartBanCache.Hash = func(b []byte) uint64 { @@ -1424,7 +1424,7 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor cl.clearAcceptLimits() t.updateWantPeersEvent(true) // Tickle Client.waitAccept, new torrent may want conns. - cl.event.Broadcast() + t.event.Broadcast() return } @@ -1458,7 +1458,7 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { t.updateWantPeersEvent(false) }() // Tickle Client.waitAccept, new torrent may want conns. - cl.event.Broadcast() + t.event.Broadcast() return } @@ -1545,32 +1545,40 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e return } -func (cl *Client) allTorrentsCompleted() bool { - cl.torrentsMu.RLock() - defer cl.torrentsMu.RUnlock() - - for _, t := range cl.torrents { - if !t.haveInfo(true) { - return false - } - if !t.haveAllPieces(true, true) { - return false - } - } - return true -} - // Returns true when all torrents are completely downloaded and false if the // client is stopped before that. func (cl *Client) WaitAll() bool { - cl.lock() - defer cl.unlock() - for !cl.allTorrentsCompleted() { - if cl.closed.IsSet() { - return false + torrents := cl.torrentsAsSlice() + + for { + for _, t := range torrents { + if !func() bool { + t.mu.Lock() + defer t.mu.Lock() + + for !t.haveInfo(true) || !t.haveAllPieces(true, true) { + if cl.closed.IsSet() || t.closed.IsSet() { + return false + } + + t.event.Wait() + } + + return true + }() { + return false + } } - cl.event.Wait() + + next := cl.torrentsAsSlice() + + if len(next) <= len(torrents) { + break + } + + torrents = next } + return true } diff --git a/peer.go b/peer.go index dddb3ed3c6..22a4c2d6c1 100644 --- a/peer.go +++ b/peer.go @@ -929,7 +929,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // that chunk pieces are pended at an appropriate time later however. } - cl.event.Broadcast() + t.event.Broadcast() // We do this because we've written a chunk, and may change PieceState.Partial. t.publishPieceStateChange(pieceIndex(ppReq.Index), false) diff --git a/piece.go b/piece.go index 80c5f8791d..b365453c01 100644 --- a/piece.go +++ b/piece.go @@ -200,13 +200,13 @@ func (p *Piece) VerifyData() { p.mu.RUnlock() // log.Printf("target: %d", target) - p.t.queuePieceCheck(p.index, true) + p.t.queuePieceCheck(p.index, false) for { // log.Printf("got %d verifies", p.numVerifies) if p.numVerifies >= target { break } - p.t.cl.event.Wait() + p.t.event.Wait() } // log.Print("done") } diff --git a/reader.go b/reader.go index 801b7bea37..9db3fc21cb 100644 --- a/reader.go +++ b/reader.go @@ -70,13 +70,13 @@ var _ io.ReadSeekCloser = (*reader)(nil) func (r *reader) SetResponsive() { r.responsive = true - r.t.cl.event.Broadcast() + r.t.event.Broadcast() } // Disable responsive mode. TODO: Remove? func (r *reader) SetNonResponsive() { r.responsive = false - r.t.cl.event.Broadcast() + r.t.event.Broadcast() } func (r *reader) SetReadahead(readahead int64) { diff --git a/torrent.go b/torrent.go index fa034ebe6e..35dd8e12c3 100644 --- a/torrent.go +++ b/torrent.go @@ -124,6 +124,7 @@ type Torrent struct { mu sync.RWMutex imu sync.RWMutex + // Name used if the info name isn't available. Should be cleared when the // Info does become available. displayName string @@ -136,7 +137,7 @@ type Torrent struct { // received that piece. metadataCompletedChunks []bool metadataChanged sync.Cond - + event sync.Cond // Closed when .Info is obtained. gotMetainfoC chan struct{} @@ -595,7 +596,7 @@ func (t *Torrent) onSetInfo(lock bool, lockClient bool) { t.updatePiecePriority(i, "Torrent.OnSetInfo", false) } } - t.cl.event.Broadcast() + t.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent(false) t.requestState = make(map[RequestIndex]requestState) @@ -1093,7 +1094,7 @@ func (t *Torrent) close() (err error) { defer t.mu.Unlock() t.assertAllPiecesRelativeAvailabilityZero(false) t.pex.Reset() - t.cl.event.Broadcast() + t.event.Broadcast() t.pieceStateChanges.Close() t.updateWantPeersEvent(false) if t.hashResults != nil { @@ -1548,7 +1549,7 @@ func (t *Torrent) readerPosChanged(from, to pieceRange, lock bool) { func (t *Torrent) maybeNewConns(lock bool) { // Tickle the accept routine. - t.cl.event.Broadcast() + t.event.Broadcast() t.openNewConns(lock) } @@ -1684,7 +1685,7 @@ func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string, lock b defer t.mu.Unlock() } - t.cl.event.Broadcast() + t.event.Broadcast() if t.pieceComplete(piece, false) { t.onPieceCompleted(piece, false) } else { @@ -2001,7 +2002,7 @@ func (t *Torrent) assertPendingRequests(lock bool) { } func (t *Torrent) dropConnection(c *PeerConn, lock bool) { - t.cl.event.Broadcast() + t.event.Broadcast() if t.deletePeerConn(c, lock) { t.openNewConns(lock) } @@ -2300,9 +2301,8 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error { } func (t *Torrent) dhtAnnouncer(s DhtServer) { - cl := t.cl - cl.lock() - defer cl.unlock() + t.mu.Lock() + defer t.mu.Lock() for { for { if t.closed.IsSet() { @@ -2311,21 +2311,28 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) { // We're also announcing ourselves as a listener, so we don't just want peer addresses. // TODO: We can include the announce_peer step depending on whether we can receive // inbound connections. We should probably only announce once every 15 mins too. - if !t.wantAnyConns(true) { + if !t.wantAnyConns(false) { goto wait } - // TODO: Determine if there's a listener on the port we're announcing. - if len(cl.dialers) == 0 && len(cl.listeners) == 0 { + + if func() bool { + t.mu.Unlock() + defer t.mu.Lock() + t.cl.lock() + defer t.cl.unlock() + // TODO: Determine if there's a listener on the port we're announcing. + return len(t.cl.dialers) == 0 && len(t.cl.listeners) == 0 + }() { goto wait } break wait: - cl.event.Wait() + t.event.Wait() } func() { t.numDHTAnnounces++ - cl.unlock() - defer cl.lock() + t.mu.Unlock() + defer t.mu.Lock() err := t.timeboxedAnnounceToDht(s) if err != nil { t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err) @@ -2467,7 +2474,7 @@ func (t *Torrent) addPeerConn(c *PeerConn, lockTorrent bool) (err error) { t.conns[c] = struct{}{} - t.cl.event.Broadcast() + t.event.Broadcast() // We'll never receive the "p" extended handshake parameter. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { c.mu.Lock() @@ -2586,7 +2593,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { p.numVerifies++ storageCompletionOk := p.storageCompletionOk p.mu.Unlock() - t.cl.event.Broadcast() + t.event.Broadcast() if t.closed.IsSet() { return diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go index 76209285cc..5e80a61774 100644 --- a/ut-holepunching_test.go +++ b/ut-holepunching_test.go @@ -133,13 +133,13 @@ func TestHolepunchConnect(t *testing.T) { } func waitForConns(t *Torrent) { - t.mu.RLock() - defer t.mu.RUnlock() + t.mu.Lock() + defer t.mu.Unlock() for { for range t.conns { return } - t.cl.event.Wait() + t.event.Wait() } }