Merge pull request #26 from erigontech/increase_webseed_parallelization
Fixed http panic and added hash, flush and completion stats
mh0lt authored Jul 24, 2024
2 parents 1079022 + 31175cf commit ee8ee63
Showing 10 changed files with 69 additions and 19 deletions.
10 changes: 5 additions & 5 deletions client.go
@@ -215,9 +215,9 @@ func (cl *Client) init(cfg *ClientConfig) {
cl.httpClient.Transport = &http.Transport{
Proxy: cfg.HTTPProxy,
DialContext: cfg.HTTPDialContext,
// I think this value was observed from some webseeds. It seems reasonable to extend it
// to other uses of HTTP from the client.
MaxConnsPerHost: 10,
// Don't cap MaxConnsPerHost (0 = no limit), to avoid "panic: net/http: internal error: connCount underflow",
// which is triggered under load by an ongoing issue in the Go http transport code
MaxConnsPerHost: 0,
}
}
}
@@ -1249,7 +1249,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
if !c.requestedMetadataPiece(piece) {
return fmt.Errorf("got unexpected piece %d", piece)
}

c.mu.Lock()
c.metadataRequests[piece] = false
c.lastUsefulChunkReceived = time.Now()
@@ -1260,7 +1260,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])

err = t.maybeCompleteMetadata()
if err != nil {
// Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
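
For context on the client.go change above: MaxConnsPerHost is a standard net/http Transport setting where 0 means no per-host limit, so leaving it unset sidesteps the connCount underflow panic seen with a positive cap under load. A minimal sketch of the two configurations (the proxy and dial hooks from the diff are omitted):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Old configuration: at most 10 concurrent connections per host.
	limited := &http.Transport{MaxConnsPerHost: 10}

	// New configuration: 0 means "no limit", which sidesteps the
	// "net/http: internal error: connCount underflow" panic seen under load
	// when the per-host connection cap is enforced.
	unlimited := &http.Transport{MaxConnsPerHost: 0}

	for _, tr := range []*http.Transport{limited, unlimited} {
		client := &http.Client{Transport: tr}
		_ = client // the webseed code issues its range requests through a client like this
		fmt.Println("MaxConnsPerHost =", tr.MaxConnsPerHost)
	}
}
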
13 changes: 13 additions & 0 deletions conn_stats.go
@@ -26,6 +26,7 @@ type ConnStats struct {
BytesReadUsefulIntendedData Count

BytesHashed Count
BytesFlushed Count
BytesCompleted Count

ChunksWritten Count
@@ -87,6 +88,18 @@ func (cs *ConnStats) receivedChunk(size int64) {
cs.BytesReadData.Add(size)
}

func (cs *ConnStats) pieceHashed(size int64) {
cs.BytesHashed.Add(size)
}

func (cs *ConnStats) pieceCompleted(size int64) {
cs.BytesCompleted.Add(size)
}

func (cs *ConnStats) pieceFlushed(size int64) {
cs.BytesFlushed.Add(size)
}

func (cs *ConnStats) incrementPiecesDirtiedGood() {
cs.PiecesDirtiedGood.Add(1)
}
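
The new conn_stats.go accessors follow the same pattern as the existing counters: each stat is a Count incremented atomically by a small helper. A self-contained sketch of that pattern — the Count type here is a stand-in for the package's own, and only the field and method names mirror the diff:

package main

import (
	"fmt"
	"sync/atomic"
)

// Count is a stand-in for the package's atomic counter type.
type Count struct{ n int64 }

func (c *Count) Add(delta int64) { atomic.AddInt64(&c.n, delta) }
func (c *Count) Int64() int64    { return atomic.LoadInt64(&c.n) }

type ConnStats struct {
	BytesHashed    Count
	BytesFlushed   Count
	BytesCompleted Count
}

func (cs *ConnStats) pieceHashed(size int64)    { cs.BytesHashed.Add(size) }
func (cs *ConnStats) pieceFlushed(size int64)   { cs.BytesFlushed.Add(size) }
func (cs *ConnStats) pieceCompleted(size int64) { cs.BytesCompleted.Add(size) }

func main() {
	var cs ConnStats
	const pieceLen = 1 << 20 // 1 MiB piece, for illustration

	cs.pieceHashed(pieceLen)    // piece hashed after download or during a startup check
	cs.pieceFlushed(pieceLen)   // dirty bytes written out by the storage flush
	cs.pieceCompleted(pieceLen) // piece marked complete in storage

	fmt.Println(cs.BytesHashed.Int64(), cs.BytesFlushed.Int64(), cs.BytesCompleted.Int64())
}
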
17 changes: 13 additions & 4 deletions mmap_span/mmap_span.go
@@ -23,6 +23,7 @@ type MMapSpan struct {
mu sync.RWMutex
mMaps []Mmap
dirtyPieces roaring.Bitmap
dirtySize int64
segmentLocater segments.Index
Created bool
InfoHash infohash.T
@@ -35,22 +36,23 @@ func (ms *MMapSpan) Append(mMap Mmap) {
ms.mMaps = append(ms.mMaps, mMap)
}

func (ms *MMapSpan) Flush() (errs []error) {
func (ms *MMapSpan) Flush(onFlush func(size int64)) (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.flushTimer == nil {
ms.flushTimer = time.AfterFunc(ms.FlushTime,
func() {
// TODO deal with logging errors
ms.flushMaps(true)
ms.flushMaps(onFlush, true)
})
}
return
}

func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
func (ms *MMapSpan) flushMaps(onFlush func(size int64), lock bool) (errs []error) {
var flushedCallback FlushedCallback
var dirtyPieces *roaring.Bitmap
var dirtySize int64

errs = func() (errs []error) {
if lock {
@@ -59,6 +61,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
}

dirtyPieces = ms.dirtyPieces.Clone()
dirtySize = ms.dirtySize

if ms.flushTimer != nil {
ms.flushTimer = nil
@@ -73,6 +76,7 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
if len(errs) == 0 {
flushedCallback = ms.FlushedCallback
ms.dirtyPieces = roaring.Bitmap{}
ms.dirtySize = 0
}
}

@@ -83,6 +87,10 @@ func (ms *MMapSpan) flushMaps(lock bool) (errs []error) {
flushedCallback(ms.InfoHash, dirtyPieces)
}

if onFlush != nil && dirtySize > 0 {
onFlush(dirtySize)
}

return
}

@@ -92,7 +100,7 @@ func (ms *MMapSpan) Close() (errs []error) {

if ms.flushTimer != nil {
ms.flushTimer.Stop()
errs = ms.flushMaps(false)
errs = ms.flushMaps(nil, false)
ms.flushTimer = nil
}

@@ -171,6 +179,7 @@ func (ms *MMapSpan) WriteAt(index int, p []byte, off int64) (n int, err error) {

ms.mu.Lock()
ms.dirtyPieces.Add(uint32(index))
ms.dirtySize += int64(len(p))
ms.mu.Unlock()

return
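
The mmap_span change threads a per-flush byte count through to the caller: WriteAt accumulates dirtySize, and flushMaps reports it to the onFlush callback once the maps have been flushed. A minimal sketch of that accounting, independent of the mmap machinery (type and field names here are illustrative, not the package's):

package main

import (
	"fmt"
	"sync"
)

// dirtySpan mimics the dirty-byte accounting added to MMapSpan.
type dirtySpan struct {
	mu        sync.Mutex
	dirtySize int64
}

// writeAt records how many bytes became dirty, as MMapSpan.WriteAt now does.
func (s *dirtySpan) writeAt(p []byte) {
	s.mu.Lock()
	s.dirtySize += int64(len(p))
	s.mu.Unlock()
}

// flush resets the dirty counter and, on success, reports the flushed size
// to the optional callback — mirroring flushMaps(onFlush, lock).
func (s *dirtySpan) flush(onFlush func(size int64)) {
	s.mu.Lock()
	size := s.dirtySize
	s.dirtySize = 0
	s.mu.Unlock()

	// ... syncing the underlying maps to disk would happen here ...

	if onFlush != nil && size > 0 {
		onFlush(size)
	}
}

func main() {
	var span dirtySpan
	span.writeAt(make([]byte, 16<<10))
	span.writeAt(make([]byte, 16<<10))
	span.flush(func(size int64) { fmt.Println("flushed", size, "bytes") })
}
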
4 changes: 4 additions & 0 deletions peer.go
@@ -395,6 +395,9 @@ func (p *Peer) close(lockTorrent bool) {
if p.t != nil {
p.t.decPeerPieceAvailability(p, false, false)
}

p.desiredRequestLen = 0

for _, f := range p.callbacks.PeerClosed {
f(p)
}
@@ -1138,6 +1141,7 @@ func (p *Peer) desiredRequests(lock bool) int {
p.mu.RLock()
defer p.mu.RUnlock()
}

return p.desiredRequestLen
}

4 changes: 2 additions & 2 deletions piece.go
@@ -56,9 +56,9 @@ func (p *Piece) Storage() storage.Piece {
return p.t.storage.Piece(p.Info())
}

func (p *Piece) Flush() {
func (p *Piece) Flush(onFlushed func(size int64)) {
if p.t.storage.Flush != nil {
_ = p.t.storage.Flush()
_ = p.t.storage.Flush(onFlushed)
}
}

2 changes: 1 addition & 1 deletion storage/interface.go
@@ -22,7 +22,7 @@ type TorrentCapacity *func() (cap int64, capped bool)
type TorrentImpl struct {
Piece func(p metainfo.Piece) PieceImpl
Close func() error
Flush func() error
Flush func(onFlushed func(size int64)) error
// Storages that share the same space, will provide equal pointers. The function is called once
// to determine the storage for torrents sharing the same function pointer, and mutated in
// place.
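
With the interface change, any storage backend that provides the optional Flush hook now receives an onFlushed callback and is expected to report how many bytes it wrote out. A hedged sketch of a hypothetical conforming backend — the real Piece/Close plumbing is omitted and all names below are invented for illustration:

package main

import "fmt"

// ramTorrentStorage is a hypothetical backend that buffers writes in memory.
type ramTorrentStorage struct {
	pending int64 // bytes written since the last flush
}

// Flush matches the new TorrentImpl.Flush signature:
//
//	Flush func(onFlushed func(size int64)) error
//
// It persists pending data and reports the flushed size to the callback.
func (s *ramTorrentStorage) Flush(onFlushed func(size int64)) error {
	flushed := s.pending
	s.pending = 0
	// ... persist buffered data here ...
	if onFlushed != nil && flushed > 0 {
		onFlushed(flushed)
	}
	return nil
}

func main() {
	s := &ramTorrentStorage{pending: 4 << 20}
	_ = s.Flush(func(size int64) { fmt.Println("storage flushed", size, "bytes") })
}
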
4 changes: 2 additions & 2 deletions storage/mmap.go
@@ -90,8 +90,8 @@ func (ts *mmapTorrentStorage) Close() error {
return nil
}

func (ts *mmapTorrentStorage) Flush() error {
errs := ts.span.Flush()
func (ts *mmapTorrentStorage) Flush(onFlush func(size int64)) error {
errs := ts.span.Flush(onFlush)
if len(errs) > 0 {
return errs[0]
}
7 changes: 7 additions & 0 deletions t.go
@@ -251,7 +251,14 @@ func (t *Torrent) DownloadPieces(begin, end pieceIndex) {

if checkCompletion && !storage.IsNew() {
if sum, _, err := t.hashPiece(piece); err == nil && sum == *piece.hash {
size := int64(piece.length(true))
t.allStats(func(cs *ConnStats) {
cs.pieceHashed(size)
})
storage.MarkComplete(false)
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(size)
})
t.updatePieceCompletion(piece.index, true)
return nil
}
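
The two t.allStats calls above fan the piece size out to every ConnStats aggregate the torrent reports into (its own stats plus the client-wide stats, judging by the upstream layout). A rough sketch of that fan-out under that assumption:

package main

import "fmt"

type ConnStats struct {
	BytesHashed    int64
	BytesCompleted int64
}

func (cs *ConnStats) pieceHashed(size int64)    { cs.BytesHashed += size }
func (cs *ConnStats) pieceCompleted(size int64) { cs.BytesCompleted += size }

// torrent holds its own stats plus a pointer to client-wide stats — an
// assumption about what allStats iterates over, based on the upstream layout.
type torrent struct {
	stats       ConnStats
	clientStats *ConnStats
}

// allStats applies f to every aggregate this torrent reports into.
func (t *torrent) allStats(f func(*ConnStats)) {
	f(&t.stats)
	f(t.clientStats)
}

func main() {
	cl := &ConnStats{}
	t := &torrent{clientStats: cl}

	size := int64(256 << 10) // a 256 KiB piece that verified against its hash
	t.allStats(func(cs *ConnStats) { cs.pieceHashed(size) })
	t.allStats(func(cs *ConnStats) { cs.pieceCompleted(size) })

	fmt.Println("torrent:", t.stats, "client:", *cl)
}
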
21 changes: 19 additions & 2 deletions torrent.go
@@ -2633,12 +2633,22 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
t.clearPieceTouchers(piece, true)

hasDirtyChunks := p.hasDirtyChunks(true)

if hasDirtyChunks {
p.Flush()
p.Flush(func(size int64) {
t.allStats(func(cs *ConnStats) {
cs.pieceFlushed(size)
})
})
}

err := p.Storage().MarkComplete(!hasDirtyChunks)
if err != nil {

if err == nil {
t.allStats(func(cs *ConnStats) {
cs.pieceCompleted(int64(p.length(true)))
})
} else {
t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
}

@@ -2810,9 +2820,12 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
break
}

var length int64

func() {
t.mu.Lock()
defer t.mu.Unlock()
length = int64(p.length(false))
t.piecesQueuedForHash.Remove(bitmap.BitIndex(p.index))
p.mu.Lock()
p.hashing = true
@@ -2839,6 +2852,10 @@ func (t *Torrent) tryCreatePieceHasher(lock bool) bool {
p.mu.Unlock()
hashing.Add(-1)

t.allStats(func(cs *ConnStats) {
cs.pieceHashed(length)
})

t.hashResults <- hashResult{p.index, correct, failedPeers, copyErr}
}
}()
6 changes: 3 additions & 3 deletions webseed-peer.go
@@ -184,7 +184,7 @@ func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()

for !ws.peer.closed.IsSet() && i < ws.maxRequesters {
for !(ws.peer.closed.IsSet() || ws.peer.t.Complete.Bool()) && i < ws.maxRequesters {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false

@@ -448,7 +448,7 @@ func requestUpdate(ws *webseedPeer) {
if p == &ws.peer {
this = "*"
}
flags := p.connectionFlags()
flags := p.StatusFlags()
peerInfo = append(peerInfo, fmt.Sprintf("%s%s:p=%d,d=%d: %f", this, flags, pieces, desired, rate))

}, false)
@@ -498,7 +498,7 @@ func (cn *webseedPeer) ban() {
banCount := cn.peer.banCount
cn.peer.mu.Unlock()

if banCount > 16 && !cn.peer.closed.IsSet() {
if banCount > 5 && !cn.peer.closed.IsSet() {
cn.peer.close(true)
}
}
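
In webseed-peer.go the requester loop now also exits once the torrent is complete, not just when the peer closes, and a webseed is banned after 5 failed attempts instead of 16. A small sketch of the revised loop guard (the atomic flags stand in for the package's own event/flag types):

package main

import (
	"fmt"
	"sync/atomic"
)

// requesterShouldRun mirrors the loop guard: keep requesting only while the
// peer is open, the torrent is incomplete, and this requester index is still
// within the allowed pool.
func requesterShouldRun(closed, complete *atomic.Bool, i, maxRequesters int) bool {
	return !(closed.Load() || complete.Load()) && i < maxRequesters
}

func main() {
	var closed, complete atomic.Bool

	fmt.Println(requesterShouldRun(&closed, &complete, 3, 8)) // true: still downloading
	complete.Store(true)
	fmt.Println(requesterShouldRun(&closed, &complete, 3, 8)) // false: torrent complete, requester exits
}
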
