Skip to content

Commit

Permalink
Merge pull request #857 from iotaledger/fix/retained-tx-first
Browse files Browse the repository at this point in the history
Hook to OnTransactionAttached in submitBlock
  • Loading branch information
alexsporn committed Mar 22, 2024
2 parents dc5b73c + c307b3c commit 4499ef0
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 48 deletions.
2 changes: 1 addition & 1 deletion components/debugapi/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func configure() error {

debugAPIWorkerPool := workerpool.NewGroup("DebugAPI").CreatePool("DebugAPI", workerpool.WithWorkerCount(1))

deps.Protocol.Events.Engine.Retainer.BlockRetained.Hook(func(block *blocks.Block) {
deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
blocksPerSlot.Set(block.ID().Slot(), append(lo.Return1(blocksPerSlot.GetOrCreate(block.ID().Slot(), func() []*blocks.Block {
return make([]*blocks.Block, 0)
})), block))
Expand Down
2 changes: 1 addition & 1 deletion components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *Server) attachBlock(ctx context.Context, block *iotago.Block) (*inx.Blo
mergedCtx, mergedCtxCancel := contextutils.MergeContexts(ctx, Component.Daemon().ContextStopped())
defer mergedCtxCancel()

blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(mergedCtx, block)
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(mergedCtx, block)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to attach block: %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion components/restapi/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func sendBlock(c echo.Context) (*api.BlockCreatedResponse, error) {
return nil, err
}

blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(c.Request().Context(), iotaBlock)
blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(c.Request().Context(), iotaBlock)
if err != nil {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to attach block: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/protocol/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ func attachEngineLogs(instance *engine.Engine) func() {
instance.LogTrace("BlockProcessed", "block", blockID)
}).Unhook,

events.Retainer.BlockRetained.Hook(func(block *blocks.Block) {
events.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
instance.LogTrace("Retainer.BlockRetained", "block", block.ID())
}).Unhook,

events.TransactionRetainer.TransactionRetained.Hook(func(txID iotago.TransactionID) {
instance.LogTrace("Retainer.TransactionRetained", "transaction", txID)
}).Unhook,

events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks count", details.AcceptedBlocks.Size(), "accepted transactions", len(details.Mutations))
}).Unhook,
Expand Down
38 changes: 20 additions & 18 deletions pkg/protocol/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,24 @@ type Events struct {
AcceptedBlockProcessed *event.Event1[*blocks.Block]
Evict *event.Event1[iotago.SlotIndex]

PreSolidFilter *presolidfilter.Events
PostSolidFilter *postsolidfilter.Events
BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID]
TipManager *tipmanager.Events
BlockDAG *blockdag.Events
Booker *booker.Events
Clock *clock.Events
BlockGadget *blockgadget.Events
SlotGadget *slotgadget.Events
SybilProtection *sybilprotection.Events
Ledger *ledger.Events
Notarization *notarization.Events
SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID]
Scheduler *scheduler.Events
SeatManager *seatmanager.Events
SyncManager *syncmanager.Events
Retainer *retainer.Events
PreSolidFilter *presolidfilter.Events
PostSolidFilter *postsolidfilter.Events
BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID]
TipManager *tipmanager.Events
BlockDAG *blockdag.Events
Booker *booker.Events
Clock *clock.Events
BlockGadget *blockgadget.Events
SlotGadget *slotgadget.Events
SybilProtection *sybilprotection.Events
Ledger *ledger.Events
Notarization *notarization.Events
SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID]
Scheduler *scheduler.Events
SeatManager *seatmanager.Events
SyncManager *syncmanager.Events
BlockRetainer *retainer.BlockRetainerEvents
TransactionRetainer *retainer.TransactionRetainerEvents
event.Group[Events, *Events]
}

Expand All @@ -71,6 +72,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
Scheduler: scheduler.NewEvents(),
SeatManager: seatmanager.NewEvents(),
SyncManager: syncmanager.NewEvents(),
Retainer: retainer.NewEvents(),
BlockRetainer: retainer.NewBlockRetainerEvents(),
TransactionRetainer: retainer.NewTransactionRetainerEvents(),
}
})
55 changes: 41 additions & 14 deletions pkg/requesthandler/blockissuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter"
"github.com/iotaledger/iota-core/pkg/retainer/txretainer"
iotago "github.com/iotaledger/iota.go/v4"
)

Expand All @@ -24,27 +25,53 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e
return r.submitBlock(block)
}

// submitBlockAndAwaitEvent submits a block to be processed and waits for the event to be triggered.
func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *model.Block, evt *event.Event1[*blocks.Block]) error {
// submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained.
func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error {
filtered := make(chan error, 1)
exit := make(chan struct{})
defer close(exit)

// Make sure we don't wait forever here. If the block is not dispatched to the main engine,
// it will never trigger one of the below events.
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 5*time.Second)
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer processingCtxCancel()
// Calculate the blockID so that we don't capture the block pointer in the event handlers.
blockID := block.ID()
evtUnhook := evt.Hook(func(eventBlock *blocks.Block) {
if blockID != eventBlock.ID() {
return
}
select {
case filtered <- nil:
case <-exit:

var successUnhook func()
// Hook to TransactionAttached event if the block contains a transaction.
signedTx, isTx := block.SignedTransaction()
if isTx {
txID := signedTx.Transaction.MustID()
// Check if the transaction is already retained. The onTransactionAttached event is only triggered if it's a new transaction.
// If the transaction is already retained, we hook to the BlockRetained event.
_, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID)
if ierrors.Is(err, txretainer.ErrEntryNotFound) {
successUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) {
if transactionID != txID {
return
}
select {
case filtered <- nil:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook
}
}, event.WithWorkerPool(r.workerPool)).Unhook
}

// if no hook was set, hook to the block retained event.
if successUnhook == nil {
successUnhook = r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) {
if blockID != eventBlock.ID() {
return
}
select {
case filtered <- nil:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook
}

prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
if blockID != event.Block.ID() {
return
Expand All @@ -65,7 +92,7 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo
}
}, event.WithWorkerPool(r.workerPool)).Unhook

defer lo.BatchReverse(evtUnhook, prefilteredUnhook, postfilteredUnhook)()
defer lo.BatchReverse(successUnhook, prefilteredUnhook, postfilteredUnhook)()

if err := r.submitBlock(block); err != nil {
return ierrors.Wrapf(err, "failed to issue block %s", blockID)
Expand All @@ -82,13 +109,13 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo
}
}

func (r *RequestHandler) SubmitBlockAndAwaitBooking(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) {
func (r *RequestHandler) SubmitBlockAndAwaitRetainer(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) {
modelBlock, err := model.BlockFromBlock(iotaBlock)
if err != nil {
return iotago.EmptyBlockID, ierrors.Wrap(err, "error serializing block to model block")
}

if err = r.submitBlockAndAwaitEvent(ctx, modelBlock, r.protocol.Events.Engine.Retainer.BlockRetained); err != nil {
if err = r.submitBlockAndAwaitRetainer(ctx, modelBlock); err != nil {
return iotago.EmptyBlockID, ierrors.Wrap(err, "error issuing model block")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/retainer/blockretainer/block_retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type (
)

type BlockRetainer struct {
events *retainer.Events
events *retainer.BlockRetainerEvents
store StoreFunc
cache *cache

Expand All @@ -39,7 +39,7 @@ type BlockRetainer struct {
func New(module module.Module, workersGroup *workerpool.Group, retainerStoreFunc StoreFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *BlockRetainer {
b := &BlockRetainer{
Module: module,
events: retainer.NewEvents(),
events: retainer.NewBlockRetainerEvents(),
workerPool: workersGroup.CreatePool("Retainer", workerpool.WithWorkerCount(1)),
store: retainerStoreFunc,
cache: newCache(),
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] {
}
}, asyncOpt)

e.Events.Retainer.BlockRetained.LinkTo(r.events.BlockRetained)
e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained)

r.InitializedEvent().Trigger()

Expand Down
28 changes: 22 additions & 6 deletions pkg/retainer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,35 @@ package retainer
import (
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
iotago "github.com/iotaledger/iota.go/v4"
)

// Events is a collection of Retainer related Events.
type Events struct {
// BlockRetainerEvents is a collection of Retainer related BlockRetainerEvents.
type BlockRetainerEvents struct {
// BlockRetained is triggered when a block is stored in the retainer.
BlockRetained *event.Event1[*blocks.Block]

event.Group[Events, *Events]
event.Group[BlockRetainerEvents, *BlockRetainerEvents]
}

// NewEvents contains the constructor of the Events object (it is generated by a generic factory).
var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
return &Events{
// NewBlockRetainerEvents contains the constructor of the Events object (it is generated by a generic factory).
var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *BlockRetainerEvents) {
return &BlockRetainerEvents{
BlockRetained: event.New1[*blocks.Block](),
}
})

// TransactionRetainerEvents is a collection of Retainer related TransactionRetainerEvents.
type TransactionRetainerEvents struct {
// TransactionRetained is triggered when a transaction is stored in the retainer.
TransactionRetained *event.Event1[iotago.TransactionID]

event.Group[TransactionRetainerEvents, *TransactionRetainerEvents]
}

// NewTransactionRetainerEvents contains the constructor of the Events object (it is generated by a generic factory).
var NewTransactionRetainerEvents = event.CreateGroupConstructor(func() (newEvents *TransactionRetainerEvents) {
return &TransactionRetainerEvents{
TransactionRetained: event.New1[iotago.TransactionID](),
}
})
8 changes: 8 additions & 0 deletions pkg/retainer/txretainer/tx_retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type (

// TransactionRetainer keeps and resolves all the transaction-related metadata needed in the API and INX.
type TransactionRetainer struct {
events *retainer.TransactionRetainerEvents
workerPool *workerpool.WorkerPool
txRetainerDatabase *transactionRetainerDatabase
latestCommittedSlotFunc SlotFunc
Expand All @@ -113,6 +114,7 @@ func WithDebugStoreErrorMessages(store bool) options.Option[TransactionRetainer]
func New(parentModule module.Module, workersGroup *workerpool.Group, dbExecFunc storage.SQLDatabaseExecFunc, latestCommittedSlotFunc SlotFunc, finalizedSlotFunc SlotFunc, errorHandler func(error), opts ...options.Option[TransactionRetainer]) *TransactionRetainer {
return module.InitSimpleLifecycle(options.Apply(&TransactionRetainer{
Module: parentModule.NewSubModule("TransactionRetainer"),
events: retainer.NewTransactionRetainerEvents(),
workerPool: workersGroup.CreatePool("TxRetainer", workerpool.WithWorkerCount(1)),
txRetainerCache: NewTransactionRetainerCache(),
txRetainerDatabase: NewTransactionRetainerDB(dbExecFunc),
Expand Down Expand Up @@ -158,6 +160,8 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e
// therefore we use false for the "validSignature" argument.
r.UpdateTransactionMetadata(transactionMetadata.ID(), false, transactionMetadata.EarliestIncludedAttachment().Slot(), api.TransactionStatePending, nil)

r.events.TransactionRetained.Trigger(transactionMetadata.ID())

// the transaction was accepted
transactionMetadata.OnAccepted(func() {
e.LogTrace("TxRetainer.TransactionAccepted", "tx", transactionMetadata.ID())
Expand Down Expand Up @@ -242,6 +246,10 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e
}, asyncOpt)
})

e.Events.TransactionRetainer.TransactionRetained.LinkTo(r.events.TransactionRetained)

r.InitializedEvent().Trigger()

return r
})
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/tests/reward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,13 @@ func Test_Account_StakeAmountCalculation(t *testing.T) {
)
block7 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block7", ts.DefaultWallet(), tx7, mock.WithStrongParents(latestParents...)))

latestParents = ts.CommitUntilSlot(block7_8Slot, block7.ID())

tx8 := ts.DefaultWallet().ClaimDelegatorRewards("TX8", "TX7:0")
block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(block7.ID())))
block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(latestParents...)))

latestParents = ts.CommitUntilSlot(block7_8Slot, block8.ID())
block8Slot := ts.CurrentSlot()
latestParents = ts.CommitUntilSlot(block8Slot, block8.ID())

// Delegated Stake should be unaffected since no new delegation was effectively added in that slot.
ts.AssertAccountStake(accountID, stakedAmount, deleg1+deleg2, ts.Nodes()...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testsuite/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *TestSuiteClient) Routes(_ context.Context) (*api.RoutesResponse, error)
}

func (c *TestSuiteClient) SubmitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) {
return c.Node.RequestHandler.SubmitBlockAndAwaitBooking(ctx, block)
return c.Node.RequestHandler.SubmitBlockAndAwaitRetainer(ctx, block)
}

func (c *TestSuiteClient) TransactionIncludedBlock(_ context.Context, txID iotago.TransactionID) (*iotago.Block, error) {
Expand Down
6 changes: 6 additions & 0 deletions tools/docker-network/tests/eventapiframework.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ func (e *EventAPIDockerTestFramework) AssertTransactionMetadataByTransactionID(c
select {
case metadata := <-acceptedChan:
if txID.Compare(metadata.TransactionID) == 0 {
// make sure the transaction state is available from the core API
resp, err := eventClt.Client.TransactionMetadata(ctx, txID)
require.NoError(e.Testing, err)
require.NotNil(e.Testing, resp)

e.finishChan <- struct{}{}

return
}

Expand Down

0 comments on commit 4499ef0

Please sign in to comment.