Skip to content

Commit

Permalink
Merge pull request #20 from erigontech/increase_webseed_parallelization
Browse files Browse the repository at this point in the history
Close banned web peers
  • Loading branch information
mh0lt authored Jul 11, 2024
2 parents 39247e7 + 7de8739 commit 9e5f962
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 55 deletions.
35 changes: 1 addition & 34 deletions deferrwl.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,20 @@
package torrent

import (
"github.com/anacrolix/sync"
)
import "github.com/anacrolix/sync"

// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
// would only occur with a write lock at present. The race detector should catch instances of defers
// without the write lock being held.
type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()

//lc atomic.Int32
//locker string
//locktime time.Time
//rlc atomic.Int32
//rlmu sync.Mutex
//rlocker [20]string
}

//func stack(skip int) string {
// return stack2.Trace().TrimBelow(stack2.Caller(skip)).String()
//}

func (me *lockWithDeferreds) Lock() {
me.internal.Lock()
// me.lc.Add(1)
// me.locker = stack(2)
// me.locktime = time.Now()
}

func (me *lockWithDeferreds) Unlock() {
// me.lc.Add(-1)
// if me.lc.Load() < 0 {
// panic("lock underflow")
// }
// me.locker = ""
// me.locktime = time.Time{}
unlockActions := me.unlockActions
for i := 0; i < len(unlockActions); i += 1 {
unlockActions[i]()
Expand All @@ -47,20 +25,9 @@ func (me *lockWithDeferreds) Unlock() {

func (me *lockWithDeferreds) RLock() {
me.internal.RLock()
// me.rlmu.Lock()
// me.rlocker[me.rlc.Load()] = string(stack(2))
// me.rlc.Add(1)
// me.rlmu.Unlock()
}

func (me *lockWithDeferreds) RUnlock() {
// me.rlmu.Lock()
// me.rlc.Add(-1)
// if me.rlc.Load() < 0 {
// panic("lock underflow")
// }
// me.rlocker[me.rlc.Load()] = ""
// me.rlmu.Unlock()
me.internal.RUnlock()
}

Expand Down
4 changes: 2 additions & 2 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte)
func (ms *MMapSpan) WriteAt(index int, p []byte, off int64) (n int, err error) {
// log.Printf("writing %v bytes at %v", len(p), off)
n, err = func() (n int, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
ms.mu.Lock()
defer ms.mu.Unlock()
n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
if n != len(p) {
err = io.ErrShortWrite
Expand Down
24 changes: 16 additions & 8 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,16 @@ func eventAgeString(t time.Time) string {
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *Peer) statusFlags() (ret string) {
func (cn *Peer) StatusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
if cn.closed.IsSet() {
ret = "c-"
} else {
ret = "o-"
}

if cn.requestState.Interested {
c('i')
}
Expand All @@ -257,12 +263,14 @@ func (cn *Peer) statusFlags() (ret string) {
}
c('-')
ret += cn.connectionFlags()
c('-')
if cn.peerInterested {
c('i')
}
if cn.peerChoking {
c('c')
if cn.peerInterested || cn.peerChoking {
c('-')
if cn.peerInterested {
c('i')
}
if cn.peerChoking {
c('c')
}
}
return
}
Expand Down Expand Up @@ -354,7 +362,7 @@ func (cn *Peer) writeStatus(w io.Writer, lock bool, lockTorrent bool) {
cn.PeerMaxRequests,
len(cn.peerRequests),
localClientReqq,
cn.statusFlags(),
cn.StatusFlags(),
cn.downloadRate()/(1<<10),
)
fmt.Fprintf(w, "requested pieces:")
Expand Down
21 changes: 13 additions & 8 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func (c *PeerConn) fastEnabled(lock bool) bool {
}

func (c *PeerConn) reject(r Request, lock bool) {
if !c.fastEnabled(true) {
if !c.fastEnabled(lock) {
panic("fast not enabled")
}
c.write(r.ToMsg(pp.Reject), lock)
Expand All @@ -674,6 +674,14 @@ func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {

// startFetch is for testing purposes currently.
func (c *PeerConn) onReadRequest(r Request, startFetch bool, lock bool) error {
if !c.t.havePiece(pieceIndex(r.Index), true) {
// TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}

pieceLength := c.t.pieceLength(pieceIndex(r.Index), true)

if lock {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -714,12 +722,6 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool, lock bool) error {
return err
}
}
if !c.t.havePiece(pieceIndex(r.Index), true) {
// TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}
pieceLength := c.t.pieceLength(pieceIndex(r.Index), true)
// Check this after we know we have the piece, so that the piece length will be known.
if chunkOverflowsPiece(r.ChunkSpec, pieceLength) {
torrent.Add("bad requests received", 1)
Expand Down Expand Up @@ -757,7 +759,9 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
panic("data must be non-nil to trigger send")
}
torrent.Add("peer request data read successes", 1)
c.mu.Lock()
prs.data = b
c.mu.Unlock()
// This might be required for the error case too (#752 and #753).
c.tickleWriter()
}
Expand Down Expand Up @@ -1211,7 +1215,8 @@ another:
}
peerRequests = make([]peerRequest, 0, len(c.peerRequests))
for r, state := range c.peerRequests {
peerRequests = append(peerRequests, peerRequest{r, state})
s := *state
peerRequests = append(peerRequests, peerRequest{r, &s})
}
}()

Expand Down
5 changes: 5 additions & 0 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (cn *webseedPeer) ban() {

cn.peer.mu.Lock()
cn.peer.banCount++
banCount := cn.peer.banCount
cn.peer.mu.Unlock()

if banCount > 16 && !cn.peer.closed.IsSet() {
cn.peer.close(true)
}
}

func (cn *webseedPeer) isLowOnRequests(lock bool, lockTorrent bool) bool {
Expand Down
6 changes: 3 additions & 3 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (me ErrBadResponse) Error() string {
return me.Msg
}

func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error {
func recvPartResult(ctx context.Context, writer io.Writer, part requestPart, resp *http.Response) error {
defer resp.Body.Close()
var body io.Reader = resp.Body
if part.responseBodyWrapper != nil {
Expand All @@ -161,7 +161,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
}
switch resp.StatusCode {
case http.StatusPartialContent:
copied, err := io.Copy(buf, body)
copied, err := io.Copy(writer, body)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
if discarded != 0 {
log.Printf("discarded %v bytes in webseed request response part", discarded)
}
_, err := io.CopyN(buf, body, part.e.Length)
_, err := io.CopyN(writer, body, part.e.Length)
return err
} else {
return ErrBadResponse{"resp status ok but requested range", resp}
Expand Down

0 comments on commit 9e5f962

Please sign in to comment.