diff --git a/client.go b/client.go index 5996593414..83828bb070 100644 --- a/client.go +++ b/client.go @@ -215,9 +215,9 @@ func (cl *Client) init(cfg *ClientConfig) { cl.httpClient.Transport = &http.Transport{ Proxy: cfg.HTTPProxy, DialContext: cfg.HTTPDialContext, - // I think this value was observed from some webseeds. It seems reasonable to extend it - // to other uses of HTTP from the client. - MaxConnsPerHost: 10, + // Don't set maxconns - to avoid panic: net/http: internal error: connCount underflow + // which is caused under load due to an ongoing issue in the go http transport code + MaxConnsPerHost: 0, } } } @@ -1249,7 +1249,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon if !c.requestedMetadataPiece(piece) { return fmt.Errorf("got unexpected piece %d", piece) } - + c.mu.Lock() c.metadataRequests[piece] = false c.lastUsefulChunkReceived = time.Now() @@ -1260,7 +1260,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon return fmt.Errorf("data has bad offset in payload: %d", begin) } t.saveMetadataPiece(piece, payload[begin:]) - + err = t.maybeCompleteMetadata() if err != nil { // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we diff --git a/conn_stats.go b/conn_stats.go index a288d4d39f..a7493c9681 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -26,6 +26,7 @@ type ConnStats struct { BytesReadUsefulIntendedData Count BytesHashed Count + BytesFlushed Count BytesCompleted Count ChunksWritten Count @@ -87,6 +88,18 @@ func (cs *ConnStats) receivedChunk(size int64) { cs.BytesReadData.Add(size) } +func (cs *ConnStats) pieceHashed(size int64) { + cs.BytesHashed.Add(size) +} + +func (cs *ConnStats) pieceCompleted(size int64) { + cs.BytesCompleted.Add(size) +} + +func (cs *ConnStats) pieceFlushed(size int64) { + cs.BytesFlushed.Add(size) +} + func (cs *ConnStats) incrementPiecesDirtiedGood() { cs.PiecesDirtiedGood.Add(1) } diff --git a/mmap_span/mmap_span.go b/mmap_span/mmap_span.go index a343c1670b..a8fb518dfa 100644 --- a/mmap_span/mmap_span.go +++ b/mmap_span/mmap_span.go @@ -23,6 +23,7 @@ type MMapSpan struct { mu sync.RWMutex mMaps []Mmap dirtyPieces roaring.Bitmap + dirtySize int64 segmentLocater segments.Index Created bool InfoHash infohash.T @@ -35,22 +36,23 @@ func (ms *MMapSpan) Append(mMap Mmap) { ms.mMaps = append(ms.mMaps, mMap) } -func (ms *MMapSpan) Flush() (errs []error) { +func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) { ms.mu.Lock() defer ms.mu.Unlock() if ms.flushTimer == nil { ms.flushTimer = time.AfterFunc(ms.FlushTime, func() { // TODO deal with logging errors - ms.flushMaps(true) + ms.flushMaps(onFlush, true) }) } return } -func (ms *MMapSpan) flushMaps(lock bool) (errs []error) { +func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error) { var flushedCallback FlushedCallback var dirtyPieces *roaring.Bitmap + var dirtySize int64 errs = func() (errs []error) { if lock { @@ -59,6 +61,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) { } dirtyPieces = ms.dirtyPieces.Clone() + dirtySize = ms.dirtySize if ms.flushTimer != nil { ms.flushTimer = nil @@ -73,6 +76,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) { if len(errs) == 0 { flushedCallback = ms.FlushedCallback ms.dirtyPieces = roaring.Bitmap{} + ms.dirtySize = 0 } } @@ -83,6 +87,10 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) { flushedCallback(ms.InfoHash, dirtyPieces) } + if onFlush != nil && dirtySize > 0 { + onFlush(dirtySize) + } + return } @@ -92,7 +100,7 @@ func (ms *MMapSpan) Close() (errs []error) { if ms.flushTimer != nil { ms.flushTimer.Stop() - errs = ms.flushMaps(false) + errs = ms.flushMaps(nil, false) ms.flushTimer = nil } @@ -171,6 +179,7 @@ func (ms *MMapSpan) WriteAt(index int, p []byte, off int64) (n int, err error) { ms.mu.Lock() ms.dirtyPieces.Add(uint32(index)) + ms.dirtySize += int64(len(p)) ms.mu.Unlock() return diff --git a/peer.go b/peer.go index e2de52bcd4..dddb3ed3c6 100644 --- a/peer.go +++ b/peer.go @@ -395,6 +395,9 @@ func (p *Peer) close(lockTorrent bool) { if p.t != nil { p.t.decPeerPieceAvailability(p, false, false) } + + p.desiredRequestLen = 0 + for _, f := range p.callbacks.PeerClosed { f(p) } @@ -1138,6 +1141,7 @@ func (p *Peer) desiredRequests(lock bool) int { p.mu.RLock() defer p.mu.RUnlock() } + return p.desiredRequestLen } diff --git a/piece.go b/piece.go index 4be0975b13..80c5f8791d 100644 --- a/piece.go +++ b/piece.go @@ -56,9 +56,9 @@ func (p *Piece) Storage() storage.Piece { return p.t.storage.Piece(p.Info()) } -func (p *Piece) Flush() { +func (p *Piece) Flush(onFlushed func(size int64)) { if p.t.storage.Flush != nil { - _ = p.t.storage.Flush() + _ = p.t.storage.Flush(onFlushed) } } diff --git a/storage/interface.go b/storage/interface.go index cb7b24b3fd..324289842b 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -22,7 +22,7 @@ type TorrentCapacity *func() (cap int64, capped bool) type TorrentImpl struct { Piece func(p metainfo.Piece) PieceImpl Close func() error - Flush func() error + Flush func(onFlushed func(size int64)) error // Storages that share the same space, will provide equal pointers. The function is called once // to determine the storage for torrents sharing the same function pointer, and mutated in // place. diff --git a/storage/mmap.go b/storage/mmap.go index 640b428f78..875a34da89 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -90,8 +90,8 @@ func (ts *mmapTorrentStorage) Close() error { return nil } -func (ts *mmapTorrentStorage) Flush() error { - errs := ts.span.Flush() +func (ts *mmapTorrentStorage) Flush(onFlush func(size int64)) error { + errs := ts.span.Flush(onFlush) if len(errs) > 0 { return errs[0] } diff --git a/t.go b/t.go index 4a7de069e5..3add0ae9d8 100644 --- a/t.go +++ b/t.go @@ -251,7 +251,14 @@ func (t *Torrent) DownloadPieces(begin, end pieceIndex) { if checkCompletion && !storage.IsNew() { if sum, _, err := t.hashPiece(piece); err == nil && sum == *piece.hash { + size := int64(piece.length(true)) + t.allStats(func(cs *ConnStats) { + cs.pieceHashed(size) + }) storage.MarkComplete(false) + t.allStats(func(cs *ConnStats) { + cs.pieceCompleted(size) + }) t.updatePieceCompletion(piece.index, true) return nil } diff --git a/torrent.go b/torrent.go index ccd12b2b2c..995219f194 100644 --- a/torrent.go +++ b/torrent.go @@ -2633,12 +2633,22 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { t.clearPieceTouchers(piece, true) hasDirtyChunks := p.hasDirtyChunks(true) + if hasDirtyChunks { - p.Flush() + p.Flush(func(size int64) { + t.allStats(func(cs *ConnStats) { + cs.pieceFlushed(size) + }) + }) } err := p.Storage().MarkComplete(!hasDirtyChunks) - if err != nil { + + if err == nil { + t.allStats(func(cs *ConnStats) { + cs.pieceCompleted(int64(p.length(true))) + }) + } else { t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err) } @@ -2810,9 +2820,12 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool { break } + var length int64 + func() { t.mu.Lock() defer t.mu.Unlock() + length = int64(p.length(false)) t.piecesQueuedForHash.Remove(bitmap.BitIndex(p.index)) p.mu.Lock() p.hashing = true @@ -2839,6 +2852,10 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool { p.mu.Unlock() hashing.Add(-1) + t.allStats(func(cs *ConnStats) { + cs.pieceHashed(length) + }) + t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr} } }() diff --git a/webseed-peer.go b/webseed-peer.go index a4a8c2e451..f23f8b0179 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -184,7 +184,7 @@ func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Lock() defer ws.requesterCond.L.Unlock() - for !ws.peer.closed.IsSet() && i < ws.maxRequesters { + for !(ws.peer.closed.IsSet() || ws.peer.t.Complete.Bool()) && i < ws.maxRequesters { // Restart is set if we don't need to wait for the requestCond before trying again. restart := false @@ -448,7 +448,7 @@ func requestUpdate(ws *webseedPeer) { if p == &ws.peer { this = "*" } - flags := p.connectionFlags() + flags := p.StatusFlags() peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate)) }, false) @@ -498,7 +498,7 @@ func (cn *webseedPeer) ban() { banCount := cn.peer.banCount cn.peer.mu.Unlock() - if banCount > 16 && !cn.peer.closed.IsSet() { + if banCount > 5 && !cn.peer.closed.IsSet() { cn.peer.close(true) } }