Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase webseed parallelization #27

Merged
merged 24 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,35 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
func (cl *Client) Close() (errs []error) {
var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
cl.lock()

if cl.closed.IsSet() {
cl.unlock()
return
}

var mu sync.Mutex
for _, t := range cl.torrentsAsSlice() {
err := t.close(&closeGroup)
if err != nil {
errs = append(errs, err)
}
closeGroup.Add(1)

go func(t *Torrent) {
defer closeGroup.Done()

err := t.close()
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}(t)
}
cl.closed.Set()
cl.unlock()
closeGroup.Wait()
// don't close resources until torrent closes are complete
for i := range cl.onClose {
cl.onClose[len(cl.onClose)-1-i]()
}
cl.closed.Set()
cl.unlock()
cl.event.Broadcast()
closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
return
}

Expand Down Expand Up @@ -1522,7 +1538,9 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e
err = fmt.Errorf("no such torrent")
return
}
err = t.close(wg)
wg.Add(1)
defer wg.Done()
err = t.close()
delete(cl.torrents, infoHash)
return
}
Expand Down
34 changes: 19 additions & 15 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ func (ms *MMapSpan) Append(mMap Mmap) {
func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.flushTimer == nil {
if len(ms.mMaps) > 0 && ms.flushTimer == nil {
ms.flushTimer = time.AfterFunc(ms.FlushTime,
func() {
// TODO deal with logging errors
ms.flushMaps(onFlush, true)
ms.mu.Lock()
flushTimer := ms.flushTimer
ms.flushTimer = nil
ms.mu.Unlock()

if flushTimer != nil {
// TODO deal with logging errors
ms.flushMaps(onFlush, true)
}
})
}
return
Expand All @@ -63,21 +70,18 @@ func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error
dirtyPieces = ms.dirtyPieces.Clone()
dirtySize = ms.dirtySize

if ms.flushTimer != nil {
ms.flushTimer = nil
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)

}
}
}

if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}
if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}

return
Expand Down
8 changes: 7 additions & 1 deletion torrent-piece-request-order.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ func (t *Torrent) clientPieceRequestOrderKey() interface{} {
return t.storage.Capacity
}

func (t *Torrent) deletePieceRequestOrder() {
func (t *Torrent) deletePieceRequestOrder(lockClient bool) {
if t.storage == nil {
return
}

if lockClient {
t.cl.lock()
defer t.cl.unlock()
}

cpro := t.cl.pieceRequestOrder
key := t.clientPieceRequestOrderKey()
pro := cpro[key]
Expand Down
99 changes: 60 additions & 39 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,52 +1065,53 @@ func (t *Torrent) numPiecesCompleted(lock bool) (num pieceIndex) {
return pieceIndex(t._completedPieces.GetCardinality())
}

func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
func (t *Torrent) close() (err error) {
if !t.closed.Set() {
err = errors.New("already closed")
return
}

for _, f := range t.onClose {
f()
}

func() {
t.mu.Lock()
defer t.mu.Unlock()

t.iterPeers(func(p *Peer) {
p.close(false)
}, false)
}()

if t.storage != nil {
closed := make(chan struct{})
defer func() { closed <- struct{}{} }()

wg.Add(1)
go func() {
defer wg.Done()
<-closed
t.storageLock.Lock()
defer t.storageLock.Unlock()
if f := t.storage.Close; f != nil {
err1 := f()
if err1 != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
}
}
}()
t.deletePieceRequestOrder(true)
}

t.mu.Lock()
defer t.mu.Unlock()
func() {
t.mu.Lock()
defer t.mu.Unlock()
t.assertAllPiecesRelativeAvailabilityZero(false)
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(false)
if t.hashResults != nil {
close(t.hashResults)
t.hashResults = nil
}
}()

t.iterPeers(func(p *Peer) {
p.close(false)
}, false)
if t.storage != nil {
t.deletePieceRequestOrder()
}
t.assertAllPiecesRelativeAvailabilityZero(false)
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent(false)
if t.hashResults != nil {
close(t.hashResults)
t.hashResults = nil
t.storageLock.Lock()
defer t.storageLock.Unlock()
if f := t.storage.Close; f != nil {
if err := f(); err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err)
}
}
}

return
}

Expand Down Expand Up @@ -2643,7 +2644,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
}

err := p.Storage().MarkComplete(!hasDirtyChunks)

if err == nil {
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(int64(p.length(true)))
Expand Down Expand Up @@ -2773,7 +2774,7 @@ func (t *Torrent) tryCreateMorePieceHashers(lock bool) {
}
if t.hashResults == nil {
t.hashResults = make(chan hashResult, t.cl.config.PieceHashersPerTorrent*16)
go t.processHashResults()
go t.processHashResults(t.hashResults)
}
}()
}
Expand Down Expand Up @@ -2856,7 +2857,14 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
cs.pieceHashed(length)
})

t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr}
t.mu.RLock()
hashResults := t.hashResults
t.mu.RUnlock()

select {
case hashResults <- hashResult{p.index, correct, failedPeers, copyErr}:
case <-t.closed.Done():
}
}
}()

Expand All @@ -2870,7 +2878,7 @@ type hashResult struct {
copyErr error
}

func (t *Torrent) processHashResults() {
func (t *Torrent) processHashResults(hashResults chan hashResult) {

g, ctx := errgroup.WithContext(context.Background())
_, cancel := context.WithCancel(ctx)
Expand All @@ -2883,12 +2891,25 @@ func (t *Torrent) processHashResults() {
defer cancel()

for !t.closed.IsSet() {
results := []hashResult{<-t.hashResults}
results := []hashResult{}

select {
case result, ok := <-hashResults:
if ok {
results = append(results, result)
}
case <-t.closed.Done():
return
}

for done := false; !done; {
select {
case result := <-t.hashResults:
results = append(results, result)
case result, ok := <-hashResults:
if ok {
results = append(results, result)
}
case <-t.closed.Done():
return
default:
done = true
}
Expand Down
4 changes: 1 addition & 3 deletions torrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net"
"os"
"path/filepath"
"sync"
"testing"

g "github.com/anacrolix/generics"
Expand Down Expand Up @@ -247,7 +246,6 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
tt.onSetInfo(true, true)
err = pc.peerSentHaveNone(true)
c.Assert(err, qt.IsNil)
var wg sync.WaitGroup
tt.close(&wg)
tt.close()
tt.assertAllPiecesRelativeAvailabilityZero(true)
}
54 changes: 45 additions & 9 deletions tracker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"sync"
Expand Down Expand Up @@ -72,18 +73,52 @@ type trackerAnnounceResult struct {
Completed time.Time
}

type ilu struct {
err error
ips []net.IP
lookupTime time.Time
}

var ipc = map[string]*ilu{}
var ipcmu sync.RWMutex

func (me *trackerScraper) getIp() (ip net.IP, err error) {
var ips []net.IP
if me.lookupTrackerIp != nil {
ips, err = me.lookupTrackerIp(&me.u)
} else {
// Do a regular dns lookup
ips, err = net.LookupIP(me.u.Hostname())
// cache the ip lookup for 15 mins, this avoids
// spamming DNS on os's that don't cache DNS lookups
// Cache TTL between 1 and 6 hours

ipcmu.RLock()
lu := ipc[me.u.String()]
ipcmu.RUnlock()

if lu == nil ||
time.Since(lu.lookupTime) > time.Hour+time.Duration(rand.Int63n(int64(5*time.Hour))) ||
lu.err != nil && time.Since(lu.lookupTime) > 15*time.Minute {
var ips []net.IP

if me.lookupTrackerIp != nil {
ips, err = me.lookupTrackerIp(&me.u)
} else {
// Do a regular dns lookup
ips, err = net.LookupIP(me.u.Hostname())
}

ipcmu.Lock()
lu = &ilu{
err: err,
ips: ips,
lookupTime: time.Now(),
}
ipc[me.u.String()] = lu
ipcmu.Unlock()

}
if err != nil {

if lu.err != nil {
return
}
if len(ips) == 0 {

if len(lu.ips) == 0 {
err = errors.New("no ips")
return
}
Expand All @@ -93,7 +128,7 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
err = errors.New("client is closed")
return
}
for _, ip = range ips {
for _, ip = range lu.ips {
if me.t.cl.ipIsBlocked(ip) {
continue
}
Expand All @@ -109,6 +144,7 @@ func (me *trackerScraper) getIp() (ip net.IP, err error) {
}
return
}

err = errors.New("no acceptable ips")
return
}
Expand Down
Loading