From 6bd88d55436894ffa2ab3d7b04484089c6ff80d4 Mon Sep 17 00:00:00 2001 From: Wout Slakhorst Date: Tue, 4 Jul 2023 16:06:11 +0200 Subject: [PATCH 1/3] add xor tree repair loop --- network/dag/consistency.go | 161 ++++++++++++++++++ network/dag/consistency_test.go | 146 ++++++++++++++++ network/dag/interface.go | 5 + network/dag/mock.go | 24 +++ network/dag/state.go | 16 ++ network/dag/tree/tree.go | 60 +++++++ network/dag/tree/tree_test.go | 127 ++++++++++++++ network/transport/v2/handlers.go | 12 ++ network/transport/v2/handlers_test.go | 2 + .../transport/v2/protocol_integration_test.go | 1 + network/transport/v2/protocol_test.go | 3 + 11 files changed, 557 insertions(+) create mode 100644 network/dag/consistency.go create mode 100644 network/dag/consistency_test.go diff --git a/network/dag/consistency.go b/network/dag/consistency.go new file mode 100644 index 0000000000..0d010b379f --- /dev/null +++ b/network/dag/consistency.go @@ -0,0 +1,161 @@ +/* + * Copyright (C) 2023 Nuts community + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ * + */ + +package dag + +import ( + "context" + "github.com/nuts-foundation/go-stoabs" + "github.com/nuts-foundation/nuts-node/network/dag/tree" + "github.com/nuts-foundation/nuts-node/network/log" + "sync" + "time" +) + +type circuit int + +const ( + circuitGreen circuit = iota + circuitYellow + circuitRed +) + +// xorTreeRepair is responsible for repairing the XOR tree. Its loop is triggered when the network layer detects differences in XOR values with all other nodes. +// It will loop over all pages of the XOR tree and recalculates the XOR value with the transactions in the database. +// This repair is needed because current networks have nodes that have a wrong XOR value. How this happens is not yet known, it could be due to DB failures of due to failures in older versions. +// The fact is that we can fix the state relatively easy. +// The loop checks a page (512 LC values) per 10 seconds and continues looping until the network layer signals all is ok again. +type xorTreeRepair struct { + ctx context.Context + cancel context.CancelFunc + ticker *time.Ticker + currentPage uint32 + state *state + circuitState circuit + mutex sync.Mutex +} + +func newXorTreeRepair(state *state) *xorTreeRepair { + return &xorTreeRepair{ + state: state, + ticker: time.NewTicker(10 * time.Second), + } +} + +func (f *xorTreeRepair) start() { + f.ctx, f.cancel = context.WithCancel(context.Background()) + go f.loop() +} + +func (f *xorTreeRepair) shutdown() { + if f.cancel != nil { + f.cancel() + } +} + +func (f *xorTreeRepair) loop() { + for { + select { + case <-f.ctx.Done(): + return + case <-f.ticker.C: + f.checkPage() + } + } +} + +func (f *xorTreeRepair) checkPage() { + f.mutex.Lock() + defer f.mutex.Unlock() + + // ignore run if circuit is not red + if f.circuitState < circuitRed { + return + } + + currentLC := f.state.lamportClockHigh.Load() + lcStart := f.currentPage * PageSize + lcEnd := lcStart + PageSize + + // initialize an XOR tree + calculatedXorTree := 
tree.New(tree.NewXor(), PageSize) + + // acquire global lock + err := f.state.graph.db.Write(context.Background(), func(txn stoabs.WriteTx) error { + txs, err := f.state.graph.findBetweenLC(txn, lcStart, lcEnd) + if err != nil { + return err + } + for _, tx := range txs { + calculatedXorTree.Insert(tx.Ref(), tx.Clock()) + } + + // Get XOR leaf from current XOR tree + xorTillEnd, _ := f.state.xorTree.getZeroTo(lcEnd - 1) + if lcStart != 0 { + xorTillStart, _ := f.state.xorTree.getZeroTo(lcStart - 1) + _ = xorTillEnd.Subtract(xorTillStart) + } + + // Subtract the calculated tree, should be empty if the trees are equal + _ = xorTillEnd.Subtract(calculatedXorTree.Root()) + if !xorTillEnd.Empty() { + // it's not empty, so replace the leaf in the current XOR tree with the calculated one + err = f.state.xorTree.tree.Replace(lcStart, calculatedXorTree.Root()) + if err != nil { + return err + } + log.Logger().Warnf("detected XOR tree mismatch for page %d, fixed using recalculated values", f.currentPage) + } + + // Now we do the same for the IBLT tree as stated in + // https://github.com/nuts-foundation/nuts-node/issues/2295 + // we skip the iblt tree for now, since the chance for it to become corrupt is incredibly low. + // there can only be a problem with duplicate entries, not with missing entries. + // the xor tree already has an effect when it's missing entries. + // fixing the iblt tree is a copy of the code above (but with ibltTree instead of xorTree). + + return nil + }) + if err != nil { + log.Logger().Warnf("failed to run xorTreeRepair check: %s", err) + } + + if lcEnd > currentLC { + // start over when end is reached for next run + f.currentPage = 0 + } else { + // increment page so on the next round we check a different page. 
+ f.currentPage++ + } +} + +func (f *xorTreeRepair) stateOK() { + f.mutex.Lock() + defer f.mutex.Unlock() + + f.circuitState = circuitGreen + f.currentPage = 0 +} + +func (f *xorTreeRepair) incrementCount() { + f.mutex.Lock() + defer f.mutex.Unlock() + + f.circuitState++ +} diff --git a/network/dag/consistency_test.go b/network/dag/consistency_test.go new file mode 100644 index 0000000000..eee3acbed8 --- /dev/null +++ b/network/dag/consistency_test.go @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2023 Nuts community + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ * + */ + +package dag + +import ( + "context" + "encoding/binary" + "github.com/magiconair/properties/assert" + "github.com/nuts-foundation/go-stoabs" + "github.com/nuts-foundation/go-stoabs/bbolt" + "github.com/nuts-foundation/nuts-node/crypto/hash" + "github.com/nuts-foundation/nuts-node/network/dag/tree" + "github.com/nuts-foundation/nuts-node/test" + "github.com/nuts-foundation/nuts-node/test/io" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "path/filepath" + "testing" + "time" +) + +func TestXorTreeRepair(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + tx, _, _ := CreateTestTransaction(1) + t.Run("xor tree repaired after 2 signals", func(t *testing.T) { + txState := createXorTreeRepairState(t, tx) + require.NoError(t, txState.Start()) + txState.xorTree = newTreeStore(xorShelf, tree.New(tree.NewXor(), PageSize)) + + // twice to set circuit to red + txState.IncorrectStateDetected() + txState.IncorrectStateDetected() + + // await for XOR to change + test.WaitFor(t, func() (bool, error) { + xorRoot := txState.xorTree.tree.Root() + hashRoot := xorRoot.(*tree.Xor).Hash() + return hashRoot.Equals(tx.Ref()), nil + }, time.Second, "xorTree not updated within wait period") + }) + t.Run("checkPage executed after 2 signals", func(t *testing.T) { + txState := createXorTreeRepairState(t, tx) + txState.xorTree = newTreeStore(xorShelf, tree.New(tree.NewXor(), PageSize)) + + // twice to set circuit to red + txState.IncorrectStateDetected() + txState.IncorrectStateDetected() + txState.xorTreeRepair.checkPage() + + assert.Equal(t, tx.Ref(), xorRootDate(txState)) + }) + t.Run("checkPage not executed after 1 signal", func(t *testing.T) { + txState := createXorTreeRepairState(t, tx) + txState.xorTree = newTreeStore(xorShelf, tree.New(tree.NewXor(), PageSize)) + + // once to set circuit to yellow + txState.IncorrectStateDetected() + txState.xorTreeRepair.checkPage() + + assert.Equal(t, hash.EmptyHash(), xorRootDate(txState)) + }) + 
t.Run("checkPage not executed after okState", func(t *testing.T) { + txState := createXorTreeRepairState(t, tx) + txState.xorTree = newTreeStore(xorShelf, tree.New(tree.NewXor(), PageSize)) + + // twice to set circuit to red + txState.IncorrectStateDetected() + txState.IncorrectStateDetected() + // back to green + txState.CorrectStateDetected() + txState.xorTreeRepair.checkPage() + + assert.Equal(t, hash.EmptyHash(), xorRootDate(txState)) + }) + t.Run("checkPage executed for multiple pages", func(t *testing.T) { + txState := createXorTreeRepairState(t, tx) + prev := tx + expectedHash := tx.Ref() + for i := uint32(1); i < 600; i++ { + tx2, _, _ := CreateTestTransaction(i, prev) + payload := make([]byte, 4) + binary.BigEndian.PutUint32(payload, i) + _ = txState.Add(context.Background(), tx2, payload) + prev = tx2 + expectedHash = expectedHash.Xor(tx2.Ref()) + } + require.NoError(t, txState.Start()) + txState.xorTree = newTreeStore(xorShelf, tree.New(tree.NewXor(), PageSize)) + + // twice to set circuit to red + txState.IncorrectStateDetected() + txState.IncorrectStateDetected() + + // await for XOR to change + test.WaitFor(t, func() (bool, error) { + xorRoot := txState.xorTree.tree.Root() + hashRoot := xorRoot.(*tree.Xor).Hash() + return hashRoot.Equals(expectedHash), nil + }, 5*time.Second, "xorTree not updated within wait period") + }) +} + +func xorRootDate(s *state) hash.SHA256Hash { + return s.xorTree.tree.Root().(*tree.Xor).Hash() +} + +func createXorTreeRepairState(t testing.TB, tx Transaction) *state { + txState := createStoppedState(t) + txState.xorTreeRepair.ticker = time.NewTicker(5 * time.Millisecond) + payload := []byte{0, 0, 0, 1} + txState.Add(context.Background(), tx, payload) + return txState +} + +func createStoppedState(t testing.TB) *state { + testDir := io.TestDirectory(t) + bboltStore, err := bbolt.CreateBBoltStore(filepath.Join(testDir, "test_state"), stoabs.WithNoSync()) + if err != nil { + t.Fatal("failed to create store: ", err) + } + s, err 
:= NewState(bboltStore) + require.NoError(t, err) + t.Cleanup(func() { + s.Shutdown() + }) + return s.(*state) +} diff --git a/network/dag/interface.go b/network/dag/interface.go index bb1d798039..6f70d8c349 100644 --- a/network/dag/interface.go +++ b/network/dag/interface.go @@ -93,6 +93,11 @@ type State interface { // - highest lamport clock in the DAG // A requested clock of math.MaxUint32 will return the iblt of the entire DAG IBLT(reqClock uint32) (tree.Iblt, uint32) + + // IncorrectStateDetected is called when the xor and LC value from a gossip message do NOT match the local state. + IncorrectStateDetected() + // CorrectStateDetected is called when the xor and LC value from a gossip message match the local state. + CorrectStateDetected() } // Statistics holds data about the current state of the DAG. diff --git a/network/dag/mock.go b/network/dag/mock.go index 71470bfb0b..3f8cd8ddfb 100644 --- a/network/dag/mock.go +++ b/network/dag/mock.go @@ -52,6 +52,18 @@ func (mr *MockStateMockRecorder) Add(ctx, transactions, payload interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockState)(nil).Add), ctx, transactions, payload) } +// CorrectStateDetected mocks base method. +func (m *MockState) CorrectStateDetected() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "CorrectStateDetected") +} + +// CorrectStateDetected indicates an expected call of CorrectStateDetected. +func (mr *MockStateMockRecorder) CorrectStateDetected() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CorrectStateDetected", reflect.TypeOf((*MockState)(nil).CorrectStateDetected)) +} + // Diagnostics mocks base method. 
func (m *MockState) Diagnostics() []core.DiagnosticResult { m.ctrl.T.Helper() @@ -126,6 +138,18 @@ func (mr *MockStateMockRecorder) IBLT(reqClock interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IBLT", reflect.TypeOf((*MockState)(nil).IBLT), reqClock) } +// IncorrectStateDetected mocks base method. +func (m *MockState) IncorrectStateDetected() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "IncorrectStateDetected") +} + +// IncorrectStateDetected indicates an expected call of IncorrectStateDetected. +func (mr *MockStateMockRecorder) IncorrectStateDetected() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncorrectStateDetected", reflect.TypeOf((*MockState)(nil).IncorrectStateDetected)) +} + // IsPayloadPresent mocks base method. func (m *MockState) IsPayloadPresent(ctx context.Context, payloadHash hash.SHA256Hash) (bool, error) { m.ctrl.T.Helper() diff --git a/network/dag/state.go b/network/dag/state.go index 7ee1f10a09..12d6b53f8c 100644 --- a/network/dag/state.go +++ b/network/dag/state.go @@ -56,6 +56,7 @@ type state struct { transactionCount prometheus.Counter eventsNotifyCount prometheus.Counter eventsFinishedCount prometheus.Counter + xorTreeRepair *xorTreeRepair } func (s *state) Migrate() error { @@ -80,6 +81,8 @@ func NewState(db stoabs.KVStore, verifiers ...Verifier) (State, error) { return nil, err } + newState.xorTreeRepair = newXorTreeRepair(newState) + return newState, nil } @@ -384,10 +387,20 @@ func (s *state) IBLT(reqClock uint32) (tree.Iblt, uint32) { return *data.(*tree.Iblt), dataClock } +func (s *state) IncorrectStateDetected() { + s.xorTreeRepair.incrementCount() +} +func (s *state) CorrectStateDetected() { + s.xorTreeRepair.stateOK() +} + func (s *state) Shutdown() error { if s.transactionCount != nil { prometheus.Unregister(s.transactionCount) } + if s.xorTreeRepair != nil { + s.xorTreeRepair.shutdown() + } return nil } @@ -408,6 +421,9 @@ func (s *state) Start() 
error { err = value.(Notifier).Run() return err == nil }) + + // start xorTreeRepair that waits until the state has triggered it to start via IncorrectStateDetected() + s.xorTreeRepair.start() return err } diff --git a/network/dag/tree/tree.go b/network/dag/tree/tree.go index 392fb3b011..010b80407f 100644 --- a/network/dag/tree/tree.go +++ b/network/dag/tree/tree.go @@ -20,6 +20,7 @@ package tree import ( "encoding" + "errors" "sort" "github.com/nuts-foundation/nuts-node/crypto/hash" @@ -70,6 +71,9 @@ type Tree interface { // Load builds a tree from binary leaf data. The keys in leaves correspond to a node's split value. // All consecutive leaves must be present. Gaps must be filled with zero value of the corresponding Data implementation. Load(leaves map[uint32][]byte) error + // Replace replaces the Data at the leaf starting with the given clock value with the given Data. + // It adds the leaf to the dirtyLeaves map. + Replace(clock uint32, data Data) error } /* @@ -191,6 +195,28 @@ func (t *tree) Delete(ref hash.SHA256Hash, clock uint32) { }) } +func (t *tree) Replace(clock uint32, data Data) error { + var current *node + next := t.root + for next != nil { + current = next + if current.isLeaf() { + if clock >= current.limitLC { + t.reRoot() + next = t.root + continue + } + current.data = data + t.dirtyLeaves[current.splitLC] = current + + // recalculate all upper leaves + return t.rebuild() + } + next = t.getNextNode(current, clock) + } + return errors.New("unknown leaf") +} + // updateOrCreatePath calls fn on all nodes on the path from the root to leaf containing the clock value. // If the path/leaf does not exist it will be created. // The leaf is marked dirty. @@ -230,6 +256,15 @@ func (t *tree) reRoot() { t.treeSize *= 2 } +func (t *tree) rebuild() error { + root, err := t.root.rebuild() + if err != nil { + return err + } + *t.root = root + return nil +} + // getNextNode retrieves the next node based on the clock value. 
If the node does not exist it is created. func (t *tree) getNextNode(n *node, clock uint32) *node { // return nil if n is a leaf @@ -252,6 +287,10 @@ func (t *tree) Root() Data { } func (t *tree) ZeroTo(clock uint32) (Data, uint32) { + if clock < 0 { + return t.prototype.New(), 0 + } + data := t.root.data.Clone() next := t.root for { @@ -382,3 +421,24 @@ func newNode(splitLC, limitLC uint32, data Data) *node { func (n node) isLeaf() bool { return n.left == nil } + +// rebuild recursively recalculates the data of the node from its left and right children. +func (n node) rebuild() (node, error) { + var err error + if n.isLeaf() { + return n, nil + } + *n.left, err = n.left.rebuild() + if err != nil { + return n, err + } + n.data = n.left.data.Clone() + if n.right != nil { + *n.right, err = n.right.rebuild() + if err != nil { + return n, err + } + return n, n.data.Add(n.right.data) + } + return n, nil +} diff --git a/network/dag/tree/tree_test.go b/network/dag/tree/tree_test.go index 20bff9b049..4b9706ddc7 100644 --- a/network/dag/tree/tree_test.go +++ b/network/dag/tree/tree_test.go @@ -19,6 +19,7 @@ package tree import ( + "github.com/stretchr/testify/require" "testing" "github.com/stretchr/testify/assert" @@ -133,6 +134,132 @@ func TestTree_GetZeroTo(t *testing.T) { assert.Equal(t, testLeafSize*3-1, lcMax) } +func TestTree_Replace(t *testing.T) { + t.Run("replace a leaf for a single layer", func(t *testing.T) { + tr := newTestTree(NewXor(), 1) + refA := hash.FromSlice([]byte("A")) + refB := hash.FromSlice([]byte("B")) + xor := Xor(refB) + tr.Insert(refA, 0) + + err := tr.Replace(0, &xor) + + require.NoError(t, err) + assert.Equal(t, refB, tr.root.data.(*Xor).Hash()) + }) + + t.Run("replace a 'next' leaf, it should grow the tree", func(t *testing.T) { + tr := newTestTree(NewXor(), 1) + refA := hash.FromSlice([]byte("A")) + refB := hash.FromSlice([]byte("B")) + xor := Xor(refB) + tr.Insert(refA, 0) + + err := tr.Replace(1, &xor) + + require.NoError(t, err) + assert.Equal(t, refB.Xor(refA), 
tr.root.data.(*Xor).Hash()) + }) + + t.Run("replace a 'future' leaf, it should grow the tree", func(t *testing.T) { + tr := newTestTree(NewXor(), 1) + refA := hash.FromSlice([]byte("A")) + refB := hash.FromSlice([]byte("B")) + xor := Xor(refB) + tr.Insert(refA, 0) + + err := tr.Replace(10, &xor) + + require.NoError(t, err) + assert.Equal(t, refB.Xor(refA), tr.root.data.(*Xor).Hash()) + assert.Equal(t, refA, tr.root.left.data.(*Xor).Hash()) + assert.Equal(t, refB, tr.root.right.data.(*Xor).Hash()) + }) + + t.Run("replace a 'left' leaf", func(t *testing.T) { + tr := newTestTree(NewXor(), 1) + refA := hash.FromSlice([]byte("A")) + refB := hash.FromSlice([]byte("B")) + xor := Xor(refB) + tr.Insert(refA, 0) + tr.Insert(refB, 1) + + err := tr.Replace(0, &xor) + + require.NoError(t, err) + assert.Equal(t, refB.Xor(refB), tr.root.data.(*Xor).Hash()) + }) + + t.Run("replace a leaf for a multi layer tree", func(t *testing.T) { + tr := newTestTree(NewXor(), 1) + refA := hash.FromSlice([]byte("A")) + refB := hash.FromSlice([]byte("B")) + refC := hash.FromSlice([]byte("C")) + refD := hash.FromSlice([]byte("D")) + xor := Xor(refD) + tr.Insert(refA, 0) + tr.Insert(refB, 1) + tr.Insert(refC, 2) + expected := refB.Xor(refC, refD) + + err := tr.Replace(0, &xor) + + require.NoError(t, err) + assert.Equal(t, expected, tr.root.data.(*Xor).Hash()) + }) + + t.Run("replace a leaf for the test tree", func(t *testing.T) { + tr, td := filledTestTree(NewXor(), testLeafSize) + ref := hash.EmptyHash() + xor := Xor(ref) + // expected is c0 ^ c2 + c0Ref := td.c0.(*Xor).Hash() + c2Ref := td.c2.(*Xor).Hash() + expected := ref.Xor(c0Ref, c2Ref) + + // replace leaf that starts with clock = 4 which is the 2nd leaf in the 2nd layer + err := tr.Replace(testLeafSize, &xor) + + require.NoError(t, err) + assert.Equal(t, expected, tr.root.data.(*Xor).Hash()) + }) +} + +func TestTree_rebuild(t *testing.T) { + t.Run("empty tree", func(t *testing.T) { + data := NewXor() + tree := newTestTree(data, 
testLeafSize) + + err := tree.rebuild() + + assert.NoError(t, err) + }) + + t.Run("single leaf", func(t *testing.T) { + h := hash.RandomHash() + tree := newTestTree(NewXor(), testLeafSize) + tree.Insert(h, 0) + + err := tree.rebuild() + + require.NoError(t, err) + assert.Equal(t, h, tree.root.data.(*Xor).Hash()) + }) + + t.Run("multiple leaves", func(t *testing.T) { + tr, _ := filledTestTree(NewXor(), testLeafSize) + tr2, _ := filledTestTree(NewXor(), testLeafSize) + + err := tr.rebuild() + + require.NoError(t, err) + data, _ := tr2.ZeroTo(8) + data2, _ := tr2.ZeroTo(8) + _ = data2.Subtract(data) + assert.True(t, data2.Empty()) + }) +} + func TestTree_rightmostLeafClock(t *testing.T) { tr, _ := filledTestTree(NewXor(), testLeafSize) diff --git a/network/transport/v2/handlers.go b/network/transport/v2/handlers.go index c8d98df967..bcb11e6e27 100644 --- a/network/transport/v2/handlers.go +++ b/network/transport/v2/handlers.go @@ -264,6 +264,8 @@ func (p *protocol) handleGossip(ctx context.Context, connection grpc.Connection, xor, clock := p.state.XOR(dag.MaxLamportClock) peerXor := hash.FromSlice(msg.XOR) if xor.Equals(peerXor) { + // open circuit on fixer + return nil } @@ -300,6 +302,7 @@ func (p *protocol) handleGossip(ctx context.Context, connection grpc.Connection, // If the XORs are not equal and the peer is behind, still request the missing refs if there are any. tempXor := xor.Xor(refs...) if tempXor.Equals(peerXor) || (msg.LC < clock && len(refs) > 0) { + p.state.CorrectStateDetected() return p.sender.sendTransactionListQuery(connection, refs) } @@ -308,11 +311,20 @@ func (p *protocol) handleGossip(ctx context.Context, connection grpc.Connection, log.Logger(). WithFields(connection.Peer().ToFields()). Debug("XOR is different from peer but Gossip contained no new transactions") + + // if LCs are the same and XOR differs, something is probably broken in our node or the other node. 
+ // If it's this node then all messages from all peers will trigger the incorrect state detection. + // This node will then start to loop over pages of tx until the state is fixed. + if msg.LC == clock { + p.state.IncorrectStateDetected() + } + } else { log.Logger(). WithFields(connection.Peer().ToFields()). Debug("XOR is different from peer and peer's clock is equal or higher") } + return p.sender.sendState(connection, xor, clock) } diff --git a/network/transport/v2/handlers_test.go b/network/transport/v2/handlers_test.go index f1c2b92518..da5de2e793 100644 --- a/network/transport/v2/handlers_test.go +++ b/network/transport/v2/handlers_test.go @@ -303,6 +303,7 @@ func TestProtocol_handleGossip(t *testing.T) { p, mocks := newTestProtocol(t, nil) mocks.State.EXPECT().XOR(uint32(dag.MaxLamportClock)).Return(xorLocal, clockLocal) + mocks.State.EXPECT().IncorrectStateDetected() mocks.Sender.EXPECT().sendState(connection, xorLocal, clockLocal).Return(nil) err := p.handleGossip(ctx, connection, envelope) @@ -316,6 +317,7 @@ func TestProtocol_handleGossip(t *testing.T) { p, mocks := newTestProtocol(t, nil) mocks.State.EXPECT().XOR(uint32(dag.MaxLamportClock)).Return(xorLocal, clockLocal) mocks.State.EXPECT().IsPresent(gomock.Any(), xorLocal).Return(true, nil) + mocks.State.EXPECT().IncorrectStateDetected() mocks.Gossip.EXPECT().GossipReceived(peer, xorLocal) mocks.Sender.EXPECT().sendState(connection, xorLocal, clockLocal).Return(nil) diff --git a/network/transport/v2/protocol_integration_test.go b/network/transport/v2/protocol_integration_test.go index 9e2f955a74..dfd90995bc 100644 --- a/network/transport/v2/protocol_integration_test.go +++ b/network/transport/v2/protocol_integration_test.go @@ -175,6 +175,7 @@ func startNode(t *testing.T, name string, configurers ...func(config *Config)) * } ctx.protocol.Start() t.Cleanup(func() { + _ = ctx.state.Shutdown() ctx.protocol.Stop() ctx.connectionManager.Stop() }) diff --git a/network/transport/v2/protocol_test.go 
b/network/transport/v2/protocol_test.go index 2aef4c74d3..4ff4395ab9 100644 --- a/network/transport/v2/protocol_test.go +++ b/network/transport/v2/protocol_test.go @@ -98,6 +98,9 @@ func newTestProtocol(t *testing.T, nodeDID *did.DID) (*protocol, protocolMocks) proto.sender = sender proto.listHandler = newTransactionListHandler(context.Background(), proto.handleTransactionList) + // called whenever XOR values match up + state.EXPECT().CorrectStateDetected().AnyTimes() + return proto, protocolMocks{ Controller: ctrl, State: state, From 2071d2cdcf3e1f4980d6d46c7ce8a4019692abd6 Mon Sep 17 00:00:00 2001 From: Wout Slakhorst Date: Tue, 4 Jul 2023 16:37:52 +0200 Subject: [PATCH 2/3] fix race during test --- network/dag/consistency.go | 25 ++++++++++++------------- network/dag/consistency_test.go | 6 ++++++ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/network/dag/consistency.go b/network/dag/consistency.go index 0d010b379f..2cc4c7cc64 100644 --- a/network/dag/consistency.go +++ b/network/dag/consistency.go @@ -58,8 +58,18 @@ func newXorTreeRepair(state *state) *xorTreeRepair { } func (f *xorTreeRepair) start() { - f.ctx, f.cancel = context.WithCancel(context.Background()) - go f.loop() + var ctx context.Context + ctx, f.cancel = context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-f.ticker.C: + f.checkPage() + } + } + }() } func (f *xorTreeRepair) shutdown() { @@ -68,17 +78,6 @@ func (f *xorTreeRepair) shutdown() { } } -func (f *xorTreeRepair) loop() { - for { - select { - case <-f.ctx.Done(): - return - case <-f.ticker.C: - f.checkPage() - } - } -} - func (f *xorTreeRepair) checkPage() { f.mutex.Lock() defer f.mutex.Unlock() diff --git a/network/dag/consistency_test.go b/network/dag/consistency_test.go index eee3acbed8..4f3a172486 100644 --- a/network/dag/consistency_test.go +++ b/network/dag/consistency_test.go @@ -52,6 +52,9 @@ func TestXorTreeRepair(t *testing.T) { // await for XOR to 
change test.WaitFor(t, func() (bool, error) { + txState.xorTreeRepair.mutex.Lock() + defer txState.xorTreeRepair.mutex.Unlock() + xorRoot := txState.xorTree.tree.Root() hashRoot := xorRoot.(*tree.Xor).Hash() return hashRoot.Equals(tx.Ref()), nil @@ -112,6 +115,9 @@ func TestXorTreeRepair(t *testing.T) { // await for XOR to change test.WaitFor(t, func() (bool, error) { + txState.xorTreeRepair.mutex.Lock() + defer txState.xorTreeRepair.mutex.Unlock() + xorRoot := txState.xorTree.tree.Root() hashRoot := xorRoot.(*tree.Xor).Hash() return hashRoot.Equals(expectedHash), nil From 97a89a016706ccce2874a5d9d7e529763a8c147a Mon Sep 17 00:00:00 2001 From: Wout Slakhorst Date: Fri, 7 Jul 2023 08:49:10 +0200 Subject: [PATCH 3/3] PR feedback --- network/dag/consistency.go | 6 +++--- network/dag/tree/tree.go | 4 ---- network/transport/v2/handlers.go | 3 +-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/network/dag/consistency.go b/network/dag/consistency.go index 2cc4c7cc64..843bf4a43f 100644 --- a/network/dag/consistency.go +++ b/network/dag/consistency.go @@ -37,7 +37,7 @@ const ( // xorTreeRepair is responsible for repairing the XOR tree. Its loop is triggered when the network layer detects differences in XOR values with all other nodes. // It will loop over all pages of the XOR tree and recalculates the XOR value with the transactions in the database. -// This repair is needed because current networks have nodes that have a wrong XOR value. How this happens is not yet known, it could be due to DB failures of due to failures in older versions. +// This repair is needed because current networks have nodes that have a wrong XOR value. How this happens is not yet known, it could be due to DB failures or due to bugs in older versions. // The fact is that we can fix the state relatively easy. // The loop checks a page (512 LC values) per 10 seconds and continues looping until the network layer signals all is ok again. 
type xorTreeRepair struct { @@ -61,6 +61,7 @@ func (f *xorTreeRepair) start() { var ctx context.Context ctx, f.cancel = context.WithCancel(context.Background()) go func() { + defer f.ticker.Stop() for { select { case <-ctx.Done(): @@ -132,7 +133,7 @@ func (f *xorTreeRepair) checkPage() { return nil }) if err != nil { - log.Logger().Warnf("failed to run xorTreeRepair check: %s", err) + log.Logger().WithError(err).Warnf("Failed to run xorTreeRepair check.") } if lcEnd > currentLC { @@ -149,7 +150,6 @@ func (f *xorTreeRepair) stateOK() { defer f.mutex.Unlock() f.circuitState = circuitGreen - f.currentPage = 0 } func (f *xorTreeRepair) incrementCount() { diff --git a/network/dag/tree/tree.go b/network/dag/tree/tree.go index 010b80407f..10deb98fd2 100644 --- a/network/dag/tree/tree.go +++ b/network/dag/tree/tree.go @@ -287,10 +287,6 @@ func (t *tree) Root() Data { } func (t *tree) ZeroTo(clock uint32) (Data, uint32) { - if clock < 0 { - return t.prototype.New(), 0 - } - data := t.root.data.Clone() next := t.root for { diff --git a/network/transport/v2/handlers.go b/network/transport/v2/handlers.go index bcb11e6e27..4acbb4aaa7 100644 --- a/network/transport/v2/handlers.go +++ b/network/transport/v2/handlers.go @@ -264,8 +264,7 @@ func (p *protocol) handleGossip(ctx context.Context, connection grpc.Connection, xor, clock := p.state.XOR(dag.MaxLamportClock) peerXor := hash.FromSlice(msg.XOR) if xor.Equals(peerXor) { - // open circuit on fixer - + p.state.CorrectStateDetected() return nil }