Skip to content

Commit

Permalink
moved event cond from client to torrent
Browse files Browse the repository at this point in the history
  • Loading branch information
mh0lt committed Sep 4, 2024
1 parent 9848cb5 commit 97fd664
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 51 deletions.
60 changes: 34 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type Client struct {
connStats ConnStats

_mu lockWithDeferreds
event sync.Cond
closed chansync.SetOnce

config *ClientConfig
Expand Down Expand Up @@ -206,7 +205,6 @@ func (cl *Client) init(cfg *ClientConfig) {
g.MakeMap(&cl.dopplegangerAddrs)
cl.torrents = make(map[metainfo.Hash]*Torrent)
cl.activeAnnounceLimiter.SlotsPerKey = 2
cl.event.L = cl.locker()
cl.ipBlockList = cfg.IPBlocklist
cl.httpClient = &http.Client{
Transport: cfg.WebTransport,
Expand Down Expand Up @@ -468,7 +466,6 @@ func (cl *Client) Close() (errs []error) {
for i := range cl.onClose {
cl.onClose[len(cl.onClose)-1-i]()
}
cl.event.Broadcast()
return
}

Expand Down Expand Up @@ -1367,6 +1364,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
webSeeds: make(map[string]*Peer),
gotMetainfoC: make(chan struct{}),
}

t.event.L = &t.mu

var salt [8]byte
rand.Read(salt[:])
t.smartBanCache.Hash = func(b []byte) uint64 {
Expand Down Expand Up @@ -1424,7 +1424,7 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
cl.clearAcceptLimits()
t.updateWantPeersEvent(true)
// Tickle Client.waitAccept, new torrent may want conns.
cl.event.Broadcast()
t.event.Broadcast()
return
}

Expand Down Expand Up @@ -1458,7 +1458,7 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
t.updateWantPeersEvent(false)
}()
// Tickle Client.waitAccept, new torrent may want conns.
cl.event.Broadcast()
t.event.Broadcast()
return
}

Expand Down Expand Up @@ -1545,32 +1545,40 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e
return
}

func (cl *Client) allTorrentsCompleted() bool {
cl.torrentsMu.RLock()
defer cl.torrentsMu.RUnlock()

for _, t := range cl.torrents {
if !t.haveInfo(true) {
return false
}
if !t.haveAllPieces(true, true) {
return false
}
}
return true
}

// Returns true when all torrents are completely downloaded and false if the
// client is stopped before that.
func (cl *Client) WaitAll() bool {
cl.lock()
defer cl.unlock()
for !cl.allTorrentsCompleted() {
if cl.closed.IsSet() {
return false
torrents := cl.torrentsAsSlice()

for {
for _, t := range torrents {
if !func() bool {
t.mu.Lock()
defer t.mu.Lock()

for !t.haveInfo(true) || !t.haveAllPieces(true, true) {
if cl.closed.IsSet() || t.closed.IsSet() {
return false
}

t.event.Wait()
}

return true
}() {
return false
}
}
cl.event.Wait()

next := cl.torrentsAsSlice()

if len(next) <= len(torrents) {
break
}

torrents = next
}

return true
}

Expand Down
2 changes: 1 addition & 1 deletion peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// that chunk pieces are pended at an appropriate time later however.
}

cl.event.Broadcast()
t.event.Broadcast()
// We do this because we've written a chunk, and may change PieceState.Partial.
t.publishPieceStateChange(pieceIndex(ppReq.Index), false)

Expand Down
4 changes: 2 additions & 2 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ func (p *Piece) VerifyData() {
p.mu.RUnlock()

// log.Printf("target: %d", target)
p.t.queuePieceCheck(p.index, true)
p.t.queuePieceCheck(p.index, false)
for {
// log.Printf("got %d verifies", p.numVerifies)
if p.numVerifies >= target {
break
}
p.t.cl.event.Wait()
p.t.event.Wait()
}
// log.Print("done")
}
Expand Down
4 changes: 2 additions & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ var _ io.ReadSeekCloser = (*reader)(nil)

func (r *reader) SetResponsive() {
r.responsive = true
r.t.cl.event.Broadcast()
r.t.event.Broadcast()
}

// Disable responsive mode. TODO: Remove?
func (r *reader) SetNonResponsive() {
r.responsive = false
r.t.cl.event.Broadcast()
r.t.event.Broadcast()
}

func (r *reader) SetReadahead(readahead int64) {
Expand Down
41 changes: 24 additions & 17 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Torrent struct {

mu sync.RWMutex
imu sync.RWMutex

// Name used if the info name isn't available. Should be cleared when the
// Info does become available.
displayName string
Expand All @@ -136,7 +137,7 @@ type Torrent struct {
// received that piece.
metadataCompletedChunks []bool
metadataChanged sync.Cond

event sync.Cond
// Closed when .Info is obtained.
gotMetainfoC chan struct{}

Expand Down Expand Up @@ -595,7 +596,7 @@ func (t *Torrent) onSetInfo(lock bool, lockClient bool) {
t.updatePiecePriority(i, "Torrent.OnSetInfo", false)
}
}
t.cl.event.Broadcast()
t.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent(false)
t.requestState = make(map[RequestIndex]requestState)
Expand Down Expand Up @@ -1093,7 +1094,7 @@ func (t *Torrent) close() (err error) {
defer t.mu.Unlock()
t.assertAllPiecesRelativeAvailabilityZero(false)
t.pex.Reset()
t.cl.event.Broadcast()
t.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(false)
if t.hashResults != nil {
Expand Down Expand Up @@ -1548,7 +1549,7 @@ func (t *Torrent) readerPosChanged(from, to pieceRange, lock bool) {

func (t *Torrent) maybeNewConns(lock bool) {
// Tickle the accept routine.
t.cl.event.Broadcast()
t.event.Broadcast()
t.openNewConns(lock)
}

Expand Down Expand Up @@ -1684,7 +1685,7 @@ func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string, lock b
defer t.mu.Unlock()
}

t.cl.event.Broadcast()
t.event.Broadcast()
if t.pieceComplete(piece, false) {
t.onPieceCompleted(piece, false)
} else {
Expand Down Expand Up @@ -2001,7 +2002,7 @@ func (t *Torrent) assertPendingRequests(lock bool) {
}

func (t *Torrent) dropConnection(c *PeerConn, lock bool) {
t.cl.event.Broadcast()
t.event.Broadcast()
if t.deletePeerConn(c, lock) {
t.openNewConns(lock)
}
Expand Down Expand Up @@ -2300,9 +2301,8 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
}

func (t *Torrent) dhtAnnouncer(s DhtServer) {
cl := t.cl
cl.lock()
defer cl.unlock()
t.mu.Lock()
defer t.mu.Lock()
for {
for {
if t.closed.IsSet() {
Expand All @@ -2311,21 +2311,28 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
// We're also announcing ourselves as a listener, so we don't just want peer addresses.
// TODO: We can include the announce_peer step depending on whether we can receive
// inbound connections. We should probably only announce once every 15 mins too.
if !t.wantAnyConns(true) {
if !t.wantAnyConns(false) {
goto wait
}
// TODO: Determine if there's a listener on the port we're announcing.
if len(cl.dialers) == 0 && len(cl.listeners) == 0 {

if func() bool {
t.mu.Unlock()
defer t.mu.Lock()
t.cl.lock()
defer t.cl.unlock()
// TODO: Determine if there's a listener on the port we're announcing.
return len(t.cl.dialers) == 0 && len(t.cl.listeners) == 0
}() {
goto wait
}
break
wait:
cl.event.Wait()
t.event.Wait()
}
func() {
t.numDHTAnnounces++
cl.unlock()
defer cl.lock()
t.mu.Unlock()
defer t.mu.Lock()
err := t.timeboxedAnnounceToDht(s)
if err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
Expand Down Expand Up @@ -2467,7 +2474,7 @@ func (t *Torrent) addPeerConn(c *PeerConn, lockTorrent bool) (err error) {

t.conns[c] = struct{}{}

t.cl.event.Broadcast()
t.event.Broadcast()
// We'll never receive the "p" extended handshake parameter.
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
c.mu.Lock()
Expand Down Expand Up @@ -2586,7 +2593,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
p.numVerifies++
storageCompletionOk := p.storageCompletionOk
p.mu.Unlock()
t.cl.event.Broadcast()
t.event.Broadcast()

if t.closed.IsSet() {
return
Expand Down
6 changes: 3 additions & 3 deletions ut-holepunching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ func TestHolepunchConnect(t *testing.T) {
}

func waitForConns(t *Torrent) {
t.mu.RLock()
defer t.mu.RUnlock()
t.mu.Lock()
defer t.mu.Unlock()
for {
for range t.conns {
return
}
t.cl.event.Wait()
t.event.Wait()
}
}

Expand Down

0 comments on commit 97fd664

Please sign in to comment.