diff --git a/network/transport/v2/handlers.go b/network/transport/v2/handlers.go index 2e83e887eb..f043406153 100644 --- a/network/transport/v2/handlers.go +++ b/network/transport/v2/handlers.go @@ -241,7 +241,14 @@ func (p *protocol) handleTransactionRangeQuery(ctx context.Context, connection g return errors.New("invalid range query") } - txs, err := p.state.FindBetweenLC(ctx, msg.Start, msg.End) + // limit to two pages to reduce load + limit := msg.Start + 2*dag.PageSize + end := msg.End + if end > limit { + end = limit + } + + txs, err := p.state.FindBetweenLC(ctx, msg.Start, end) if err != nil { return err } @@ -514,7 +521,8 @@ func (p *protocol) handleTransactionSet(_ context.Context, connection grpc.Conne return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), pageClockStart(reqPageNum+2)) } // TODO: Distribute synchronization of new nodes over multiple peers. - return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), dag.MaxLamportClock) + // Currently locked at 2 pages (~1000TX) per peer to prevent overloading the peer. + return p.sender.sendTransactionRangeQuery(connection, pageClockStart(reqPageNum+1), pageClockStart(reqPageNum+3)) } // peer is behind diff --git a/network/transport/v2/handlers_test.go b/network/transport/v2/handlers_test.go index 41a5408ee3..8f34a1e5f8 100644 --- a/network/transport/v2/handlers_test.go +++ b/network/transport/v2/handlers_test.go @@ -510,6 +510,23 @@ func TestProtocol_handleTransactionRangeQuery(t *testing.T) { assert.NoError(t, err) }) + t.Run("ok - too many requested", func(t *testing.T) { + p, mocks := newTestProtocol(t, nil) + + mocks.State.EXPECT().FindBetweenLC(gomock.Any(), lcStart, 2*dag.PageSize+1).Return([]dag.Transaction{tx1, tx2}, nil) + mocks.State.EXPECT().ReadPayload(gomock.Any(), tx1.PayloadHash()).Return(payload, nil) + mocks.State.EXPECT().ReadPayload(gomock.Any(), tx2.PayloadHash()).Return(payload, nil) + mocks.Sender.EXPECT().sendTransactionList(connection, gomock.Any(), gomock.Any()) + + msg := &Envelope_TransactionRangeQuery{&TransactionRangeQuery{ + Start: lcStart, + End: dag.MaxLamportClock, + }} + p.cMan.startConversation(msg, peer) + err := p.handleTransactionRangeQuery(ctx, connection, &Envelope{Message: msg}) + + assert.NoError(t, err) + }) t.Run("context cancelled", func(t *testing.T) { p, mocks := newTestProtocol(t, nil) @@ -650,7 +667,7 @@ func TestProtocol_handleTransactionSet(t *testing.T) { conversation := p.cMan.startConversation(request, peer) mocks.State.EXPECT().IBLT(requestLC).Return(*oneHashIblt.Clone().(*tree.Iblt), dag.PageSize-1) mocks.State.EXPECT().XOR(uint32(dag.MaxLamportClock)).Return(hash.FromSlice([]byte("ignored")), requestLC) - mocks.Sender.EXPECT().sendTransactionRangeQuery(connection, dag.PageSize, uint32(dag.MaxLamportClock)).Return(nil) + mocks.Sender.EXPECT().sendTransactionRangeQuery(connection, dag.PageSize, 3*dag.PageSize).Return(nil) err := p.handleTransactionSet(ctx, connection, &Envelope{Message: &Envelope_TransactionSet{ TransactionSet: &TransactionSet{ConversationID: conversation.conversationID.slice(), LCReq: requestLC, LC: peerLC, IBLT: oneHashIbltBytes},