Skip to content

Commit

Permalink
trying to make the lock work
Browse files Browse the repository at this point in the history
  • Loading branch information
alfredomusumeci committed Jul 31, 2023
1 parent de1ea11 commit 7fce530
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 18 deletions.
23 changes: 23 additions & 0 deletions common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Chain interface {

// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}

// Forked returns a channel which closes when the backing consenter has forked
Forked() <-chan struct{}
}

//go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker
Expand Down Expand Up @@ -228,6 +231,26 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
return cb.Status_BAD_REQUEST, nil
}

forkedChan := chain.Forked()
logger.Debug("Fork address:", forkedChan)
select {
case _, ok := <-forkedChan:
if !ok {
logger.Debug("Fork channel is closed")
} else {
logger.Debug("Fork channel is open and contains data")
}
default:
logger.Debug("Fork channel is open but does not contain data")
}
logger.Debugf("[channel: %s] Checking if channel is forked", chdr.ChannelId)
select {
case <-forkedChan:
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of chain fork", chdr.ChannelId, addr)
return cb.Status_FORKED, nil
default:
}

erroredChan := chain.Errored()
if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT {
// In a 'best effort' delivery of blocks, we should ignore consenter errors
Expand Down
26 changes: 26 additions & 0 deletions common/forknotifier/forknotifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package forknotifier

import (
"github.com/hyperledger/fabric/common/flogging"
"sync"
)

var (
forkNotifications chan bool
once sync.Once
)

var logger = flogging.MustGetLogger("forknotifier")

func GetForkNotificationsChannel() chan bool {
once.Do(func() {
forkNotifications = make(chan bool)
})
return forkNotifications
}

func NotifyFork() {
logger.Warningf("Fork detected, notifying...")
forkNotifications <- true
logger.Warningf("Fork notification sent")
}
10 changes: 10 additions & 0 deletions core/peer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ func (c *Channel) Errored() <-chan struct{} {
return nil
}

// Forked returns a channel that can be used to determine if a backing
// resource has forked. At this point in time, the peer does not have any
// error conditions that lead to this function signaling that a fork has
// occurred.
func (c *Channel) Forked() <-chan struct{} {
// If this is ever updated to return a real channel, the error message
// in deliver.go around this channel closing should be updated.
return nil
}

func capabilitiesSupportedOrPanic(res channelconfig.Resources) {
ac, ok := res.ApplicationConfig()
if !ok {
Expand Down
28 changes: 14 additions & 14 deletions core/scc/bscc/bscc.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,20 @@ func (bscc *BSCC) processEvent(event event.Event) {
var err error
bloccProtoLogger.Info("BLOCC - Received approval event:", event)
bloccProtoLogger.Debug("index:", index)
if index == 2 {
bscc.peerServer.Stop()
bscc.unjoin.SetArgs([]string{
"--channelID=" + event.ChannelID,
})
err = bscc.unjoin.Execute()
if err != nil {
bloccProtoLogger.Errorf("Failed to unjoin channel: %s", err)
}
err = bscc.peerServer.Start()
if err != nil {
bloccProtoLogger.Errorf("Failed to start peer server: %s", err)
}
}
//if index == 2 {
// bscc.peerServer.Stop()
// bscc.unjoin.SetArgs([]string{
// "--channelID=" + event.ChannelID,
// })
// err = bscc.unjoin.Execute()
// if err != nil {
// bloccProtoLogger.Errorf("Failed to unjoin channel: %s", err)
// }
// err = bscc.peerServer.Start()
// if err != nil {
// bloccProtoLogger.Errorf("Failed to start peer server: %s", err)
// }
//}
index++
address, rootCertFile, err := bscc.gatherOrdererInfo(event.ChannelID)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package node
import (
"context"
"fmt"
"github.com/hyperledger/fabric/common/forknotifier"
"io"
"io/ioutil"
"net"
Expand Down Expand Up @@ -938,6 +939,21 @@ func serve(args []string) error {
serve <- grpcErr
}()

// The peer has joined the channels, we can now start the goroutine to listen to possible fork events
go func() {
logger.Debug("Starting fork notifier")
for {
forkChannel := forknotifier.GetForkNotificationsChannel()
print("forkChannel: ", forkChannel)
select {
case forkNotification := <-forkChannel:
if forkNotification {
logger.Debug("Received fork notification")
}
}
}
}()

// Block until grpc server exits
return <-serve
}
Expand Down
3 changes: 3 additions & 0 deletions orderer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Chain interface {
// clients when the consenter is not up to date.
Errored() <-chan struct{}

// Forked returns a channel which closes when the backing consenter has forked
Forked() <-chan struct{}

// Start should allocate whatever resources are needed for staying up to date with the chain.
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger.
Expand Down
108 changes: 105 additions & 3 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ type Chain struct {
errorCLock sync.RWMutex
errorC chan struct{} // returned by Errored()

forkCLock sync.RWMutex
forkC chan struct{} // returned by Forked()

raftMetadataLock sync.RWMutex
confChangeInProgress *raftpb.ConfChange
justElected bool // this is true when node has just been elected
Expand Down Expand Up @@ -270,6 +273,7 @@ func NewChain(
startC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
errorC: make(chan struct{}),
forkC: make(chan struct{}),
gcC: make(chan *gc),
observeC: observeC,
support: support,
Expand Down Expand Up @@ -372,6 +376,7 @@ func (c *Chain) Start() {

close(c.startC)
close(c.errorC)
close(c.forkC)

go c.gc()
go c.run()
Expand Down Expand Up @@ -430,6 +435,13 @@ func (c *Chain) Errored() <-chan struct{} {
return c.errorC
}

// Forked returns a channel that closes when the chain forks.
func (c *Chain) Forked() <-chan struct{} {
c.forkCLock.RLock()
defer c.forkCLock.RUnlock()
return c.forkC
}

// Halt stops the chain.
func (c *Chain) Halt() {
c.stop()
Expand Down Expand Up @@ -768,6 +780,10 @@ func (c *Chain) run() {
c.errorCLock.Lock()
c.errorC = make(chan struct{})
c.errorCLock.Unlock()

c.forkCLock.Lock()
c.forkC = make(chan struct{})
c.forkCLock.Unlock()
}

if isCandidate(app.soft.RaftState) || newLeader == raft.None {
Expand Down Expand Up @@ -868,8 +884,10 @@ func (c *Chain) run() {

select {
case <-c.errorC: // avoid closing closed channel
case <-c.forkC: // avoid closing closed channel
default:
close(c.errorC)
close(c.forkC)
}

c.logger.Infof("Stop serving requests")
Expand All @@ -882,8 +900,37 @@ func (c *Chain) run() {
func (c *Chain) writeBlock(block *common.Block, index uint64) {
isForkAttempt := c.isPossibleFork(block)
if isForkAttempt {
// Add the block to the forked block list
// implement me
// Log was is inside c.forkC
select {
case _, ok := <-c.forkC:
if !ok {
c.logger.Debug("Fork channel is closed")
} else {
c.logger.Debug("Fork channel is open and contains data")
}
default:
c.logger.Debug("Fork channel is open but does not contain data")
}

c.logger.Debug("WriteBlock Fork address:", c.forkC)
c.logger.Debug("Sending fork event")
c.forkCLock.Lock()
c.logger.Debug("Locking fork channel")
c.forkC <- struct{}{}
c.logger.Debug("Unlocking fork channel")
c.forkCLock.Unlock()
c.logger.Debug("Fork event sent")

select {
case _, ok := <-c.forkC:
if !ok {
c.logger.Debug("Fork channel is closed")
} else {
c.logger.Debug("Fork channel is open and contains data")
}
default:
c.logger.Debug("Fork channel is open but does not contain data")
}
}

if block.Header.Number > c.lastBlock.Header.Number+1 {
Expand Down Expand Up @@ -915,7 +962,6 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
c.logger.Debug("Chain is:", c.channelID)
c.logger.Debug("Active nodes are:", c.ActiveNodes)
c.logger.Debug("Node is:", c.Node)
//c.Halt()
c.logger.Debug("Is chain running:", c.isRunning())
}

Expand Down Expand Up @@ -1647,3 +1693,59 @@ func (c *Chain) isPossibleFork(incomingBlock *common.Block) bool {
c.logger.Infof("No fork attempt detected.")
return false
}

//func (c *Chain) updateHeaderToEquivocationType(incomingBlock *common.Block) error {
// // Update the header
// envelope, err := protoutil.ExtractEnvelope(incomingBlock, 0)
// if err != nil {
// return err
// }
// hdr, err := protoutil.ChannelHeader(envelope)
// if err != nil {
// return err
// }
// hdr.Type = int32(common.HeaderType_EQUIVOCATION_PROOF)
//
// // Re-serialize the block
//
// return nil
//}
//
//func (c *Chain) updateHeaderToEquivocationType(incomingBlock *common.Block) error {
// // Unmarshal the block's data to a Payload
// payload := &common.Payload{}
// err := proto.Unmarshal(incomingBlock.Data.Data, payload)
// if err != nil {
// return err
// }
//
// // Unmarshal the payload's header to a ChannelHeader
// channelHeader := &common.ChannelHeader{}
// err = proto.Unmarshal(payload.Header.ChannelHeader, channelHeader)
// if err != nil {
// return err
// }
//
// // Update the header type to EQUIVOCATION_PROOF
// channelHeader.Type = int32(common.HeaderType_EQUIVOCATION_PROOF)
//
// // Marshal the ChannelHeader back to bytes
// channelHeaderBytes, err := proto.Marshal(channelHeader)
// if err != nil {
// return err
// }
//
// // Update the payload's ChannelHeader
// payload.Header.ChannelHeader = channelHeaderBytes
//
// // Marshal the Payload back to bytes
// payloadBytes, err := proto.Marshal(payload)
// if err != nil {
// return err
// }
//
// // Update the block's data
// incomingBlock.Data.Data = payloadBytes
//
// return nil
//}
6 changes: 6 additions & 0 deletions orderer/consensus/inactive/inactive_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func (*Chain) Errored() <-chan struct{} {
return closedChannel
}

func (c *Chain) Forked() <-chan struct{} {
closedForkedChannel := make(chan struct{})
close(closedForkedChannel)
return closedForkedChannel
}

func (c *Chain) Start() {
}

Expand Down
13 changes: 13 additions & 0 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type chainImpl struct {
// When the partition consumer errors, close the channel. Otherwise, make
// this an open, unbuffered channel.
errorChan chan struct{}
// Forked channel is closed when the chain is forked.
forkedChan chan struct{}
// When a Halt() request comes, close the channel. Unlike errorChan, this
// channel never re-opens when closed. Its closing triggers the exit of the
// processMessagesToBlock loop.
Expand All @@ -124,6 +126,17 @@ type chainImpl struct {
replicaIDs []int32
}

func (chain *chainImpl) Forked() <-chan struct{} {
select {
case <-chain.startChan:
return chain.forkedChan
default:
// While the consenter is starting, return empty channel because
// the chain is not forked yet.
return make(chan struct{})
}
}

// Errored returns a channel which will close when a partition consumer error
// has occurred. Checked by Deliver().
func (chain *chainImpl) Errored() <-chan struct{} {
Expand Down
6 changes: 6 additions & 0 deletions orderer/consensus/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type chain struct {
support consensus.ConsenterSupport
sendChan chan *message
exitChan chan struct{}
forkChan chan struct{}
}

type message struct {
Expand All @@ -49,6 +50,7 @@ func newChain(support consensus.ConsenterSupport) *chain {
support: support,
sendChan: make(chan *message),
exitChan: make(chan struct{}),
forkChan: make(chan struct{}),
}
}

Expand Down Expand Up @@ -100,6 +102,10 @@ func (ch *chain) Errored() <-chan struct{} {
return ch.exitChan
}

func (ch *chain) Forked() <-chan struct{} {
return ch.forkChan
}

func (ch *chain) main() {
var timer <-chan time.Time
var err error
Expand Down
Loading

0 comments on commit 7fce530

Please sign in to comment.