Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close banned web peers #20

Merged
merged 22 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions deferrwl.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,20 @@
package torrent

import (
"github.com/anacrolix/sync"
)
import "github.com/anacrolix/sync"

// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
// would only occur with a write lock at present. The race detector should catch instances of defers
// without the write lock being held.
type lockWithDeferreds struct {
internal sync.RWMutex
unlockActions []func()

//lc atomic.Int32
//locker string
//locktime time.Time
//rlc atomic.Int32
//rlmu sync.Mutex
//rlocker [20]string
}

//func stack(skip int) string {
// return stack2.Trace().TrimBelow(stack2.Caller(skip)).String()
//}

func (me *lockWithDeferreds) Lock() {
me.internal.Lock()
// me.lc.Add(1)
// me.locker = stack(2)
// me.locktime = time.Now()
}

func (me *lockWithDeferreds) Unlock() {
// me.lc.Add(-1)
// if me.lc.Load() < 0 {
// panic("lock underflow")
// }
// me.locker = ""
// me.locktime = time.Time{}
unlockActions := me.unlockActions
for i := 0; i < len(unlockActions); i += 1 {
unlockActions[i]()
Expand All @@ -47,20 +25,9 @@ func (me *lockWithDeferreds) Unlock() {

func (me *lockWithDeferreds) RLock() {
me.internal.RLock()
// me.rlmu.Lock()
// me.rlocker[me.rlc.Load()] = string(stack(2))
// me.rlc.Add(1)
// me.rlmu.Unlock()
}

func (me *lockWithDeferreds) RUnlock() {
// me.rlmu.Lock()
// me.rlc.Add(-1)
// if me.rlc.Load() < 0 {
// panic("lock underflow")
// }
// me.rlocker[me.rlc.Load()] = ""
// me.rlmu.Unlock()
me.internal.RUnlock()
}

Expand Down
4 changes: 2 additions & 2 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte)
func (ms *MMapSpan) WriteAt(index int, p []byte, off int64) (n int, err error) {
// log.Printf("writing %v bytes at %v", len(p), off)
n, err = func() (n int, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
ms.mu.Lock()
defer ms.mu.Unlock()
n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
if n != len(p) {
err = io.ErrShortWrite
Expand Down
24 changes: 16 additions & 8 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,16 @@ func eventAgeString(t time.Time) string {
}

// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
func (cn *Peer) statusFlags() (ret string) {
func (cn *Peer) StatusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
if cn.closed.IsSet() {
ret = "c-"
} else {
ret = "o-"
}

if cn.requestState.Interested {
c('i')
}
Expand All @@ -257,12 +263,14 @@ func (cn *Peer) statusFlags() (ret string) {
}
c('-')
ret += cn.connectionFlags()
c('-')
if cn.peerInterested {
c('i')
}
if cn.peerChoking {
c('c')
if cn.peerInterested || cn.peerChoking {
c('-')
if cn.peerInterested {
c('i')
}
if cn.peerChoking {
c('c')
}
}
return
}
Expand Down Expand Up @@ -354,7 +362,7 @@ func (cn *Peer) writeStatus(w io.Writer, lock bool, lockTorrent bool) {
cn.PeerMaxRequests,
len(cn.peerRequests),
localClientReqq,
cn.statusFlags(),
cn.StatusFlags(),
cn.downloadRate()/(1<<10),
)
fmt.Fprintf(w, "requested pieces:")
Expand Down
21 changes: 13 additions & 8 deletions peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func (c *PeerConn) fastEnabled(lock bool) bool {
}

func (c *PeerConn) reject(r Request, lock bool) {
if !c.fastEnabled(true) {
if !c.fastEnabled(lock) {
panic("fast not enabled")
}
c.write(r.ToMsg(pp.Reject), lock)
Expand All @@ -674,6 +674,14 @@ func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {

// startFetch is for testing purposes currently.
func (c *PeerConn) onReadRequest(r Request, startFetch bool, lock bool) error {
if !c.t.havePiece(pieceIndex(r.Index), true) {
// TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}

pieceLength := c.t.pieceLength(pieceIndex(r.Index), true)

if lock {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -714,12 +722,6 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool, lock bool) error {
return err
}
}
if !c.t.havePiece(pieceIndex(r.Index), true) {
// TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}
pieceLength := c.t.pieceLength(pieceIndex(r.Index), true)
// Check this after we know we have the piece, so that the piece length will be known.
if chunkOverflowsPiece(r.ChunkSpec, pieceLength) {
torrent.Add("bad requests received", 1)
Expand Down Expand Up @@ -757,7 +759,9 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
panic("data must be non-nil to trigger send")
}
torrent.Add("peer request data read successes", 1)
c.mu.Lock()
prs.data = b
c.mu.Unlock()
// This might be required for the error case too (#752 and #753).
c.tickleWriter()
}
Expand Down Expand Up @@ -1211,7 +1215,8 @@ another:
}
peerRequests = make([]peerRequest, 0, len(c.peerRequests))
for r, state := range c.peerRequests {
peerRequests = append(peerRequests, peerRequest{r, state})
s := *state
peerRequests = append(peerRequests, peerRequest{r, &s})
}
}()

Expand Down
5 changes: 5 additions & 0 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,12 @@ func (cn *webseedPeer) ban() {

cn.peer.mu.Lock()
cn.peer.banCount++
banCount := cn.peer.banCount
cn.peer.mu.Unlock()

if banCount > 16 && !cn.peer.closed.IsSet() {
cn.peer.close(true)
}
}

func (cn *webseedPeer) isLowOnRequests(lock bool, lockTorrent bool) bool {
Expand Down
6 changes: 3 additions & 3 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (me ErrBadResponse) Error() string {
return me.Msg
}

func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error {
func recvPartResult(ctx context.Context, writer io.Writer, part requestPart, resp *http.Response) error {
defer resp.Body.Close()
var body io.Reader = resp.Body
if part.responseBodyWrapper != nil {
Expand All @@ -161,7 +161,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
}
switch resp.StatusCode {
case http.StatusPartialContent:
copied, err := io.Copy(buf, body)
copied, err := io.Copy(writer, body)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
if discarded != 0 {
log.Printf("discarded %v bytes in webseed request response part", discarded)
}
_, err := io.CopyN(buf, body, part.e.Length)
_, err := io.CopyN(writer, body, part.e.Length)
return err
} else {
return ErrBadResponse{"resp status ok but requested range", resp}
Expand Down
Loading