Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed http panic and added hash, flush and completion stats #26

Merged
merged 18 commits into from
Jul 24, 2024
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ func (cl *Client) init(cfg *ClientConfig) {
cl.httpClient.Transport = &http.Transport{
Proxy: cfg.HTTPProxy,
DialContext: cfg.HTTPDialContext,
// I think this value was observed from some webseeds. It seems reasonable to extend it
// to other uses of HTTP from the client.
MaxConnsPerHost: 10,
// Don't set maxconns - to avoid panic: net/http: internal error: connCount underflow
// which is caused under load due to an ongoing issue in the go http transport code
MaxConnsPerHost: 0,
}
}
}
Expand Down Expand Up @@ -1249,7 +1249,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
if !c.requestedMetadataPiece(piece) {
return fmt.Errorf("got unexpected piece %d", piece)
}

c.mu.Lock()
c.metadataRequests[piece] = false
c.lastUsefulChunkReceived = time.Now()
Expand All @@ -1260,7 +1260,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])

err = t.maybeCompleteMetadata()
if err != nil {
// Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
Expand Down
13 changes: 13 additions & 0 deletions conn_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConnStats struct {
BytesReadUsefulIntendedData Count

BytesHashed Count
BytesFlushed Count
BytesCompleted Count

ChunksWritten Count
Expand Down Expand Up @@ -87,6 +88,18 @@ func (cs *ConnStats) receivedChunk(size int64) {
cs.BytesReadData.Add(size)
}

func (cs *ConnStats) pieceHashed(size int64) {
cs.BytesHashed.Add(size)
}

func (cs *ConnStats) pieceCompleted(size int64) {
cs.BytesCompleted.Add(size)
}

func (cs *ConnStats) pieceFlushed(size int64) {
cs.BytesFlushed.Add(size)
}

func (cs *ConnStats) incrementPiecesDirtiedGood() {
cs.PiecesDirtiedGood.Add(1)
}
Expand Down
17 changes: 13 additions & 4 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type MMapSpan struct {
mu sync.RWMutex
mMaps []Mmap
dirtyPieces roaring.Bitmap
dirtySize int64
segmentLocater segments.Index
Created bool
InfoHash infohash.T
Expand All @@ -35,22 +36,23 @@ func (ms *MMapSpan) Append(mMap Mmap) {
ms.mMaps = append(ms.mMaps, mMap)
}

func (ms *MMapSpan) Flush() (errs []error) {
func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.flushTimer == nil {
ms.flushTimer = time.AfterFunc(ms.FlushTime,
func() {
// TODO deal with logging errors
ms.flushMaps(true)
ms.flushMaps(onFlush, true)
})
}
return
}

func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error) {
var flushedCallback FlushedCallback
var dirtyPieces *roaring.Bitmap
var dirtySize int64

errs = func() (errs []error) {
if lock {
Expand All @@ -59,6 +61,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
}

dirtyPieces = ms.dirtyPieces.Clone()
dirtySize = ms.dirtySize

if ms.flushTimer != nil {
ms.flushTimer = nil
Expand All @@ -73,6 +76,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}
}

Expand All @@ -83,6 +87,10 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
flushedCallback(ms.InfoHash, dirtyPieces)
}

if onFlush != nil && dirtySize > 0 {
onFlush(dirtySize)
}

return
}

Expand All @@ -92,7 +100,7 @@ func (ms *MMapSpan) Close() (errs []error) {

if ms.flushTimer != nil {
ms.flushTimer.Stop()
errs = ms.flushMaps(false)
errs = ms.flushMaps(nil, false)
ms.flushTimer = nil
}

Expand Down Expand Up @@ -171,6 +179,7 @@ func (ms *MMapSpan) WriteAt(index int, p []byte, off int64) (n int, err error) {

ms.mu.Lock()
ms.dirtyPieces.Add(uint32(index))
ms.dirtySize += int64(len(p))
ms.mu.Unlock()

return
Expand Down
4 changes: 4 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ func (p *Peer) close(lockTorrent bool) {
if p.t != nil {
p.t.decPeerPieceAvailability(p, false, false)
}

p.desiredRequestLen = 0

for _, f := range p.callbacks.PeerClosed {
f(p)
}
Expand Down Expand Up @@ -1138,6 +1141,7 @@ func (p *Peer) desiredRequests(lock bool) int {
p.mu.RLock()
defer p.mu.RUnlock()
}

return p.desiredRequestLen
}

Expand Down
4 changes: 2 additions & 2 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (p *Piece) Storage() storage.Piece {
return p.t.storage.Piece(p.Info())
}

func (p *Piece) Flush() {
func (p *Piece) Flush(onFlushed func(size int64)) {
if p.t.storage.Flush != nil {
_ = p.t.storage.Flush()
_ = p.t.storage.Flush(onFlushed)
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type TorrentCapacity *func() (cap int64, capped bool)
type TorrentImpl struct {
Piece func(p metainfo.Piece) PieceImpl
Close func() error
Flush func() error
Flush func(onFlushed func(size int64)) error
// Storages that share the same space, will provide equal pointers. The function is called once
// to determine the storage for torrents sharing the same function pointer, and mutated in
// place.
Expand Down
4 changes: 2 additions & 2 deletions storage/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (ts *mmapTorrentStorage) Close() error {
return nil
}

func (ts *mmapTorrentStorage) Flush() error {
errs := ts.span.Flush()
func (ts *mmapTorrentStorage) Flush(onFlush func(size int64)) error {
errs := ts.span.Flush(onFlush)
if len(errs) > 0 {
return errs[0]
}
Expand Down
7 changes: 7 additions & 0 deletions t.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,14 @@ func (t *Torrent) DownloadPieces(begin, end pieceIndex) {

if checkCompletion && !storage.IsNew() {
if sum, _, err := t.hashPiece(piece); err == nil && sum == *piece.hash {
size := int64(piece.length(true))
t.allStats(func(cs *ConnStats) {
cs.pieceHashed(size)
})
storage.MarkComplete(false)
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(size)
})
t.updatePieceCompletion(piece.index, true)
return nil
}
Expand Down
21 changes: 19 additions & 2 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,12 +2633,22 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
t.clearPieceTouchers(piece, true)

hasDirtyChunks := p.hasDirtyChunks(true)

if hasDirtyChunks {
p.Flush()
p.Flush(func(size int64) {
t.allStats(func(cs *ConnStats) {
cs.pieceFlushed(size)
})
})
}

err := p.Storage().MarkComplete(!hasDirtyChunks)
if err != nil {

if err == nil {
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(int64(p.length(true)))
})
} else {
t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
}

Expand Down Expand Up @@ -2810,9 +2820,12 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
break
}

var length int64

func() {
t.mu.Lock()
defer t.mu.Unlock()
length = int64(p.length(false))
t.piecesQueuedForHash.Remove(bitmap.BitIndex(p.index))
p.mu.Lock()
p.hashing = true
Expand All @@ -2839,6 +2852,10 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
p.mu.Unlock()
hashing.Add(-1)

t.allStats(func(cs *ConnStats) {
cs.pieceHashed(length)
})

t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr}
}
}()
Expand Down
6 changes: 3 additions & 3 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

for !ws.peer.closed.IsSet() && i < ws.maxRequesters {
for !(ws.peer.closed.IsSet() || ws.peer.t.Complete.Bool()) && i < ws.maxRequesters {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false

Expand Down Expand Up @@ -448,7 +448,7 @@ func requestUpdate(ws *webseedPeer) {
if p == &ws.peer {
this = "*"
}
flags := p.connectionFlags()
flags := p.StatusFlags()
peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate))

}, false)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (cn *webseedPeer) ban() {
banCount := cn.peer.banCount
cn.peer.mu.Unlock()

if banCount > 16 && !cn.peer.closed.IsSet() {
if banCount > 5 && !cn.peer.closed.IsSet() {
cn.peer.close(true)
}
}
Expand Down
Loading