Skip to content

Commit

Permalink
isPossibleFork and changed bscc to unjoin a peer from the channel aft…
Browse files Browse the repository at this point in the history
…er second approval event
  • Loading branch information
alfredomusumeci committed Jul 30, 2023
1 parent 84c7bc4 commit de1ea11
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 58 deletions.
2 changes: 2 additions & 0 deletions common/ledger/blockledger/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (nfei *NotFoundErrorIterator) Close() {}
//
// to accommodate non-deterministic marshaling
func CreateNextBlock(rl Reader, messages []*cb.Envelope) *cb.Block {
logger.Debugf("Creating next block for chain with height %d", rl.Height())

var nextBlockNumber uint64
var previousBlockHash []byte
var err error
Expand Down
2 changes: 2 additions & 0 deletions core/ledger/kvledger/tests/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func newBlockGenerator(lgr ledger.PeerLedger, t *testing.T) *blkGenerator {
}

// nextBlockAndPvtdata cuts the next block
// BLOCC: Only used in tests.
func (g *blkGenerator) nextBlockAndPvtdata(trans []*txAndPvtdata, missingPvtData ledger.TxMissingPvtData) *ledger.BlockAndPvtData {
logger.Debugf("Cutting the next block")
block := protoutil.NewBlock(g.lastNum+1, g.lastHash)
blockPvtdata := make(map[uint64]*ledger.TxPvtData)
for i, tran := range trans {
Expand Down
64 changes: 37 additions & 27 deletions core/scc/bscc/bscc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ package bscc

import (
"fmt"
"io/ioutil"
"os"
"time"

"github.com/hyperledger/fabric-chaincode-go/shim"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/bccsp"
event "github.com/hyperledger/fabric/common/blocc-events"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/peer"
blocc "github.com/hyperledger/fabric/internal/peer/blocc/chaincode"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io/ioutil"
"os"
)

func New(peerInstance *peer.Peer) *BSCC {
func New(peerInstance *peer.Peer, server *comm.GRPCServer, command *cobra.Command) *BSCC {
return &BSCC{
peerInstance: peerInstance,
peerServer: server,
unjoin: command,
}
}

Expand All @@ -34,15 +36,14 @@ func (bscc *BSCC) Chaincode() shim.Chaincode {
type BSCC struct {
peerInstance *peer.Peer
config Config
peerServer *comm.GRPCServer
unjoin *cobra.Command
}

type Config struct {
PeerAddress string
TLSCertFile string
OrgMspID string
WaitForEvent bool
WaitForEventTimeout time.Duration
CryptoProvider bccsp.BCCSP
PeerAddress string
TLSCertFile string
CryptoProvider bccsp.BCCSP
}

var bloccProtoLogger = flogging.MustGetLogger("bscc")
Expand All @@ -61,6 +62,8 @@ func (f InvalidFunctionError) Error() string {

// -------------------- Stub Interface ------------------- //

var index uint64

func (bscc *BSCC) Init(stub shim.ChaincodeStubInterface) pb.Response {
bloccProtoLogger.Info("Init BSCC")
go func() {
Expand All @@ -81,21 +84,12 @@ func (bscc *BSCC) Init(stub shim.ChaincodeStubInterface) pb.Response {
return shim.Error("CORE_PEER_TLS_ROOTCERT_FILE is not set")
}

orgMspID, ok := os.LookupEnv("CORE_PEER_LOCALMSPID")
if !ok {
bloccProtoLogger.Error("CORE_PEER_LOCALMSPID is not set")
return shim.Error("CORE_PEER_LOCALMSPID is not set")
}

bscc.config = Config{
PeerAddress: peerAddress,
TLSCertFile: tlsCertFile,
OrgMspID: orgMspID,
WaitForEvent: true,
WaitForEventTimeout: 3 * time.Second,
CryptoProvider: bscc.peerInstance.CryptoProvider,
PeerAddress: peerAddress,
TLSCertFile: tlsCertFile,
CryptoProvider: bscc.peerInstance.CryptoProvider,
}

index = 1
return shim.Success(nil)
}

Expand Down Expand Up @@ -139,8 +133,24 @@ func (bscc *BSCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
// ----------------- BSCC Implementation ----------------- //

func (bscc *BSCC) processEvent(event event.Event) {
var err error
bloccProtoLogger.Info("BLOCC - Received approval event:", event)

bloccProtoLogger.Debug("index:", index)
if index == 2 {
bscc.peerServer.Stop()
bscc.unjoin.SetArgs([]string{
"--channelID=" + event.ChannelID,
})
err = bscc.unjoin.Execute()
if err != nil {
bloccProtoLogger.Errorf("Failed to unjoin channel: %s", err)
}
err = bscc.peerServer.Start()
if err != nil {
bloccProtoLogger.Errorf("Failed to start peer server: %s", err)
}
}
index++
address, rootCertFile, err := bscc.gatherOrdererInfo(event.ChannelID)
if err != nil {
bloccProtoLogger.Errorf("Failed to gather orderer info: %s", err)
Expand All @@ -161,12 +171,12 @@ func (bscc *BSCC) processEvent(event event.Event) {
}

func (bscc *BSCC) gatherOrdererInfo(channelID string) (address string, rootCertFile []byte, err error) {
_, ordererOrg, err := bscc.peerInstance.GetOrdererInfo(channelID)
_, ordererOrgs, err := bscc.peerInstance.GetOrdererInfo(channelID)
if err != nil {
return "", nil, err
}

orderer, ok := ordererOrg["OrdererOrg"]
orderer, ok := ordererOrgs["OrdererOrg"]
if !ok {
return "", nil, errors.New("orderer org not found")
}
Expand Down
29 changes: 0 additions & 29 deletions internal/peer/blocc/chaincode/approveforthispeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,11 @@ func ApproveForThisPeerCmd(a *ApproveForThisPeer, cryptoProvider bccsp.BCCSP) *c
Long: "FOR INTERNAL USE ONLY. Approve a sensory reading for this peer",
RunE: func(cmd *cobra.Command, args []string) error {
if a == nil {
// Log every input element
logger.Debugf("OrdererAddress: %v", ordererAddress)
logger.Debugf("RootCertFilePath: %v", rootCertFilePath)
logger.Debugf("channelID: %v", channelID)
logger.Debugf("txID: %v", txID)
logger.Debugf("peerAddress: %v", peerAddress)
logger.Debugf("connectionProfilePath: %v", connectionProfilePath)
logger.Debugf("waitForEvent: %v", waitForEvent)
logger.Debugf("waitForEventTimeout: %v", waitForEventTimeout)

input, err := a.createInput()
if err != nil {
return err
}

// Log every input element
logger.Debugf("OrdererAddress: %v", input.OrdererAddress)
logger.Debugf("RootCertFilePath: %v", input.RootCertFilePath)
logger.Debugf("channelID: %v", input.ChannelID)
logger.Debugf("txID: %v", input.TxID)
logger.Debugf("peerAddress: %v", input.PeerAddress)
logger.Debugf("connectionProfilePath: %v", input.ConnectionProfilePath)
logger.Debugf("waitForEvent: %v", input.WaitForEvent)
logger.Debugf("waitForEventTimeout: %v", input.WaitForEventTimeout)

ccInput := &ClientConnectionsInput{
CommandName: cmd.Name(),
EndorserRequired: true,
Expand Down Expand Up @@ -236,14 +216,6 @@ func (a *ApproveForThisPeer) Approve() error {
}

func (a *ApproveForThisPeer) createInput() (*ApproveForThisPeerInput, error) {
logger.Debugf("OrdererAddress: %v", ordererAddress)
logger.Debugf("RootCertFilePath: %v", rootCertFilePath)
logger.Debugf("channelID: %v", channelID)
logger.Debugf("txID: %v", txID)
logger.Debugf("waitForEvent: %v", waitForEvent)
logger.Debugf("waitForEventTimeout: %v", waitForEventTimeout)
logger.Debugf("peerAddress: %v", peerAddress)

input := &ApproveForThisPeerInput{
OrdererAddress: ordererAddress,
RootCertFilePath: rootCertFilePath,
Expand All @@ -253,7 +225,6 @@ func (a *ApproveForThisPeer) createInput() (*ApproveForThisPeerInput, error) {
WaitForEventTimeout: waitForEventTimeout,
PeerAddress: peerAddress,
}
logger.Debugf("ApproveForThisPeerCmd: input: %+v", input)

return input, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func serve(args []string) error {
factory.GetDefault(),
)
qsccInst := scc.SelfDescribingSysCC(qscc.New(aclProvider, peerInstance))
bsccInst := bscc.New(peerInstance)
bsccInst := bscc.New(peerInstance, peerServer, unjoinCmd())

pb.RegisterChaincodeSupportServer(ccSrv.Server(), ccSupSrv)

Expand Down
19 changes: 19 additions & 0 deletions internal/peer/node/unjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,22 @@ func unjoinChannel(channelID string) error {

return nil
}

// Unjoin the peer from a channel.
func UnjoinChannel(channelID string) error {
// transient storage must be scrubbed prior to removing the kvledger for the channel. Once the
// kvledger storage has been removed, a subsequent ledger removal will return a "no such ledger" error.
// By removing the transient storage prior to deleting the ledger, a crash may be recovered by re-running
// the peer unjoin.
transientStoragePath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "transientstore")
if err := transientstore.Drop(transientStoragePath, channelID); err != nil {
return err
}

config := ledgerConfig()
if err := kvledger.UnjoinChannel(config, channelID); err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions internal/pkg/peer/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (d *Deliverer) DeliverBlocks() {
for {
select {
case <-d.DoneC:
d.Logger.Info("Done channel closed, exiting")
return
default:
}
Expand Down Expand Up @@ -173,6 +174,7 @@ func (d *Deliverer) DeliverBlocks() {
go func() {
for {
resp, err := deliverClient.Recv()
d.Logger.Debug("Response is: ", resp)
if err != nil {
connLogger.Warningf("Encountered an error reading from deliver stream: %s", err)
close(recv)
Expand Down
1 change: 1 addition & 0 deletions orderer/common/multichannel/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func newBlockWriter(lastBlock *cb.Block, r *Registrar, support blockWriterSuppor
}

// CreateNextBlock creates a new block with the next block number, and the given contents.
// BLOCC: This apparently is never used, except in tests.
func (bw *BlockWriter) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
previousBlockHash := protoutil.BlockHeaderHash(bw.lastBlock.Header)

Expand Down
14 changes: 13 additions & 1 deletion orderer/consensus/etcdraft/blockcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package etcdraft

import (
"encoding/base64"
"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/flogging"
Expand All @@ -23,6 +24,7 @@ type blockCreator struct {
}

func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
bc.logger.Debugf("Creating next block for chain with height %d", bc.number)
data := &cb.BlockData{
Data: make([][]byte, len(envs)),
}
Expand All @@ -35,12 +37,22 @@ func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
}
}

bc.number++
//bc.number++

if bc.number == 15 {
bc.number = 15
} else {
bc.number++
}

block := protoutil.NewBlock(bc.number, bc.hash)
block.Header.DataHash = protoutil.BlockDataHash(data)
block.Data = data

bc.hash = protoutil.BlockHeaderHash(block.Header)

bc.logger.Debug("Block header data hash: ", base64.StdEncoding.EncodeToString(block.Header.DataHash))
bc.logger.Debug("Block header previous hash: ", base64.StdEncoding.EncodeToString(block.Header.PreviousHash))
bc.logger.Debug("Block header hash: ", base64.StdEncoding.EncodeToString(bc.hash))
return block
}
76 changes: 76 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package etcdraft

import (
"bytes"
"context"
"encoding/pem"
"fmt"
Expand Down Expand Up @@ -879,6 +880,12 @@ func (c *Chain) run() {
}

func (c *Chain) writeBlock(block *common.Block, index uint64) {
isForkAttempt := c.isPossibleFork(block)
if isForkAttempt {
// Add the block to the forked block list
// implement me
}

if block.Header.Number > c.lastBlock.Header.Number+1 {
c.logger.Panicf("Got block [%d], expect block [%d]", block.Header.Number, c.lastBlock.Header.Number+1)
} else if block.Header.Number < c.lastBlock.Header.Number+1 {
Expand All @@ -904,6 +911,12 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
c.raftMetadataLock.Unlock()

c.support.WriteBlock(block, m)

c.logger.Debug("Chain is:", c.channelID)
c.logger.Debug("Active nodes are:", c.ActiveNodes)
c.logger.Debug("Node is:", c.Node)
//c.Halt()
c.logger.Debug("Is chain running:", c.isRunning())
}

// Orders the envelope in the `msg` content. SubmitRequest.
Expand Down Expand Up @@ -1571,3 +1584,66 @@ func (c *Chain) checkForEvictionNCertRotation(env *common.Envelope) bool {
c.logger.Debugf("Node %d is still part of the consenters set", c.raftID)
return false
}

// isPossibleFork returns true if:
// 1. the committed block number is smaller than the height of all orderers, and
// 1a. the block hash of the committed block is not the same as corresponding block hash of all orderers for
// the same block number.
func (c *Chain) isPossibleFork(incomingBlock *common.Block) bool {
// Genesis block is not a fork
if c.support.Height() == 0 {
return false
}

incomingBlockNumber := incomingBlock.Header.Number
lastBlockNumber := c.support.Height() - 1

// If the incoming block number is smaller or equal to the current height of the orderer, then it is possible that
// the orderer is forked.
if incomingBlockNumber <= lastBlockNumber {
c.logger.Infof("Possible fork detected. Checking if this orderer is up to date.")
}

puller, err := c.createPuller()
if err != nil {
c.logger.Errorf("It was not possible to check for forks. Error creating a puller: %s", err)
return false
}

var maxHeight uint64
var mostUpToDateEndpoint string
heightsByEndpoints, err := puller.HeightsByEndpoints()
if err != nil {
c.logger.Errorf("It was not possible to check for forks. Error getting heights by endpoints: %s", err)
return false
}

for endpoint, height := range heightsByEndpoints {
if height >= maxHeight {
maxHeight = height
mostUpToDateEndpoint = endpoint
}
}

mostUpdatedLastBlockNumber := maxHeight - 1

// If the incoming block number is smaller or equal to the current height of the orderer, then it is possible that
// the orderer is forked.
if incomingBlockNumber <= mostUpdatedLastBlockNumber {
block := puller.PullBlock(incomingBlockNumber)
if block == nil {
c.logger.Errorf("Error pulling block %d from %s", incomingBlockNumber, mostUpToDateEndpoint)
}
// Check if the block hash of the incoming block is the same as the block hash of the most up-to-date orderer
// for the same block number.
expectedHash := protoutil.BlockDataHash(block.Data)
actualHash := protoutil.BlockDataHash(incomingBlock.Data)
if !bytes.Equal(expectedHash, actualHash) {
c.logger.Infof("Fork attempt detected. Expected block hash %x, actual block hash %x", expectedHash, actualHash)
return true
}
}

c.logger.Infof("No fork attempt detected.")
return false
}

0 comments on commit de1ea11

Please sign in to comment.