Skip to content

Commit

Permalink
Only request 2 pages per conversation on full sync (#2408)
Browse files Browse the repository at this point in the history
* Only request 2 pages per conversation on full sync

* also limit the sender to two pages
  • Loading branch information
woutslakhorst authored Aug 9, 2023
1 parent 996e567 commit 223d22d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
12 changes: 10 additions & 2 deletions network/transport/v2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,14 @@ func (p *protocol) handleTransactionRangeQuery(ctx context.Context, connection g
return errors.New("invalid range query")
}

txs, err := p.state.FindBetweenLC(ctx, msg.Start, msg.End)
// limit to two pages to reduce load
limit := msg.Start + 2*dag.PageSize
end := msg.End
if end > limit {
end = limit
}

txs, err := p.state.FindBetweenLC(ctx, msg.Start, end)
if err != nil {
return err
}
Expand Down Expand Up @@ -514,7 +521,8 @@ func (p *protocol) handleTransactionSet(_ context.Context, connection grpc.Conne
return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), pageClockStart(reqPageNum+2))
}
// TODO: Distribute synchronization of new nodes over multiple peers.
return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), dag.MaxLamportClock)
// Currently locked at 2 pages (~1000TX) per peer to prevent overloading the peer.
return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), pageClockStart(reqPageNum+3))
}

// peer is behind
Expand Down
19 changes: 18 additions & 1 deletion network/transport/v2/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,23 @@ func TestProtocol_handleTransactionRangeQuery(t *testing.T) {

assert.NoError(t, err)
})
t.Run("ok - too many requested", func(t *testing.T) {
p, mocks := newTestProtocol(t, nil)

mocks.State.EXPECT().FindBetweenLC(gomock.Any(), lcStart, 2*dag.PageSize+1).Return([]dag.Transaction{tx1, tx2}, nil)
mocks.State.EXPECT().ReadPayload(gomock.Any(), tx1.PayloadHash()).Return(payload, nil)
mocks.State.EXPECT().ReadPayload(gomock.Any(), tx2.PayloadHash()).Return(payload, nil)
mocks.Sender.EXPECT().sendTransactionList(connection, gomock.Any(), gomock.Any())

msg := &Envelope_TransactionRangeQuery{&TransactionRangeQuery{
Start: lcStart,
End: dag.MaxLamportClock,
}}
p.cMan.startConversation(msg, peer)
err := p.handleTransactionRangeQuery(ctx, connection, &Envelope{Message: msg})

assert.NoError(t, err)
})
t.Run("context cancelled", func(t *testing.T) {
p, mocks := newTestProtocol(t, nil)

Expand Down Expand Up @@ -650,7 +667,7 @@ func TestProtocol_handleTransactionSet(t *testing.T) {
conversation := p.cMan.startConversation(request, peer)
mocks.State.EXPECT().IBLT(requestLC).Return(*oneHashIblt.Clone().(*tree.Iblt), dag.PageSize-1)
mocks.State.EXPECT().XOR(uint32(dag.MaxLamportClock)).Return(hash.FromSlice([]byte("ignored")), requestLC)
mocks.Sender.EXPECT().sendTransactionRangeQuery(connection, dag.PageSize, uint32(dag.MaxLamportClock)).Return(nil)
mocks.Sender.EXPECT().sendTransactionRangeQuery(connection, dag.PageSize, 3*dag.PageSize).Return(nil)

err := p.handleTransactionSet(ctx, connection, &Envelope{Message: &Envelope_TransactionSet{
TransactionSet: &TransactionSet{ConversationID: conversation.conversationID.slice(), LCReq: requestLC, LC: peerLC, IBLT: oneHashIbltBytes},
Expand Down

0 comments on commit 223d22d

Please sign in to comment.