diff --git a/crates/hotshot-signature-key/src/bn254/bn254_priv.rs b/crates/hotshot-signature-key/src/bn254/bn254_priv.rs index 439b52ff39..5b16c7a93d 100644 --- a/crates/hotshot-signature-key/src/bn254/bn254_priv.rs +++ b/crates/hotshot-signature-key/src/bn254/bn254_priv.rs @@ -54,6 +54,7 @@ impl BLSPrivKey { } } +// #[allow(clippy::incorrect_partial_ord_impl_on_ord_type)] impl PartialOrd for BLSPrivKey { fn partial_cmp(&self, other: &Self) -> Option { let self_bytes = &self.priv_key.to_string(); diff --git a/crates/hotshot-signature-key/src/bn254/bn254_pub.rs b/crates/hotshot-signature-key/src/bn254/bn254_pub.rs index 025d455129..43fc3a6c43 100644 --- a/crates/hotshot-signature-key/src/bn254/bn254_pub.rs +++ b/crates/hotshot-signature-key/src/bn254/bn254_pub.rs @@ -27,6 +27,7 @@ pub struct BLSPubKey { pub_key: VerKey, } +// #[allow(clippy::incorrect_partial_ord_impl_on_ord_type)] impl PartialOrd for BLSPubKey { fn partial_cmp(&self, other: &Self) -> Option { let self_bytes = &self.pub_key.to_string(); diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 77f9f63d4e..0c26d2ade1 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -76,7 +76,6 @@ required-features = ["demo", "libp2p/rsa"] path = "examples/web-server-da/multi-web-server.rs" [dependencies] -# TODO ED We should upgrade ark libraries to 0.4 async-compatibility-layer = { workspace = true } async-lock = { workspace = true } async-trait = { workspace = true } diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index ae6e438147..dcf6670dc7 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -53,6 +53,9 @@ use hotshot_task::{ task_launcher::TaskRunner, }; use hotshot_task_impls::{events::SequencingHotShotEvent, network::NetworkTaskKind}; +use hotshot_types::{ + certificate::TimeoutCertificate, traits::node_implementation::SequencingTimeoutEx, +}; use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, @@ -238,22 +241,14 @@ impl> SystemContext { Ok(Self { inner }) } - /// "Starts" consensus by sending a `ViewChange` event + /// "Starts" consensus by sending a `QCFormed` event pub async fn start_consensus(&self) { self.inner .internal_event_stream - .publish(SequencingHotShotEvent::ViewChange(TYPES::Time::new(1))) + .publish(SequencingHotShotEvent::QCFormed(either::Left( + QuorumCertificate::genesis(), + ))) .await; - - // ED This isn't ideal... - // async_sleep(Duration::new(1, 0)).await; - - // self.inner - // .internal_event_stream - // .publish(SequencingHotShotEvent::QCFormed( - // QuorumCertificate::genesis(), - // )) - // .await; } /// Marks a given view number as timed out. This should be called a fixed period after a round is started. 
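Editor's note: the `start_consensus` change above bootstraps view 1 through the same `QCFormed` path that every later view uses, instead of a special-cased `ViewChange(1)`. A minimal, self-contained Rust sketch of that pattern (all types here are simplified hypothetical stand-ins, not the real HotShot API):

```rust
// Sketch: consensus starts by publishing a genesis "QC formed" event, so the
// view-1 leader reacts to it exactly as it would to any later QC or TC.
struct QuorumCert { view: u64 }
struct TimeoutCert { view: u64 }

enum Either<L, R> { Left(L), Right(R) }

enum Event { QcFormed(Either<QuorumCert, TimeoutCert>) }

fn start_consensus(publish: impl Fn(Event)) {
    // The genesis QC acts as evidence for view 0; no separate view-change
    // bootstrap message is needed.
    publish(Event::QcFormed(Either::Left(QuorumCert { view: 0 })));
}

fn on_event(event: Event) {
    match event {
        Event::QcFormed(Either::Left(qc)) => {
            println!("leader proposes for view {} justified by a QC", qc.view + 1);
        }
        Event::QcFormed(Either::Right(tc)) => {
            println!("leader proposes for view {} carrying a TC", tc.view + 1);
        }
    }
}

fn main() {
    start_consensus(on_event);
    on_event(Event::QcFormed(Either::Right(TimeoutCert { view: 3 })));
}
```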
@@ -663,6 +658,14 @@ where Commitment = Commitment>, Membership = MEMBERSHIP, > + 'static, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + Membership = MEMBERSHIP, + > + 'static, { fn consensus(&self) -> &Arc>> { &self.inner.consensus diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 01a9d06030..b1ee195e63 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -28,15 +28,16 @@ use hotshot_task_impls::{ }; use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, - certificate::ViewSyncCertificate, + certificate::{TimeoutCertificate, ViewSyncCertificate}, data::{ProposalType, QuorumProposal, SequencingLeaf}, event::Event, message::{Message, Messages, SequencingMessage}, traits::{ election::{ConsensusExchange, Membership}, - network::{CommunicationChannel, TransmitType}, + network::{CommunicationChannel, ConsensusIntentEvent, TransmitType}, node_implementation::{ - CommitteeEx, ExchangesType, NodeImplementation, NodeType, QuorumEx, ViewSyncEx, + CommitteeEx, ExchangesType, NodeImplementation, NodeType, QuorumEx, + SequencingTimeoutEx, ViewSyncEx, }, state::ConsensusTime, }, @@ -276,6 +277,13 @@ where Certificate = DACertificate, Commitment = Commitment, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { let consensus = handle.hotshot.get_consensus(); let c_api: HotShotSequencingConsensusApi = HotShotSequencingConsensusApi { @@ -290,6 +298,7 @@ where cur_view: TYPES::Time::new(0), block: Some(VIDBlockPayload::genesis()), quorum_exchange: c_api.inner.exchanges.quorum_exchange().clone().into(), + timeout_exchange: c_api.inner.exchanges.timeout_exchange().clone().into(), api: c_api.clone(), committee_exchange: c_api.inner.exchanges.committee_exchange().clone().into(), _pd: PhantomData, @@ -302,6 +311,16 @@ where id: handle.hotshot.inner.id, qc: None, }; + consensus_state + .quorum_exchange + .network() + .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) + .await; + consensus_state + .quorum_exchange + .network() + .inject_consensus_info(ConsensusIntentEvent::PollForProposal(1)) + .await; let filter = FilterEvent(Arc::new(consensus_event_filter)); let consensus_name = "Consensus Task"; let consensus_event_handler = HandleEvent(Arc::new( diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 63fdb6df88..e717c40b1b 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -287,7 +287,7 @@ impl, MEMBERSHIP: Membership { + Ok(()) => { self.primary_down.store(0, Ordering::Relaxed); } Err(e) => { @@ -318,7 +318,7 @@ impl, MEMBERSHIP: Membership { + Ok(()) => { self.primary_down.store(0, Ordering::Relaxed); } Err(e) => { diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 72cd62cc8d..5a67880e9f 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -116,8 +116,6 @@ struct Inner { /// The last tx_index we saw from the web server tx_index: Arc>, - // TODO ED This should be TYPES::Time - // Theoretically there should never be contention for this lock... 
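Editor's note: the `SequencingTimeoutEx: ConsensusExchange<..., Certificate = TimeoutCertificate<...>>` bounds added above all follow one pattern: generic task code is written against an exchange trait, and each where-clause pins the associated types. A std-only sketch of that pattern with hypothetical names:

```rust
// Sketch: the bound `Certificate = TimeoutCert` plays the role of the
// SequencingTimeoutEx constraint in the diff; the same generic machinery can
// then be instantiated once for quorum and once for timeout certificates.
struct QuorumCert;
struct TimeoutCert;

trait Exchange {
    type Certificate;
    fn name(&self) -> &'static str;
}

struct QuorumExchange;
struct TimeoutExchange;

impl Exchange for QuorumExchange {
    type Certificate = QuorumCert;
    fn name(&self) -> &'static str { "quorum" }
}

impl Exchange for TimeoutExchange {
    type Certificate = TimeoutCert;
    fn name(&self) -> &'static str { "timeout" }
}

fn collect_timeout_votes<E>(exchange: &E)
where
    E: Exchange<Certificate = TimeoutCert>,
{
    println!("collecting votes via the {} exchange", exchange.name());
}

fn main() {
    collect_timeout_votes(&TimeoutExchange);
    // collect_timeout_votes(&QuorumExchange); // would not compile: wrong Certificate
}
```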
/// Task map for quorum proposals. proposal_task_map: Arc>>>>, diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs index 35e79a78dd..7c54b46964 100644 --- a/crates/hotshot/src/types/handle.rs +++ b/crates/hotshot/src/types/handle.rs @@ -191,7 +191,7 @@ impl + 'static> SystemContextHandl if anchor_leaf.view_number == TYPES::Time::genesis() { let leaf: I::Leaf = I::Leaf::from_stored_view(anchor_leaf); let mut qc = QuorumCertificate::>::genesis(); - qc.set_leaf_commitment(leaf.commit()); + qc.leaf_commitment = leaf.commit(); let event = Event { view_number: TYPES::Time::genesis(), event: EventType::Decide { diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index c4e6460666..fe68d679ce 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -136,6 +136,9 @@ impl NetworkNodeHandle { /// /// Will panic if a handler is already spawned #[allow(clippy::unused_async)] + // // Tokio and async_std disagree how this function should be linted + // #[allow(clippy::ignored_unit_patterns)] + pub async fn spawn_handler(self: &Arc, cb: F) -> impl Future where F: Fn(NetworkEvent, Arc>) -> RET + Sync + Send + 'static, diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index f61911d944..ad937e7754 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,6 +1,6 @@ use hotshot_types::{ExecutionType, HotShotConfig}; -use std::marker::PhantomData; use std::{ + marker::PhantomData, net::{IpAddr, Ipv4Addr, SocketAddr}, num::NonZeroUsize, time::Duration, diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index cac7d4d1f2..748e6e9f1d 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -15,26 +15,29 @@ use hotshot_task::{ task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS}, task_impls::{HSTWithEvent, TaskBuilder}, }; -use hotshot_types::vote::QuorumVoteAccumulator; use hotshot_types::{ - certificate::{DACertificate, QuorumCertificate}, + certificate::{DACertificate, QuorumCertificate, TimeoutCertificate}, consensus::{Consensus, View}, data::{LeafType, ProposalType, QuorumProposal, SequencingLeaf}, event::{Event, EventType}, message::{GeneralConsensusMessage, Message, Proposal, SequencingMessage}, traits::{ consensus_api::SequencingConsensusApi, - election::{ConsensusExchange, QuorumExchangeType, SignedCertificate}, + election::{ConsensusExchange, QuorumExchangeType, SignedCertificate, TimeoutExchangeType}, network::{CommunicationChannel, ConsensusIntentEvent}, - node_implementation::{CommitteeEx, NodeImplementation, NodeType, SequencingQuorumEx}, + node_implementation::{ + CommitteeEx, NodeImplementation, NodeType, SequencingQuorumEx, SequencingTimeoutEx, + }, signature_key::SignatureKey, state::ConsensusTime, BlockPayload, }, utils::{Terminator, ViewInner}, - vote::{QuorumVote, VoteType}, + vote::{QuorumVote, QuorumVoteAccumulator, TimeoutVoteAccumulator, VoteType}, }; +use tracing::warn; + use snafu::Snafu; use std::{ collections::{HashMap, HashSet}, @@ -73,6 +76,13 @@ pub struct SequencingConsensusTaskState< Certificate = DACertificate, Commitment = Commitment, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { /// The global task registry pub registry: GlobalRegistry, @@ 
-89,6 +99,9 @@ pub struct SequencingConsensusTaskState< /// the quorum exchange pub quorum_exchange: Arc>, + /// The timeout exchange + pub timeout_exchange: Arc>, + /// Consensus api pub api: A, @@ -141,9 +154,19 @@ pub struct VoteCollectionTaskState< Certificate = QuorumCertificate>>, Commitment = Commitment>, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { /// the quorum exchange pub quorum_exchange: Arc>, + /// the timeout exchange + pub timeout_exchange: Arc>, + #[allow(clippy::type_complexity)] /// Accumulator for votes pub accumulator: Either< @@ -155,6 +178,18 @@ pub struct VoteCollectionTaskState< >>::VoteAccumulator, QuorumCertificate>>, >, + + /// Accumulator for votes + #[allow(clippy::type_complexity)] + pub timeout_accumulator: Either< + as SignedCertificate< + TYPES, + TYPES::Time, + TYPES::VoteTokenType, + Commitment, + >>::VoteAccumulator, + TimeoutCertificate, + >, /// View which this vote collection task is collecting votes in pub cur_view: TYPES::Time, /// The event stream shared by all tasks @@ -173,6 +208,13 @@ where Certificate = QuorumCertificate>>, Commitment = Commitment>, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { } @@ -193,6 +235,13 @@ where Certificate = QuorumCertificate>>, Commitment = Commitment>, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { match event { SequencingHotShotEvent::QuorumVoteRecv(vote) => match vote.clone() { @@ -212,7 +261,7 @@ where let accumulator = state.accumulator.left().unwrap(); - match state.quorum_exchange.accumulate_vote_2( + match state.quorum_exchange.accumulate_vote( accumulator, &vote, &vote_internal.leaf_commitment, @@ -225,7 +274,7 @@ where debug!("QCFormed! {:?}", qc.view_number); state .event_stream - .publish(SequencingHotShotEvent::QCFormed(qc.clone())) + .publish(SequencingHotShotEvent::QCFormed(either::Left(qc.clone()))) .await; state.accumulator = Either::Right(qc.clone()); @@ -242,18 +291,66 @@ where } } } - QuorumVote::Timeout(_vote) => { - error!("The next leader has received an unexpected vote!"); - return (None, state); - } QuorumVote::No(_) => { error!("The next leader has received an unexpected vote!"); } }, + // TODO: Code below is redundant of code above; can be fixed + // during exchange refactor + // https://github.com/EspressoSystems/HotShot/issues/1799 + SequencingHotShotEvent::TimeoutVoteRecv(vote) => { + debug!("Received timeout vote for view {}", *vote.get_view()); + if state.timeout_accumulator.is_right() { + return (None, state); + } + + if vote.get_view() != state.cur_view { + error!( + "Vote view does not match! vote view is {} current view is {}", + *vote.get_view(), + *state.cur_view + ); + return (None, state); + } + + let accumulator = state.timeout_accumulator.left().unwrap(); + + match state.timeout_exchange.accumulate_vote( + accumulator, + &vote, + &vote.get_view().commit(), + ) { + Either::Left(acc) => { + state.timeout_accumulator = Either::Left(acc); + return (None, state); + } + Either::Right(qc) => { + debug!("QCFormed! 
{:?}", qc.view_number); + state + .event_stream + .publish(SequencingHotShotEvent::QCFormed(either::Right(qc.clone()))) + .await; + state.timeout_accumulator = Either::Right(qc.clone()); + + // No longer need to poll for votes + state + .quorum_exchange + .network() + .inject_consensus_info(ConsensusIntentEvent::CancelPollForVotes( + *qc.view_number, + )) + .await; + + return (Some(HotShotTaskCompleted::ShutDown), state); + } + } + } SequencingHotShotEvent::Shutdown => { return (Some(HotShotTaskCompleted::ShutDown), state); } - _ => {} + _ => { + error!("Unexpected event"); + } } (None, state) } @@ -281,6 +378,13 @@ where Certificate = DACertificate, Commitment = Commitment, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus genesis leaf", level = "error")] @@ -367,7 +471,10 @@ where ); if let GeneralConsensusMessage::Vote(vote) = message { - debug!("Sending vote to next quorum leader {:?}", vote.get_view()); + debug!( + "Sending vote to next quorum leader {:?}", + vote.get_view() + 1 + ); self.event_stream .publish(SequencingHotShotEvent::QuorumVoteSend(vote)) .await; @@ -441,7 +548,7 @@ where cert.view_number, vote_token) } else { - error!("Invalid DAC in proposal! Skipping proposal. {:?} cur view is: {:?}", cert.view_number, self.cur_view ); + error!("Invalid DAC in proposal! Skipping proposal. {:?} cur view is: {:?}", cert, self.cur_view ); return false; }; @@ -480,20 +587,11 @@ where ); // Remove old certs, we won't vote on past views - // TODO ED Put back in once we fix other errors - // for view in *self.cur_view..*new_view - 1 { - // let v = TYPES::Time::new(view); - // self.certs.remove(&v); - // } - self.cur_view = new_view; - self.current_proposal = None; - - if new_view == TYPES::Time::new(1) { - self.quorum_exchange - .network() - .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) - .await; + for view in *self.cur_view..*new_view - 1 { + let v = TYPES::Time::new(view); + self.certs.remove(&v); } + self.cur_view = new_view; // Poll the future leader for lookahead let lookahead_view = new_view + LOOK_AHEAD; @@ -510,12 +608,12 @@ where // Start polling for proposals for the new view self.quorum_exchange .network() - .inject_consensus_info(ConsensusIntentEvent::PollForProposal(*self.cur_view)) + .inject_consensus_info(ConsensusIntentEvent::PollForProposal(*self.cur_view + 1)) .await; self.quorum_exchange .network() - .inject_consensus_info(ConsensusIntentEvent::PollForDAC(*self.cur_view)) + .inject_consensus_info(ConsensusIntentEvent::PollForDAC(*self.cur_view + 1)) .await; if self.quorum_exchange.is_leader(self.cur_view + 1) { @@ -534,7 +632,9 @@ where let timeout = self.timeout; self.timeout_task = async_spawn({ let stream = self.event_stream.clone(); - let view_number = self.cur_view; + // Nuance: We timeout on the view + 1 here because that means that we have + // not seen evidence to transition to this new view + let view_number = self.cur_view + 1; async move { async_sleep(Duration::from_millis(timeout)).await; stream @@ -581,245 +681,192 @@ where return; } - self.current_proposal = Some(proposal.data.clone()); - - let vote_token = self.quorum_exchange.make_vote_token(view); - // TODO: do some of this logic without the vote token check, only do that when voting. 
- match vote_token { - Err(e) => { - error!("Failed to generate vote token for {:?} {:?}", view, e); + // Verify a timeout certificate exists and is valid + if proposal.data.justify_qc.view_number() != view - 1 { + let Some(timeout_cert) = proposal.data.timeout_certificate.clone() else { + warn!( + "Quorum proposal for view {} needed a timeout certificate but did not have one", + *view); + return; + }; + + if timeout_cert.view_number != view - 1 { + warn!("Timeout certificate for view {} was not for the immediately preceding view", *view); + return; } - Ok(None) => { - debug!("We were not chosen for consensus committee on {:?}", view); - } - Ok(Some(vote_token)) => { - debug!("We were chosen for consensus committee on {:?}", view); - let consensus = self.consensus.upgradable_read().await; - // TODO ED Insert TC logic here + if !self + .timeout_exchange + .is_valid_timeout_cert(&timeout_cert.clone(), view - 1) + { + warn!("Timeout certificate for view {} was invalid", *view); + return; + } + } - // Construct the leaf. - let justify_qc = proposal.clone().data.justify_qc; - let parent = if justify_qc.is_genesis() { - self.genesis_leaf().await - } else { - consensus - .saved_leaves - .get(&justify_qc.leaf_commitment()) - .cloned() - }; + let justify_qc = proposal.data.justify_qc.clone(); - // Validate the `justify_qc`. - let justify_qc_commitment = justify_qc.commit(); - let invalid = !self.quorum_exchange.is_valid_cert(&justify_qc); - let leaf; + if !self.quorum_exchange.is_valid_cert(&justify_qc) { + error!("Invalid justify_qc in proposal for view {}", *view); + let consensus = self.consensus.write().await; + consensus.metrics.invalid_qc.update(1); + return; + } - // Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case) - if let Some(parent) = parent.clone() { - let message; - leaf = SequencingLeaf { - view_number: view, - height: proposal.data.height, - justify_qc: justify_qc.clone(), - parent_commitment: parent.commit(), - deltas: Right(proposal.data.block_commitment), - rejected: Vec::new(), - timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - proposer_id: sender.to_bytes(), - }; - let parent_commitment = parent.commit(); - let leaf_commitment = leaf.commit(); - - if invalid { - error!("Invalid justify_qc in proposal! parent commitment is {:?} justify qc is {:?}", parent_commitment, justify_qc.clone()); - consensus.metrics.invalid_qc.update(1); - message = self.quorum_exchange.create_no_message::( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - // Validate the leaf commitment for non-genesis QC. - else if !justify_qc.is_genesis() - && justify_qc.leaf_commitment() != parent_commitment - { - error!("Leaf commitment does not equal parent commitment"); - message = self.quorum_exchange.create_no_message::( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - // Validate the `height`. - else if leaf.height != parent.height + 1 { - error!( - "Incorrect height in proposal (expected {}, got {})", - parent.height + 1, - leaf.height - ); - message = self.quorum_exchange.create_no_message( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - // Validate the signature. 
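Editor's note: the new timeout-certificate check above enforces one liveness rule. A sketch of that rule with hypothetical types (views here start at 1, so `view - 1` cannot underflow):

```rust
// Sketch: a proposal for view `v` must justify skipping directly to `v`,
// either with a QC from view `v - 1` or, if that view timed out, with a
// valid timeout certificate for view `v - 1`.
struct QuorumCert { view: u64 }
struct TimeoutCert { view: u64, valid_signatures: bool }

struct Proposal {
    view: u64,
    justify_qc: QuorumCert,
    timeout_cert: Option<TimeoutCert>,
}

fn view_transition_is_justified(p: &Proposal) -> bool {
    if p.justify_qc.view == p.view - 1 {
        return true; // common case: QC from the immediately preceding view
    }
    match &p.timeout_cert {
        None => false, // a gap in views with no TC: reject the proposal
        Some(tc) => tc.view == p.view - 1 && tc.valid_signatures,
    }
}

fn main() {
    let p = Proposal {
        view: 5,
        justify_qc: QuorumCert { view: 3 }, // view 4 timed out
        timeout_cert: Some(TimeoutCert { view: 4, valid_signatures: true }),
    };
    assert!(view_transition_is_justified(&p));
}
```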
- else if !view_leader_key - .validate(&proposal.signature, leaf_commitment.as_ref()) - { - error!(?proposal.signature, "Could not verify proposal."); - message = self.quorum_exchange.create_no_message( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - // Create a positive vote if either liveness or safety check - // passes. - // Liveness check. - else { - let liveness_check = justify_qc.view_number > consensus.locked_view; - - // Safety check. - // Check if proposal extends from the locked leaf. - let outcome = consensus.visit_leaf_ancestors( - justify_qc.view_number, - Terminator::Inclusive(consensus.locked_view), - false, - |leaf| { - // if leaf view no == locked view no then we're done, report success by - // returning true - leaf.view_number != consensus.locked_view - }, - ); - let safety_check = outcome.is_ok(); - if let Err(e) = outcome { - self.api.send_view_error(view, Arc::new(e)).await; - } + // NOTE: We could update our view with a valid TC but invalid QC, but that is not what we do here + self.update_view(view).await; - // Skip if both saftey and liveness checks fail. - if !safety_check && !liveness_check { - error!("Failed safety check and liveness check"); - message = self.quorum_exchange.create_no_message( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - // Generate a message with yes vote. - else { - message = self.quorum_exchange.create_yes_message( - justify_qc_commitment, - leaf_commitment, - view, - vote_token, - ); - } - } + self.current_proposal = Some(proposal.data.clone()); - if let GeneralConsensusMessage::Vote(vote) = message { - debug!("Sending vote to next leader {:?}", vote); - }; - } else { - // Allow missing parent so we can update the state, but we won't - // vote in this case. - error!( - "Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}", - justify_qc.leaf_commitment(), - proposal.data.view_number, - ); + let consensus = self.consensus.upgradable_read().await; + + // Construct the leaf. + let parent = if justify_qc.is_genesis() { + self.genesis_leaf().await + } else { + consensus + .saved_leaves + .get(&justify_qc.leaf_commitment()) + .cloned() + }; + + // + // Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case) + let Some(parent) = parent else { + // If no parent then just update our state map and return. We will not vote. 
+ error!( + "Proposal's parent missing from storage with commitment: {:?}", + justify_qc.leaf_commitment() + ); + let leaf = SequencingLeaf { + view_number: view, + height: proposal.data.height, + justify_qc: justify_qc.clone(), + parent_commitment: justify_qc.leaf_commitment(), + deltas: Right(proposal.data.block_commitment), + rejected: Vec::new(), + timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), + proposer_id: sender.to_bytes(), + }; + + let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; + consensus.state_map.insert( + view, + View { + view_inner: ViewInner::Leaf { + leaf: leaf.commit(), + }, + }, + ); + consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); - if invalid { - error!("Invalid justify_qc in proposal {:?}", justify_qc.clone()); - return; - } - leaf = SequencingLeaf { - view_number: view, - height: proposal.data.height, - justify_qc: justify_qc.clone(), - parent_commitment: justify_qc.leaf_commitment(), - deltas: Right(proposal.data.block_commitment), - rejected: Vec::new(), - timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - proposer_id: sender.to_bytes(), - }; - if !view_leader_key - .validate(&proposal.signature, leaf.commit().as_ref()) - { - error!(?proposal.signature, "Could not verify proposal."); - return; - } + return; + }; + let parent_commitment = parent.commit(); + let leaf: SequencingLeaf<_> = SequencingLeaf { + view_number: view, + height: proposal.data.height, + justify_qc: justify_qc.clone(), + parent_commitment, + deltas: Right(proposal.data.block_commitment), + rejected: Vec::new(), + timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), + proposer_id: sender.to_bytes(), + }; + let leaf_commitment = leaf.commit(); + + // Validate the `height` + // TODO Remove height from proposal validation; view number is sufficient + // https://github.com/EspressoSystems/HotShot/issues/1796 + if leaf.height != parent.height + 1 { + error!( + "Incorrect height in proposal (expected {}, got {})", + parent.height + 1, + leaf.height + ); + return; + } + // Validate the signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment + else if !view_leader_key.validate(&proposal.signature, leaf_commitment.as_ref()) { + error!(?proposal.signature, "Could not verify proposal."); + return; + } + // Create a positive vote if either liveness or safety check + // passes. + + // Liveness check. + let liveness_check = justify_qc.view_number > consensus.locked_view; + + // Safety check. + // Check if proposal extends from the locked leaf. + let outcome = consensus.visit_leaf_ancestors( + justify_qc.view_number, + Terminator::Inclusive(consensus.locked_view), + false, + |leaf| { + // if leaf view no == locked view no then we're done, report success by + // returning true + leaf.view_number != consensus.locked_view + }, + ); + let safety_check = outcome.is_ok(); + if let Err(e) = outcome { + self.api.send_view_error(view, Arc::new(e)).await; + return; + } - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.state_map.insert( - view, - View { - view_inner: ViewInner::Leaf { - leaf: leaf.commit(), - }, - }, - ); - consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); - drop(consensus); - // The valid QC and signature on the proposal is evidence we can go to the next view - // even though we can't vote in this round because we missed the last proposal. 
- self.update_view(TYPES::Time::new(*view + 1)).await; - return; - } + // Skip if both safety and liveness checks fail. + if !safety_check && !liveness_check { + error!("Failed safety check and liveness check"); + return; + } - // TODO (Keyao) Update consensus state only if all verifications pass. - // - let high_qc = leaf.justify_qc.clone(); - let mut new_anchor_view = consensus.last_decided_view; - let mut new_locked_view = consensus.locked_view; - let mut last_view_number_visited = view; - let mut new_commit_reached: bool = false; - let mut new_decide_reached = false; - let mut new_decide_qc = None; - let mut leaf_views = Vec::new(); - let mut included_txns = HashSet::new(); - if parent.is_some() { - let old_anchor_view = consensus.last_decided_view; - let parent_view = leaf.justify_qc.view_number; - let mut current_chain_length = 0usize; - if parent_view + 1 == view { - current_chain_length += 1; - if let Err(e) = consensus.visit_leaf_ancestors( - parent_view, - Terminator::Exclusive(old_anchor_view), - true, - |leaf| { - if !new_decide_reached { - if last_view_number_visited == leaf.view_number + 1 { - last_view_number_visited = leaf.view_number; - current_chain_length += 1; - if current_chain_length == 2 { - new_locked_view = leaf.view_number; - new_commit_reached = true; - // The next leaf in the chain, if there is one, is decided, so this - // leaf's justify_qc would become the QC for the decided chain. - new_decide_qc = Some(leaf.justify_qc.clone()); - } else if current_chain_length == 3 { - new_anchor_view = leaf.view_number; - new_decide_reached = true; - } - } else { - // nothing more to do here... we don't have a new chain extension - return false; - } + let high_qc = leaf.justify_qc.clone(); + let mut new_anchor_view = consensus.last_decided_view; + let mut new_locked_view = consensus.locked_view; + let mut last_view_number_visited = view; + let mut new_commit_reached: bool = false; + let mut new_decide_reached = false; + let mut new_decide_qc = None; + let mut leaf_views = Vec::new(); + let mut included_txns = HashSet::new(); + let old_anchor_view = consensus.last_decided_view; + let parent_view = leaf.justify_qc.view_number; + let mut current_chain_length = 0usize; + if parent_view + 1 == view { + current_chain_length += 1; + if let Err(e) = consensus.visit_leaf_ancestors( + parent_view, + Terminator::Exclusive(old_anchor_view), + true, + |leaf| { + if !new_decide_reached { + if last_view_number_visited == leaf.view_number + 1 { + last_view_number_visited = leaf.view_number; + current_chain_length += 1; + if current_chain_length == 2 { + new_locked_view = leaf.view_number; + new_commit_reached = true; + // The next leaf in the chain, if there is one, is decided, so this + // leaf's justify_qc would become the QC for the decided chain. + new_decide_qc = Some(leaf.justify_qc.clone()); + } else if current_chain_length == 3 { + new_anchor_view = leaf.view_number; + new_decide_reached = true; } - // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above - if new_decide_reached { - let mut leaf = leaf.clone(); - consensus - .metrics - .last_synced_block_height - .set(usize::try_from(leaf.height).unwrap_or(0)); + } else { + // nothing more to do here... we don't have a new chain extension + return false; + } + } + // starting from the first iteration with a three chain, e.g.
right after the else if case nested in the if case above + if new_decide_reached { + let mut leaf = leaf.clone(); + consensus + .metrics + .last_synced_block_height + .set(usize::try_from(leaf.height).unwrap_or(0)); + // If the full block is available for this leaf, include it in the leaf // chain that we send to the client. if let Some(block) = @@ -831,124 +878,118 @@ where } } - leaf_views.push(leaf.clone()); - match &leaf.deltas { - Left(block) => { - let txns = block.contained_transactions(); - for txn in txns { - included_txns.insert(txn); - } - } - Right(_) => {} + leaf_views.push(leaf.clone()); + match &leaf.deltas { + Left(block) => { + let txns = block.contained_transactions(); + for txn in txns { + included_txns.insert(txn); } } - true - }, - ) { - error!("publishing view error"); - self.output_event_stream.publish(Event { - view_number: view, - event: EventType::Error { error: e.into() }, - }).await; + Right(_) => {} } } + true + }, + ) { + error!("publishing view error"); + self.output_event_stream.publish(Event { + view_number: view, + event: EventType::Error { error: e.into() }, + }).await; } + } - let included_txns_set: HashSet<_> = if new_decide_reached { - included_txns - } else { - HashSet::new() - }; + let included_txns_set: HashSet<_> = if new_decide_reached { + included_txns + } else { + HashSet::new() + }; - // promote lock here to add proposal to statemap - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - if high_qc.view_number > consensus.high_qc.view_number { - consensus.high_qc = high_qc; - } - consensus.state_map.insert( - view, - View { - view_inner: ViewInner::Leaf { - leaf: leaf.commit(), - }, - }, - ); - consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); - if new_commit_reached { - consensus.locked_view = new_locked_view; - } - #[allow(clippy::cast_precision_loss)] - if new_decide_reached { - debug!("about to publish decide"); - self.event_stream - .publish(SequencingHotShotEvent::LeafDecided(leaf_views.clone())) - .await; - let decide_sent = self.output_event_stream.publish(Event { - view_number: consensus.last_decided_view, - event: EventType::Decide { - leaf_chain: Arc::new(leaf_views), - qc: Arc::new(new_decide_qc.unwrap()), - block_size: Some(included_txns_set.len().try_into().unwrap()), - }, - }); - let old_anchor_view = consensus.last_decided_view; - consensus - .collect_garbage(old_anchor_view, new_anchor_view) - .await; - consensus.last_decided_view = new_anchor_view; - consensus.metrics.invalid_qc.set(0); - consensus.metrics.last_decided_view.set( - usize::try_from(consensus.last_decided_view.get_u64()).unwrap(), - ); - let cur_number_of_views_per_decide_event = - *self.cur_view - consensus.last_decided_view.get_u64(); - consensus - .metrics - .number_of_views_per_decide_event - .add_point(cur_number_of_views_per_decide_event as f64); - - // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. 
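Editor's note: the ancestor walk above implements the two-chain/three-chain commit rule. A std-only sketch of that rule with a hypothetical leaf type:

```rust
// Sketch: starting from the proposal's parent, follow justify-QC links back
// toward the anchor. Two leaves in consecutive views lock the second one;
// three in a row decide (commit) the third, which becomes the new anchor.
struct Leaf { view: u64 }

fn walk_chain(proposal_view: u64, ancestors: &[Leaf]) -> (Option<u64>, Option<u64>) {
    let (mut locked, mut decided) = (None, None);
    let mut chain_len = 1usize; // the proposal itself starts the chain
    let mut last_view = proposal_view;
    for leaf in ancestors {
        if last_view != leaf.view + 1 {
            break; // a view gap: no new chain extension, nothing to commit
        }
        last_view = leaf.view;
        chain_len += 1;
        if chain_len == 2 {
            locked = Some(leaf.view); // two-chain: raise the locked view
        } else if chain_len == 3 {
            decided = Some(leaf.view); // three-chain: decide here
            break;
        }
    }
    (locked, decided)
}

fn main() {
    // Ancestors newest-first, as a visit_leaf_ancestors-style walk yields them.
    let chain = [Leaf { view: 8 }, Leaf { view: 7 }, Leaf { view: 6 }];
    assert_eq!(walk_chain(9, &chain), (Some(8), Some(7)));

    // A gap between views 8 and 6 stops the chain before a decide forms.
    let gapped = [Leaf { view: 8 }, Leaf { view: 6 }];
    assert_eq!(walk_chain(9, &gapped), (Some(8), None));
}
```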
- if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { - error!("Could not insert new anchor into the storage API: {:?}", e); - } + // promote lock here to add proposal to statemap + let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; + if high_qc.view_number > consensus.high_qc.view_number { + consensus.high_qc = high_qc; + } + consensus.state_map.insert( + view, + View { + view_inner: ViewInner::Leaf { + leaf: leaf.commit(), + }, + }, + ); + consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); + if new_commit_reached { + consensus.locked_view = new_locked_view; + } + #[allow(clippy::cast_precision_loss)] + if new_decide_reached { + debug!("about to publish decide"); + self.event_stream + .publish(SequencingHotShotEvent::LeafDecided(leaf_views.clone())) + .await; + let decide_sent = self.output_event_stream.publish(Event { + view_number: consensus.last_decided_view, + event: EventType::Decide { + leaf_chain: Arc::new(leaf_views), + qc: Arc::new(new_decide_qc.unwrap()), + block_size: Some(included_txns_set.len().try_into().unwrap()), + }, + }); + let old_anchor_view = consensus.last_decided_view; + consensus + .collect_garbage(old_anchor_view, new_anchor_view) + .await; + consensus.last_decided_view = new_anchor_view; + consensus.metrics.invalid_qc.set(0); + consensus + .metrics + .last_decided_view + .set(usize::try_from(consensus.last_decided_view.get_u64()).unwrap()); + let cur_number_of_views_per_decide_event = + *self.cur_view - consensus.last_decided_view.get_u64(); + consensus + .metrics + .number_of_views_per_decide_event + .add_point(cur_number_of_views_per_decide_event as f64); + + // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. + if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { + error!("Could not insert new anchor into the storage API: {:?}", e); + } - debug!("Sending Decide for view {:?}", consensus.last_decided_view); - debug!("Decided txns len {:?}", included_txns_set.len()); - decide_sent.await; - } + debug!("Sending Decide for view {:?}", consensus.last_decided_view); + debug!("Decided txns len {:?}", included_txns_set.len()); + decide_sent.await; + } - let new_view = self.current_proposal.clone().unwrap().view_number + 1; - // In future we can use the mempool model where we fetch the proposal if we don't have it, instead of having to wait for it here - // This is for the case where we form a QC but have not yet seen the previous proposal ourselves - let should_propose = self.quorum_exchange.is_leader(new_view) - && consensus.high_qc.view_number - == self.current_proposal.clone().unwrap().view_number; - // todo get rid of this clone - let qc = consensus.high_qc.clone(); - - drop(consensus); - if should_propose { - debug!( - "Attempting to publish proposal before voting; now in view: {}", - *new_view - ); - self.publish_proposal_if_able(qc.clone(), qc.view_number + 1) - .await; - } - if !self.vote_if_able().await { - // TOOD ED This means we publish the proposal without updating our own view, which doesn't seem right - return; - } + let new_view = self.current_proposal.clone().unwrap().view_number + 1; + // In future we can use the mempool model where we fetch the proposal if we don't have it, instead of having to wait for it here + // This is for the case where we form a QC but have not yet seen the previous proposal ourselves + let should_propose = self.quorum_exchange.is_leader(new_view) + && consensus.high_qc.view_number + == 
self.current_proposal.clone().unwrap().view_number; + // todo get rid of this clone + let qc = consensus.high_qc.clone(); - // ED Only do this GC if we are able to vote - for v in (*self.cur_view)..=(*view) { - let time = TYPES::Time::new(v); - self.certs.remove(&time); - } + drop(consensus); + if should_propose { + debug!( + "Attempting to publish proposal after voting; now in view: {}", + *new_view + ); + self.publish_proposal_if_able(qc.clone(), qc.view_number + 1, None) + .await; + } + if !self.vote_if_able().await { + return; + } + self.current_proposal = None; - // Update current view and publish a view change event so other tasks also update - self.update_view(new_view).await; - } + for v in (*self.cur_view)..=(*view) { + let time = TYPES::Time::new(v); + self.certs.remove(&time); } } SequencingHotShotEvent::QuorumVoteRecv(vote) => { @@ -994,23 +1035,39 @@ where phantom: PhantomData, }; - let accumulator = self.quorum_exchange.accumulate_vote_2( + let accumulator = self.quorum_exchange.accumulate_vote( new_accumulator, &vote, &vote_internal.clone().leaf_commitment, ); + // TODO Create default functions for accumulators + // https://github.com/EspressoSystems/HotShot/issues/1797 + let timeout_accumulator = TimeoutVoteAccumulator { + da_vote_outcomes: HashMap::new(), + success_threshold: self.timeout_exchange.success_threshold(), + sig_lists: Vec::new(), + signers: bitvec![0; self.timeout_exchange.total_nodes()], + phantom: PhantomData, + }; + if vote_internal.current_view > collection_view { let state = VoteCollectionTaskState { quorum_exchange: self.quorum_exchange.clone(), + timeout_exchange: self.timeout_exchange.clone(), accumulator, + timeout_accumulator: either::Left(timeout_accumulator), cur_view: vote_internal.current_view, event_stream: self.event_stream.clone(), id: self.id, }; let name = "Quorum Vote Collection"; let filter = FilterEvent(Arc::new(|event| { - matches!(event, SequencingHotShotEvent::QuorumVoteRecv(_)) + matches!( + event, + SequencingHotShotEvent::QuorumVoteRecv(_) + | SequencingHotShotEvent::TimeoutVoteRecv(_) + ) })); let builder = @@ -1044,46 +1101,146 @@ where .await; } } - QuorumVote::Timeout(_) | QuorumVote::No(_) => { + QuorumVote::No(_) => { error!("The next leader has received an unexpected vote!"); } } } - SequencingHotShotEvent::QCFormed(qc) => { - debug!("QC Formed event happened!"); + SequencingHotShotEvent::TimeoutVoteRecv(vote) => { + if !self.timeout_exchange.is_leader(vote.get_view() + 1) { + error!( + "We are not the leader for view {} are we the leader for view + 1? 
{}", + *vote.get_view() + 1, + self.timeout_exchange.is_leader(vote.get_view() + 2) + ); + return; + } - let mut consensus = self.consensus.write().await; - consensus.high_qc = qc.clone(); - drop(consensus); + let handle_event = HandleEvent(Arc::new(move |event, state| { + async move { vote_handle(state, event).await }.boxed() + })); + let collection_view = + if let Some((collection_view, collection_task, _)) = &self.vote_collector { + if vote.get_view() > *collection_view { + // ED I think we'd want to let that task timeout to avoid a griefing vector + self.registry.shutdown_task(*collection_task).await; + } + *collection_view + } else { + TYPES::Time::new(0) + }; - // View may have already been updated by replica if they voted for this QC - // TODO ED We should separate leader state from replica state, they shouldn't share the same view - // Leader task should only run for a specific view, and never update its current view, but instead spawn another task - // let _res = self.update_view(qc.view_number + 1).await; - - // Start polling for votes for the next view - // if _res { - // if self.quorum_exchange.is_leader(qc.view_number + 2) { - // self.quorum_exchange - // .network() - // .inject_consensus_info( - // (ConsensusIntentEvent::PollForVotes(*qc.view_number + 1)), - // ) - // .await; - // } - // } - - // So we don't create a QC on the first view unless we are the leader - debug!( - "Attempting to publish proposal after forming a QC for view {}", - *qc.view_number + // // Todo check if we are the leader + let new_accumulator = TimeoutVoteAccumulator { + da_vote_outcomes: HashMap::new(), + + success_threshold: self.timeout_exchange.success_threshold(), + + sig_lists: Vec::new(), + signers: bitvec![0; self.timeout_exchange.total_nodes()], + phantom: PhantomData, + }; + + let timeout_accumulator = self.timeout_exchange.accumulate_vote( + new_accumulator, + &vote, + &vote.get_view().commit(), ); - if self - .publish_proposal_if_able(qc.clone(), qc.view_number + 1) - .await - { - self.update_view(qc.view_number + 1).await; + let quorum_accumulator = QuorumVoteAccumulator { + total_vote_outcomes: HashMap::new(), + yes_vote_outcomes: HashMap::new(), + no_vote_outcomes: HashMap::new(), + + success_threshold: self.quorum_exchange.success_threshold(), + failure_threshold: self.quorum_exchange.failure_threshold(), + + sig_lists: Vec::new(), + signers: bitvec![0; self.quorum_exchange.total_nodes()], + phantom: PhantomData, + }; + + // self.timeout_accumulator = accumulator; + + if vote.get_view() > collection_view { + let state = VoteCollectionTaskState { + quorum_exchange: self.quorum_exchange.clone(), + timeout_exchange: self.timeout_exchange.clone(), + accumulator: either::Left(quorum_accumulator), + timeout_accumulator, + cur_view: vote.get_view(), + event_stream: self.event_stream.clone(), + id: self.id, + }; + let name = "Quorum Vote Collection"; + let filter = FilterEvent(Arc::new(|event| { + matches!( + event, + SequencingHotShotEvent::QuorumVoteRecv(_) + | SequencingHotShotEvent::TimeoutVoteRecv(_) + ) + })); + + let builder = + TaskBuilder::>::new(name.to_string()) + .register_event_stream(self.event_stream.clone(), filter) + .await + .register_registry(&mut self.registry.clone()) + .await + .register_state(state) + .register_event_handler(handle_event); + let id = builder.get_task_id().unwrap(); + let stream_id = builder.get_stream_id().unwrap(); + + self.vote_collector = Some((vote.get_view(), id, stream_id)); + + let _task = async_spawn(async move { + 
VoteCollectionTypes::build(builder).launch().await; + }); + debug!("Starting vote handle for view {:?}", vote.get_view()); + } else if let Some((_, _, stream_id)) = self.vote_collector { + self.event_stream + .direct_message(stream_id, SequencingHotShotEvent::TimeoutVoteRecv(vote)) + .await; + } + } + SequencingHotShotEvent::QCFormed(cert) => { + debug!("QC Formed event happened!"); + + if let either::Right(qc) = cert.clone() { + debug!( + "Attempting to publish proposal after forming a TC for view {}", + *qc.view_number + ); + + let view = qc.view_number + 1; + + let high_qc = self.consensus.read().await.high_qc.clone(); + + if self + .publish_proposal_if_able(high_qc, view, Some(qc.clone())) + .await + { + } else { + warn!("Wasn't able to publish proposal"); + } + } + if let either::Left(qc) = cert { + let mut consensus = self.consensus.write().await; + consensus.high_qc = qc.clone(); + + drop(consensus); + debug!( + "Attempting to publish proposal after forming a QC for view {}", + *qc.view_number + ); + + if !self + .publish_proposal_if_able(qc.clone(), qc.view_number + 1, None) + .await + { + warn!("Wasn't able to publish proposal"); + } } } SequencingHotShotEvent::DACRecv(cert) => { @@ -1092,9 +1249,8 @@ where let view = cert.view_number; self.certs.insert(view, cert); - // TODO Make sure we aren't voting for an arbitrarily old round for no reason if self.vote_if_able().await { - self.update_view(view + 1).await; + self.current_proposal = None; } } SequencingHotShotEvent::VidCertRecv(cert) => { @@ -1105,7 +1261,7 @@ where // TODO Make sure we aren't voting for an arbitrarily old round for no reason if self.vote_if_able().await { - self.update_view(view + 1).await; + self.current_proposal = None; } } SequencingHotShotEvent::ViewChange(new_view) => { @@ -1128,43 +1284,42 @@ where }, }) .await; - - debug!("View changed to {}", *new_view); - - // ED Need to update the view here? What does otherwise? - // self.update_view(qc.view_number + 1).await; - // So we don't create a QC on the first view unless we are the leader - if !self.quorum_exchange.is_leader(self.cur_view) { + } + SequencingHotShotEvent::Timeout(view) => { + // NOTE: We may optionally have the timeout task listen for view change events + if self.cur_view >= view { return; } + let vote_token = self.timeout_exchange.make_vote_token(view); - let consensus = self.consensus.read().await; - let qc = consensus.high_qc.clone(); - drop(consensus); - if !self.publish_proposal_if_able(qc, self.cur_view).await { - error!( - "Failed to publish proposal on view change. 
View = {:?}", - self.cur_view - ); + match vote_token { + Err(e) => { + error!("Failed to generate vote token for {:?} {:?}", view, e); + } + Ok(None) => { + debug!("We were not chosen for consensus committee on {:?}", view); + } + Ok(Some(vote_token)) => { + let message = self + .timeout_exchange + .create_timeout_message::(view, vote_token); + + debug!("Sending timeout vote for view {}", *view); + if let GeneralConsensusMessage::TimeoutVote(vote) = message { + self.event_stream + .publish(SequencingHotShotEvent::TimeoutVoteSend(vote)) + .await; + } + } } - } - SequencingHotShotEvent::Timeout(view) => { - // The view sync module will handle updating views in the case of timeout - // TODO ED In the future send a timeout vote - self.quorum_exchange - .network() - .inject_consensus_info(ConsensusIntentEvent::CancelPollForVotes(*view)) - .await; debug!( - "We received a timeout event in the consensus task for view {}!", + "We did not receive evidence for view {} in time, sending timeout vote for that view!", *view ); let consensus = self.consensus.read().await; consensus.metrics.number_of_timeouts.add(1); } SequencingHotShotEvent::SendDABlockData(block) => { - // ED TODO Should make sure this is actually the most recent block - // ED Should make this a map to view self.block = Some(block); } _ => {} @@ -1176,6 +1331,7 @@ where &mut self, _qc: QuorumCertificate>, view: TYPES::Time, + timeout_certificate: Option>, ) -> bool { if !self.quorum_exchange.is_leader(view) { error!( @@ -1204,6 +1360,7 @@ where return false; }; if leaf_commitment != consensus.high_qc.leaf_commitment() { + // NOTE: This happens on the genesis block debug!( "They don't equal: {:?} {:?}", leaf_commitment, @@ -1263,8 +1420,7 @@ where view_number: leaf.view_number, height: leaf.height, justify_qc: consensus.high_qc.clone(), - // TODO ED Update this to be the actual TC if there is one - timeout_certificate: None, + timeout_certificate: timeout_certificate.or_else(|| None), proposer_id: leaf.proposer_id, dac: None, }; @@ -1273,7 +1429,10 @@ where data: proposal, signature, }; - debug!("Sending proposal for view {:?} \n {:?}", self.cur_view, ""); + debug!( + "Sending proposal for view {:?} \n {:?}", + leaf.view_number, "" + ); self.event_stream .publish(SequencingHotShotEvent::QuorumProposalSend( @@ -1284,6 +1443,7 @@ where self.block = None; return true; } + debug!("Self block was None"); false } } @@ -1311,6 +1471,13 @@ where Certificate = DACertificate, Commitment = Commitment, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { } @@ -1360,6 +1527,13 @@ where Certificate = DACertificate, Commitment = Commitment, >, + SequencingTimeoutEx: ConsensusExchange< + TYPES, + Message, + Proposal = QuorumProposal>, + Certificate = TimeoutCertificate, + Commitment = Commitment, + >, { if let SequencingHotShotEvent::Shutdown = event { (Some(HotShotTaskCompleted::ShutDown), state) @@ -1383,6 +1557,7 @@ pub fn consensus_event_filter>( | SequencingHotShotEvent::ViewChange(_) | SequencingHotShotEvent::SendDABlockData(_) | SequencingHotShotEvent::Timeout(_) + | SequencingHotShotEvent::TimeoutVoteRecv(_) | SequencingHotShotEvent::Shutdown, ) } diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 5aadf0e162..b7d5ea78ad 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -12,9 +12,6 @@ use hotshot_task::{ task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS}, 
task_impls::{HSTWithEvent, TaskBuilder}, }; -use hotshot_types::traits::election::SignedCertificate; -use hotshot_types::vote::DAVoteAccumulator; -use hotshot_types::vote::VoteType; use hotshot_types::{ certificate::DACertificate, consensus::{Consensus, View}, @@ -22,7 +19,7 @@ use hotshot_types::{ message::{Message, Proposal, SequencingMessage}, traits::{ consensus_api::SequencingConsensusApi, - election::{CommitteeExchangeType, ConsensusExchange, Membership}, + election::{CommitteeExchangeType, ConsensusExchange, Membership, SignedCertificate}, network::{CommunicationChannel, ConsensusIntentEvent}, node_implementation::{CommitteeEx, NodeImplementation, NodeType}, signature_key::SignatureKey, @@ -30,11 +27,11 @@ use hotshot_types::{ BlockPayload, }, utils::ViewInner, + vote::{DAVoteAccumulator, VoteType}, }; use snafu::Snafu; -use std::marker::PhantomData; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use tracing::{debug, error, instrument, warn}; #[derive(Snafu, Debug)] @@ -156,7 +153,7 @@ where let accumulator = state.accumulator.left().unwrap(); - match state.committee_exchange.accumulate_vote_2( + match state.committee_exchange.accumulate_vote( accumulator, &vote, &vote.block_commitment, @@ -197,7 +194,7 @@ where let accumulator = state.accumulator.left().unwrap(); - match state.committee_exchange.accumulate_vote_2( + match state.committee_exchange.accumulate_vote( accumulator, &vote, &vote.block_commitment, @@ -275,7 +272,9 @@ where // `self.cur_view` should be at least 1 since there is a view change before getting // the `DAProposalRecv` event. Otherewise, the view number subtraction below will // cause an overflow error. - if view < self.cur_view - 1 { + // TODO ED Come back to this - we probably don't need this, but we should also never receive a DAC where this fails, investigate block ready so it doesn't make one for the genesis block + + if self.cur_view != TYPES::Time::genesis() && view < self.cur_view - 1 { warn!("Throwing away DA proposal that is more than one view older"); return None; } @@ -373,7 +372,7 @@ where phantom: PhantomData, }; - let accumulator = self.committee_exchange.accumulate_vote_2( + let accumulator = self.committee_exchange.accumulate_vote( new_accumulator, &vote, &vote.clone().block_commitment, @@ -455,7 +454,7 @@ where phantom: PhantomData, }; - let accumulator = self.committee_exchange.accumulate_vote_2( + let accumulator = self.committee_exchange.accumulate_vote( new_accumulator, &vote, &vote.clone().block_commitment, @@ -510,7 +509,9 @@ where // `self.cur_view` should be at least 1 since there is a view change before getting // the `DAProposalRecv` event. Otherewise, the view number subtraction below will // cause an overflow error. - if view < self.cur_view - 1 { + // TODO ED Revisit this + + if self.cur_view != TYPES::Time::genesis() && view < self.cur_view - 1 { warn!("Throwing away VID disperse data that is more than one view older"); return None; } @@ -579,9 +580,8 @@ where error!("View changed by more than 1 going to view {:?}", view); } self.cur_view = view; - // Inject view info into network - // ED I think it is possible that you receive a quorum proposal, vote on it and update your view before the da leader has sent their proposal, and therefore you skip polling for this view? 
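Editor's note: the `self.cur_view != TYPES::Time::genesis() && view < self.cur_view - 1` guards added in da.rs above exist because view numbers are unsigned, so `cur_view - 1` would underflow at the genesis view. A tiny sketch of the same test with illustrative names:

```rust
// Sketch: checking for genesis first, or using saturating arithmetic, keeps
// the "older than one view" test safe on unsigned view numbers.
const GENESIS: u64 = 0;

fn too_old(view: u64, cur_view: u64) -> bool {
    // Underflow-free form of: cur_view != GENESIS && view < cur_view - 1
    cur_view != GENESIS && view < cur_view.saturating_sub(1)
}

fn main() {
    assert!(!too_old(0, GENESIS)); // no underflow at genesis
    assert!(!too_old(4, 5));       // exactly one view old: still accepted
    assert!(too_old(3, 5));        // two views old: thrown away
}
```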
+ // Inject view info into network let is_da = self .committee_exchange .membership() @@ -636,12 +636,7 @@ where }; debug!("Sending DA proposal for view {:?}", data.view_number); - // let message = SequencingMessage::(Right( - // CommitteeConsensusMessage::DAProposal(Proposal { data, signature }), - // )); let message = Proposal { data, signature }; - // Brodcast DA proposal - // TODO ED We should send an event to do this, but just getting it to work for now self.event_stream .publish(SequencingHotShotEvent::SendDABlockData(block.clone())) @@ -662,6 +657,7 @@ where } SequencingHotShotEvent::Shutdown => { + error!("Shutting down because of shutdown signal!"); return Some(HotShotTaskCompleted::ShutDown); } _ => { diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index f9fc7f8ba3..4672e791ff 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -1,13 +1,14 @@ use crate::view_sync::ViewSyncPhase; use commit::Commitment; +use either::Either; use hotshot_types::{ - certificate::{DACertificate, QuorumCertificate}, + certificate::{DACertificate, QuorumCertificate, TimeoutCertificate}, data::{DAProposal, VidDisperse}, message::Proposal, traits::node_implementation::{ NodeImplementation, NodeType, QuorumProposalType, ViewSyncProposalType, }, - vote::{DAVote, QuorumVote, ViewSyncVote}, + vote::{DAVote, QuorumVote, TimeoutVote, ViewSyncVote}, }; /// All of the possible events that can be passed between Sequecning `HotShot` tasks @@ -19,6 +20,10 @@ pub enum SequencingHotShotEvent> { QuorumProposalRecv(Proposal>, TYPES::SignatureKey), /// A quorum vote has been received from the network; handled by the consensus task QuorumVoteRecv(QuorumVote>), + /// A timeout vote recevied from the network; handled by consensus task + TimeoutVoteRecv(TimeoutVote), + /// Send a timeout vote to the network; emitted by consensus task replicas + TimeoutVoteSend(TimeoutVote), /// A DA proposal has been received from the network; handled by the DA task DAProposalRecv(Proposal>, TYPES::SignatureKey), /// A DA vote has been received by the network; handled by the DA task @@ -34,7 +39,7 @@ pub enum SequencingHotShotEvent> { /// Send a DA vote to the DA leader; emitted by DA committee members in the DA task after seeing a valid DA proposal DAVoteSend(DAVote), /// The next leader has collected enough votes to form a QC; emitted by the next leader in the consensus task; an internal event only - QCFormed(QuorumCertificate>), + QCFormed(Either>, TimeoutCertificate>), /// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task DACSend(DACertificate, TYPES::SignatureKey), /// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks diff --git a/crates/task-impls/src/harness.rs b/crates/task-impls/src/harness.rs index 136093bc3e..c2e2f0be50 100644 --- a/crates/task-impls/src/harness.rs +++ b/crates/task-impls/src/harness.rs @@ -3,7 +3,7 @@ use async_compatibility_layer::art::async_spawn; use futures::FutureExt; use hotshot_task::{ - event_stream::{self, ChannelStream, EventStream}, + event_stream::{ChannelStream, EventStream}, task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS}, task_impls::{HSTWithEvent, TaskBuilder}, task_launcher::TaskRunner, @@ -52,7 +52,7 @@ pub async fn run_harness( { let task_runner = TaskRunner::new(); let registry = task_runner.registry.clone(); - 
let event_stream = event_stream.unwrap_or(event_stream::ChannelStream::new()); + let event_stream = event_stream.unwrap_or_default(); let state = TestHarnessState { expected_output }; let handler = HandleEvent(Arc::new(move |event, state| { async move { handle_event(event, state) }.boxed() @@ -76,7 +76,7 @@ pub async fn run_harness( let runner = async_spawn(async move { task_runner.launch().await }); for event in input { - let _ = event_stream.publish(event).await; + let () = event_stream.publish(event).await; } let _ = runner.await; diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 605239c748..62c67757d8 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -90,6 +90,9 @@ impl< GeneralConsensusMessage::ViewSyncCertificate(view_sync_message) => { SequencingHotShotEvent::ViewSyncCertificateRecv(view_sync_message) } + GeneralConsensusMessage::TimeoutVote(message) => { + SequencingHotShotEvent::TimeoutVoteRecv(message) + } GeneralConsensusMessage::InternalTrigger(_) => { error!("Got unexpected message type in network task!"); return; @@ -100,7 +103,6 @@ impl< SequencingHotShotEvent::DAProposalRecv(proposal.clone(), sender) } CommitteeConsensusMessage::DAVote(vote) => { - // error!("DA Vote message recv {:?}", vote.current_view); SequencingHotShotEvent::DAVoteRecv(vote.clone()) } CommitteeConsensusMessage::DACertificate(cert) => { @@ -239,7 +241,7 @@ impl< CommitteeConsensusMessage::VidVote(vote.clone()), ))), TransmitType::Direct, - Some(membership.get_leader(vote.current_view)), // TODO who is VID leader? https://github.com/EspressoSystems/HotShot/issues/1699 + Some(membership.get_leader(vote.get_view())), // TODO who is VID leader? https://github.com/EspressoSystems/HotShot/issues/1699 ), SequencingHotShotEvent::DAVoteSend(vote) => ( vote.signature_key(), @@ -247,7 +249,7 @@ impl< CommitteeConsensusMessage::DAVote(vote.clone()), ))), TransmitType::Direct, - Some(membership.get_leader(vote.current_view)), + Some(membership.get_leader(vote.get_view())), ), SequencingHotShotEvent::VidCertSend(certificate, sender) => ( sender, @@ -285,11 +287,20 @@ impl< Some(membership.get_leader(vote.round() + vote.relay())), ) } + SequencingHotShotEvent::TimeoutVoteSend(vote) => ( + vote.get_key(), + MessageKind::::from_consensus_message(SequencingMessage(Left( + GeneralConsensusMessage::TimeoutVote(vote.clone()), + ))), + TransmitType::Direct, + Some(membership.get_leader(vote.get_view() + 1)), + ), SequencingHotShotEvent::ViewChange(view) => { self.view = view; return None; } SequencingHotShotEvent::Shutdown => { + error!("Networking task shutting down"); return Some(HotShotTaskCompleted::ShutDown); } event => { @@ -339,6 +350,7 @@ impl< | SequencingHotShotEvent::DACSend(_, _) | SequencingHotShotEvent::VidCertSend(_, _) | SequencingHotShotEvent::ViewChange(_) + | SequencingHotShotEvent::TimeoutVoteSend(_) ) } diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index a47a05e349..cb939d0ca7 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -1,7 +1,7 @@ use crate::events::SequencingHotShotEvent; -use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; use async_compatibility_layer::{ - art::async_timeout, async_primitives::subscribable_rwlock::ReadView, + art::async_timeout, + async_primitives::subscribable_rwlock::{ReadView, SubscribableRwLock}, }; use async_lock::RwLock; use bincode::config::Options; diff --git 
a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index 4b241c37cd..c7efa7e212 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -490,7 +490,7 @@ where } &SequencingHotShotEvent::Timeout(view_number) => { // This is an old timeout and we can ignore it - if view_number < TYPES::Time::new(*self.current_view) { + if view_number <= TYPES::Time::new(*self.current_view) { return; } @@ -500,16 +500,12 @@ where self.num_timeouts_tracked, *view_number ); - if self.num_timeouts_tracked > 2 { + if self.num_timeouts_tracked > 3 { error!("Too many timeouts! This shouldn't happen"); } // TODO ED Make this a configurable variable - if self.num_timeouts_tracked == 2 { - error!( - "Starting view sync protocol; attempting to sync on view {}", - *view_number + 1 - ); + if self.num_timeouts_tracked > 2 { // Start polling for view sync certificates self.exchange .network() @@ -606,7 +602,7 @@ where }); } else { // If this is the first timeout we've seen advance to the next view - self.current_view += 1; + self.current_view = view_number; self.event_stream .publish(SequencingHotShotEvent::ViewChange(TYPES::Time::new( *self.current_view, @@ -1035,7 +1031,7 @@ where *vote_internal.round, vote_internal.relay ); - let accumulator = self.exchange.accumulate_vote_2( + let accumulator = self.exchange.accumulate_vote( self.accumulator.left().unwrap(), &vote, &view_sync_data, diff --git a/crates/task/src/event_stream.rs b/crates/task/src/event_stream.rs index 875d045994..5248fe4373 100644 --- a/crates/task/src/event_stream.rs +++ b/crates/task/src/event_stream.rs @@ -129,7 +129,7 @@ impl EventStream for ChannelStream { Some((filter, sender)) => { if filter(&event) { match sender.send(event.clone()).await { - Ok(_) => (), + Ok(()) => (), // error sending => stream is closed so remove it Err(_) => self.unsubscribe(id).await, } @@ -147,7 +147,7 @@ impl EventStream for ChannelStream { for (uid, (filter, sender)) in &inner.subscribers { if filter(&event) { match sender.send(event.clone()).await { - Ok(_) => (), + Ok(()) => (), // error sending => stream is closed so remove it Err(_) => { self.unsubscribe(*uid).await; diff --git a/crates/task/src/task.rs b/crates/task/src/task.rs index 37b6f2d56f..8435ff0fcf 100644 --- a/crates/task/src/task.rs +++ b/crates/task/src/task.rs @@ -381,7 +381,7 @@ impl<'pin, HSTT: HotShotTaskTypes> ProjectedHST<'pin, HSTT> { cx: &mut Context<'_>, ) -> Poll { match fut.as_mut().poll(cx) { - Poll::Ready(_) => Poll::Ready( + Poll::Ready(()) => Poll::Ready( self.r_val .take() .unwrap_or_else(|| HotShotTaskCompleted::LostReturnValue), diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index a9d9e5d586..1144dbd2d8 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -7,9 +7,7 @@ use std::{ use crate::{test_launcher::TaskGenerator, test_runner::Node, GlobalTestEvent}; use async_compatibility_layer::art::async_sleep; use futures::FutureExt; -use hotshot::traits::TestableNodeImplementation; -use hotshot::HotShotType; -use hotshot::SystemContext; +use hotshot::{traits::TestableNodeImplementation, HotShotType, SystemContext}; use hotshot_task::{ boxed_sync, event_stream::ChannelStream, diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index cf6932a390..0f9429c22c 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -3,8 +3,10 @@ use super::{ overall_safety_task::{OverallSafetyTask, RoundCtx}, 
txn_task::TxnTask, }; -use crate::spinning_task::UpDown; -use crate::test_launcher::{Networks, TestLauncher}; +use crate::{ + spinning_task::UpDown, + test_launcher::{Networks, TestLauncher}, +}; use hotshot::types::SystemContextHandle; use hotshot::{traits::TestableNodeImplementation, HotShotInitializer, HotShotType, SystemContext}; @@ -22,8 +24,7 @@ use hotshot_types::{ }, HotShotConfig, }; -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; #[allow(deprecated)] use tracing::info; diff --git a/crates/testing/tests/basic.rs b/crates/testing/tests/basic.rs index 35e75fffa7..cbc2c149dc 100644 --- a/crates/testing/tests/basic.rs +++ b/crates/testing/tests/basic.rs @@ -18,7 +18,7 @@ async fn test_success() { // allow more time to pass in CI completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( TimeBasedCompletionTaskDescription { - duration: Duration::from_millis(1_200_000), + duration: Duration::from_secs(60), }, ), ..TestMetadata::default() diff --git a/crates/testing/tests/consensus_task.rs b/crates/testing/tests/consensus_task.rs index ec1bd7b05c..dc8439a85d 100644 --- a/crates/testing/tests/consensus_task.rs +++ b/crates/testing/tests/consensus_task.rs @@ -1,3 +1,4 @@ +use commit::Commitment; use commit::Committable; use either::Right; use hotshot::{ @@ -85,9 +86,11 @@ async fn build_vote( tokio::test(flavor = "multi_thread", worker_threads = 2) )] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] +#[ignore] async fn test_consensus_task() { use hotshot_task_impls::harness::run_harness; use hotshot_testing::task_helpers::build_system_handle; + use hotshot_types::certificate::QuorumCertificate; async_compatibility_layer::logging::setup_logging(); async_compatibility_layer::logging::setup_backtrace(); @@ -98,19 +101,30 @@ async fn test_consensus_task() { let mut input = Vec::new(); let mut output = HashMap::new(); - input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(1))); - input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(2))); + // Trigger a proposal to send by creating a new QC. 
Then receive that proposal and update the view based on the valid QC in the proposal + let qc = QuorumCertificate::< + SequencingTestTypes, + Commitment>, + >::genesis(); + let proposal = build_quorum_proposal(&handle, &private_key, 1).await; + + input.push(SequencingHotShotEvent::QCFormed(either::Left(qc.clone()))); + input.push(SequencingHotShotEvent::QuorumProposalRecv( + proposal.clone(), + public_key, + )); input.push(SequencingHotShotEvent::Shutdown); + output.insert(SequencingHotShotEvent::QCFormed(either::Left(qc)), 1); + output.insert( + SequencingHotShotEvent::QuorumProposalSend(proposal.clone(), public_key), + 1, + ); output.insert( - SequencingHotShotEvent::QuorumProposalSend( - build_quorum_proposal(&handle, &private_key, 1).await, - public_key, - ), + SequencingHotShotEvent::QuorumProposalRecv(proposal.clone(), public_key), 1, ); - output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 2); - output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 2); + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1); output.insert(SequencingHotShotEvent::Shutdown, 1); let build_fn = |task_runner, event_stream| { @@ -141,14 +155,11 @@ async fn test_consensus_vote() { let proposal = build_quorum_proposal(&handle, &private_key, 1).await; - input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(1))); + // Send a proposal, vote on said proposal, update the view based on the proposal's QC, receive the vote as the next leader input.push(SequencingHotShotEvent::QuorumProposalRecv( + proposal.clone(), + public_key, + )); - - input.push(SequencingHotShotEvent::Shutdown); - output.insert( SequencingHotShotEvent::QuorumProposalRecv(proposal.clone(), public_key), 1, ); @@ -157,10 +168,14 @@ if let GeneralConsensusMessage::Vote(vote) = build_vote(&handle, proposal, ViewNumber::new(1)).await { - output.insert(SequencingHotShotEvent::QuorumVoteSend(vote), 1); + output.insert(SequencingHotShotEvent::QuorumVoteSend(vote.clone()), 1); + input.push(SequencingHotShotEvent::QuorumVoteRecv(vote.clone())); + output.insert(SequencingHotShotEvent::QuorumVoteRecv(vote), 1); } - output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 2); - output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 1); + + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1); + + input.push(SequencingHotShotEvent::Shutdown); output.insert(SequencingHotShotEvent::Shutdown, 1); let build_fn = |task_runner, event_stream| { diff --git a/crates/testing/tests/timeout.rs b/crates/testing/tests/timeout.rs index f8963c9d52..e8e4278195 100644 --- a/crates/testing/tests/timeout.rs +++ b/crates/testing/tests/timeout.rs @@ -4,13 +4,17 @@ tokio::test(flavor = "multi_thread", worker_threads = 2) )] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] -#[ignore] -async fn test_timeout() { +// TODO Add memory network tests after this issue is finished: +// https://github.com/EspressoSystems/HotShot/issues/1790 +async fn test_timeout_web() { use std::time::Duration; + use hotshot_testing::node_types::SequencingWebImpl; + use hotshot_testing::{ completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, - node_types::{SequencingMemoryImpl, SequencingTestTypes}, + node_types::SequencingTestTypes, + overall_safety_task::OverallSafetyPropertiesDescription, spinning_task::{ChangeNode, SpinningTaskDescription, UpDown}, test_builder::{TestMetadata, TimingData}, }; @@ -21,7 +25,12 @@
next_view_timeout: 1000, ..Default::default() }; - let mut metadata = TestMetadata::default(); + + let mut metadata = TestMetadata { + total_nodes: 10, + start_nodes: 10, + ..Default::default() + }; let dead_nodes = vec![ChangeNode { idx: 0, updown: UpDown::Down, @@ -29,20 +38,89 @@ async fn test_timeout() { metadata.timing_data = timing_data; + metadata.overall_safety_properties = OverallSafetyPropertiesDescription { + num_successful_views: 25, + ..Default::default() + }; + metadata.spinning_properties = SpinningTaskDescription { - node_changes: vec![(Duration::new(0, 5000), dead_nodes)], + node_changes: vec![(Duration::from_millis(500), dead_nodes)], + }; + + metadata.completion_task_description = + CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(30), + }, + ); + + // TODO ED Test with memory network once issue is resolved + // https://github.com/EspressoSystems/HotShot/issues/1790 + metadata + .gen_launcher::() + .launch() + .run_test() + .await; +} + +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_timeout_libp2p() { + use std::time::Duration; + + use hotshot_testing::node_types::SequencingLibp2pImpl; + + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::SequencingTestTypes, + overall_safety_task::OverallSafetyPropertiesDescription, + spinning_task::{ChangeNode, SpinningTaskDescription, UpDown}, + test_builder::{TestMetadata, TimingData}, + }; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let timing_data = TimingData { + next_view_timeout: 1000, + ..Default::default() + }; + + let mut metadata = TestMetadata { + total_nodes: 10, + start_nodes: 10, + ..Default::default() + }; + let dead_nodes = vec![ChangeNode { + idx: 0, + updown: UpDown::Down, + }]; + + metadata.timing_data = timing_data; + + metadata.overall_safety_properties = OverallSafetyPropertiesDescription { + num_successful_views: 25, + ..Default::default() }; - // TODO ED Add safety task, etc to confirm TCs are being formed + metadata.spinning_properties = SpinningTaskDescription { + node_changes: vec![(Duration::from_millis(500), dead_nodes)], + }; metadata.completion_task_description = CompletionTaskDescription::TimeBasedCompletionTaskBuilder( TimeBasedCompletionTaskDescription { - duration: Duration::from_millis(10000), + duration: Duration::from_secs(30), }, ); + + // TODO ED Test with memory network once issue is resolved + // https://github.com/EspressoSystems/HotShot/issues/1790 metadata - .gen_launcher::() + .gen_launcher::() .launch() .run_test() .await; diff --git a/crates/testing/tests/view_sync_task.rs b/crates/testing/tests/view_sync_task.rs index 25cbe55b9a..8116dcf2a2 100644 --- a/crates/testing/tests/view_sync_task.rs +++ b/crates/testing/tests/view_sync_task.rs @@ -23,7 +23,7 @@ async fn test_view_sync_task() { use core::panic; use hotshot::tasks::add_view_sync_task; - use hotshot_task_impls::{harness::run_harness, view_sync::ViewSyncPhase}; + use hotshot_task_impls::harness::run_harness; use hotshot_testing::task_helpers::build_system_handle; use hotshot_types::{ traits::election::VoteData, @@ -34,7 +34,7 @@ async fn test_view_sync_task() { async_compatibility_layer::logging::setup_backtrace(); // Build the API for node 3. 
- let handle = build_system_handle(3).await.0; + let handle = build_system_handle(5).await.0; let api: HotShotSequencingConsensusApi = HotShotSequencingConsensusApi { inner: handle.hotshot.inner.clone(), @@ -42,19 +42,19 @@ let view_sync_exchange = api.inner.exchanges.view_sync_exchange().clone(); let relay_pub_key = api.public_key().to_bytes(); let vote_token = view_sync_exchange - .make_vote_token(ViewNumber::new(3)) + .make_vote_token(ViewNumber::new(5)) .unwrap_or_else(|_| panic!("Error making vote token")) .unwrap_or_else(|| panic!("Not chosen for the committee")); let vote_data_internal: ViewSyncData = ViewSyncData { relay: relay_pub_key.clone(), - round: ViewNumber::new(3), + round: ViewNumber::new(5), }; let vote_data_internal_commitment = vote_data_internal.commit(); let signature = view_sync_exchange.sign_precommit_message(vote_data_internal_commitment); let vote = ViewSyncVote::PreCommit(ViewSyncVoteInternal { relay_pub_key, relay: 0, - round: ViewNumber::new(3), + round: ViewNumber::new(5), signature, vote_token, vote_data: VoteData::ViewSyncPreCommit(vote_data_internal_commitment), @@ -64,23 +64,20 @@ let mut input = Vec::new(); let mut output = HashMap::new(); - input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(1))); - input.push(SequencingHotShotEvent::Timeout(ViewNumber::new(1))); input.push(SequencingHotShotEvent::Timeout(ViewNumber::new(2))); + input.push(SequencingHotShotEvent::Timeout(ViewNumber::new(3))); + input.push(SequencingHotShotEvent::Timeout(ViewNumber::new(4))); + input.push(SequencingHotShotEvent::Shutdown); - output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1); - output.insert(SequencingHotShotEvent::Timeout(ViewNumber::new(1)), 1); output.insert(SequencingHotShotEvent::Timeout(ViewNumber::new(2)), 1); - // 2 `Timeout` events will trigger a replica task to handle a `ViewSyncTrigger` event, which - // will then publish a `ViewSyncVoteSend` event. + output.insert(SequencingHotShotEvent::Timeout(ViewNumber::new(3)), 1); + output.insert(SequencingHotShotEvent::Timeout(ViewNumber::new(4)), 1); + output.insert(SequencingHotShotEvent::ViewSyncVoteSend(vote.clone()), 1); - output.insert( - SequencingHotShotEvent::ViewSyncTimeout(ViewNumber::new(3), 0, ViewSyncPhase::None), - 1, - ); - // Triggered by the `Timeout` events. output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 1); + output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(3)), 1); + output.insert(SequencingHotShotEvent::Shutdown, 1); let build_fn = diff --git a/crates/types/src/certificate.rs b/crates/types/src/certificate.rs index 2e55334f56..bf02d2f871 100644 --- a/crates/types/src/certificate.rs +++ b/crates/types/src/certificate.rs @@ -1,19 +1,15 @@ //! Provides two types of certificates and their accumulators.
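// A minimal sketch of the timeout-tracking rule this test exercises, per the view_sync.rs
// hunk earlier in this patch: the first tracked timeout only advances the view, and view
// sync starts once more than two timeouts have been tracked. The standalone function here
// is an illustrative stand-in, not HotShot's actual API.
fn should_start_view_sync(num_timeouts_tracked: u64) -> bool {
    num_timeouts_tracked > 2
}

fn main() {
    // Timeouts for views 2 and 3 merely advance the view...
    assert!(!should_start_view_sync(1));
    assert!(!should_start_view_sync(2));
    // ...while the third tracked timeout (view 4) crosses the threshold and triggers
    // view sync, which is why the test expects a ViewSyncVoteSend event.
    assert!(should_start_view_sync(3));
}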
-use crate::vote::DAVoteAccumulator; -use crate::vote::QuorumVote; -use crate::vote::QuorumVoteAccumulator; -use crate::vote::ViewSyncVoteAccumulator; -use crate::vote::VoteType; use crate::{ data::serialize_signature, traits::{ - election::{SignedCertificate, VoteData, VoteToken}, - node_implementation::NodeType, - signature_key::{EncodedPublicKey, EncodedSignature, SignatureKey}, + election::SignedCertificate, node_implementation::NodeType, signature_key::SignatureKey, state::ConsensusTime, }, - vote::{DAVote, ViewSyncData, ViewSyncVote}, + vote::{ + DAVote, DAVoteAccumulator, QuorumVote, QuorumVoteAccumulator, TimeoutVote, + TimeoutVoteAccumulator, ViewSyncData, ViewSyncVote, ViewSyncVoteAccumulator, VoteType, + }, }; use bincode::Options; use commit::{Commitment, CommitmentBounds, Committable}; @@ -24,7 +20,6 @@ use serde::{Deserialize, Serialize}; use std::{ fmt::{self, Debug, Display, Formatter}, hash::Hash, - ops::Deref, }; use tracing::debug; @@ -86,6 +81,42 @@ pub struct TimeoutCertificate { pub signatures: AssembledSignature, } +impl + SignedCertificate> + for TimeoutCertificate +{ + type Vote = TimeoutVote; + + type VoteAccumulator = TimeoutVoteAccumulator, Self::Vote>; + + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self { + TimeoutCertificate { + view_number: vote.get_view(), + signatures, + } + } + + fn view_number(&self) -> TYPES::Time { + self.view_number + } + + fn signatures(&self) -> AssembledSignature { + self.signatures.clone() + } + + fn leaf_commitment(&self) -> Commitment { + self.view_number.commit() + } + + fn is_genesis(&self) -> bool { + false + } + + fn genesis() -> Self { + unimplemented!() + } +} + /// Certificate for view sync. #[derive(custom_debug::Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Hash)] #[serde(bound(deserialize = ""))] @@ -130,6 +161,8 @@ pub enum AssembledSignature { No(::QCType), /// These signatures are for a 'DA' certificate DA(::QCType), + /// These signatures are for a `Timeout` certificate + Timeout(::QCType), /// These signatures are for genesis certificate Genesis(), /// These signatures are for ViewSyncPreCommit @@ -140,25 +173,6 @@ pub enum AssembledSignature { ViewSyncFinalize(::QCType), } -/// Data from a vote needed to accumulate into a `SignedCertificate` -pub struct VoteMetaData { - /// Voter's public key - pub encoded_key: EncodedPublicKey, - /// Votes signature - pub encoded_signature: EncodedSignature, - /// Commitment to what's voted on. E.g. 
the leaf for a `QuorumCertificate` - pub commitment: COMMITMENT, - /// Data of the vote, yes, no, timeout, or DA - pub data: VoteData, - /// The votes's token - pub vote_token: T, - /// View number for the vote - pub view_number: TIME, - /// The relay index for view sync - // TODO ED Make VoteMetaData more generic to avoid this variable that only ViewSync uses - pub relay: Option, -} - impl SignedCertificate for QuorumCertificate @@ -166,15 +180,11 @@ impl type Vote = QuorumVote; type VoteAccumulator = QuorumVoteAccumulator; - fn from_signatures_and_commitment( - signatures: AssembledSignature, - vote: Self::Vote, - ) -> Self { + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self { let leaf_commitment = match vote.clone() { QuorumVote::Yes(vote_internal) | QuorumVote::No(vote_internal) => { vote_internal.leaf_commitment } - QuorumVote::Timeout(_) => unimplemented!(), }; let qc = QuorumCertificate { leaf_commitment, @@ -198,10 +208,6 @@ impl self.leaf_commitment } - fn set_leaf_commitment(&mut self, commitment: COMMITMENT) { - self.leaf_commitment = commitment; - } - fn is_genesis(&self) -> bool { self.is_genesis } @@ -224,7 +230,7 @@ impl Committable commit::RawCommitmentBuilder::new("Quorum Certificate Commitment") .var_size_field("leaf commitment", self.leaf_commitment.as_ref()) - .u64_field("view number", *self.view_number.deref()) + .u64_field("view number", *self.view_number) .constant_str("justify_qc signatures") .var_size_bytes(&signatures_bytes) .finalize() @@ -242,10 +248,7 @@ impl type Vote = DAVote; type VoteAccumulator = DAVoteAccumulator, Self::Vote>; - fn from_signatures_and_commitment( - signatures: AssembledSignature, - vote: Self::Vote, - ) -> Self { + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self { DACertificate { view_number: vote.get_view(), signatures, @@ -265,10 +268,6 @@ impl self.block_commitment } - fn set_leaf_commitment(&mut self, _commitment: Commitment) { - // This function is only useful for QC. Will be removed after we have separated cert traits. - } - fn is_genesis(&self) -> bool { // This function is only useful for QC. Will be removed after we have separated cert traits. false @@ -287,17 +286,10 @@ impl Committable for ViewSyncCertificate { let signatures_bytes = serialize_signature(&self.signatures()); let mut builder = commit::RawCommitmentBuilder::new("View Sync Certificate Commitment") - // .field("leaf commitment", self.leaf_commitment) - // .u64_field("view number", *self.view_number.deref()) .constant_str("justify_qc signatures") .var_size_bytes(&signatures_bytes); - // builder = builder - // .field("Leaf commitment", self.leaf_commitment) - // .u64_field("View number", *self.view_number.deref()); - let certificate_internal = match &self { - // TODO ED Not the best way to do this ViewSyncCertificate::PreCommit(certificate_internal) => { builder = builder.var_size_field("View Sync Phase", "PreCommit".as_bytes()); certificate_internal @@ -332,10 +324,7 @@ impl type VoteAccumulator = ViewSyncVoteAccumulator>, Self::Vote>; /// Build a QC from the threshold signature and commitment - fn from_signatures_and_commitment( - signatures: AssembledSignature, - vote: Self::Vote, - ) -> Self { + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self { let certificate_internal = ViewSyncCertificateInternal { round: vote.get_view(), relay: vote.relay(), @@ -381,11 +370,6 @@ impl todo!() } - /// Set the leaf commitment. 
- fn set_leaf_commitment(&mut self, _commitment: Commitment>) { - todo!() - } - /// Get whether the certificate is for the genesis block. fn is_genesis(&self) -> bool { todo!() diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 6574a73f8a..427fd5e587 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -1,10 +1,11 @@ //! Provides the core consensus types -pub use crate::traits::node_implementation::ViewQueue; -pub use crate::utils::{View, ViewInner}; +pub use crate::{ + traits::node_implementation::ViewQueue, + utils::{View, ViewInner}, +}; use displaydoc::Display; -use crate::utils::Terminator; use crate::{ certificate::QuorumCertificate, data::LeafType, @@ -13,6 +14,7 @@ use crate::{ metrics::{Counter, Gauge, Histogram, Label, Metrics}, node_implementation::NodeType, }, + utils::Terminator, }; use commit::{Commitment, Committable}; use derivative::Derivative; diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index 1a02a6b9a7..e20a7da9de 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -888,6 +888,10 @@ pub fn serialize_signature(signature: &AssembledSignature { + signatures_bytes.extend("Timeout".as_bytes()); + Some(signatures.clone()) + } AssembledSignature::ViewSyncPreCommit(signatures) => { signatures_bytes.extend("ViewSyncPreCommit".as_bytes()); Some(signatures.clone()) diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 2823c788e1..a44722d8ed 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -13,7 +13,7 @@ use crate::{ }, signature_key::EncodedSignature, }, - vote::{DAVote, QuorumVote, ViewSyncVote, VoteType}, + vote::{DAVote, QuorumVote, TimeoutVote, ViewSyncVote, VoteType}, }; use commit::Commitment; use derivative::Derivative; @@ -200,6 +200,7 @@ where } GeneralConsensusMessage::ViewSyncVote(_) | GeneralConsensusMessage::ViewSyncCertificate(_) => todo!(), + GeneralConsensusMessage::TimeoutVote(_) => todo!(), } } } @@ -321,6 +322,9 @@ where /// Message with a view sync certificate. ViewSyncCertificate(Proposal>), + /// Message with a Timeout vote + TimeoutVote(TimeoutVote), + /// Internal ONLY message indicating a view interrupt. 
#[serde(skip)] InternalTrigger(InternalTrigger), @@ -415,6 +419,7 @@ impl< GeneralConsensusMessage::ViewSyncCertificate(message) => { message.data.get_view_number() } + GeneralConsensusMessage::TimeoutVote(message) => message.get_view(), } } Right(committee_message) => { @@ -442,7 +447,9 @@ impl< match &self.0 { Left(general_message) => match general_message { GeneralConsensusMessage::Proposal(_) => MessagePurpose::Proposal, - GeneralConsensusMessage::Vote(_) => MessagePurpose::Vote, + GeneralConsensusMessage::Vote(_) | GeneralConsensusMessage::TimeoutVote(_) => { + MessagePurpose::Vote + } GeneralConsensusMessage::InternalTrigger(_) => MessagePurpose::Internal, GeneralConsensusMessage::ViewSyncVote(_) => MessagePurpose::ViewSyncVote, GeneralConsensusMessage::ViewSyncCertificate(_) => MessagePurpose::ViewSyncProposal, diff --git a/crates/types/src/traits/election.rs b/crates/types/src/traits/election.rs index 3961187fae..0b660a4221 100644 --- a/crates/types/src/traits/election.rs +++ b/crates/types/src/traits/election.rs @@ -9,9 +9,11 @@ use super::{ }; use crate::{ certificate::{ - AssembledSignature, DACertificate, QuorumCertificate, ViewSyncCertificate, VoteMetaData, + AssembledSignature, DACertificate, QuorumCertificate, TimeoutCertificate, + ViewSyncCertificate, }, data::{DAProposal, ProposalType}, + vote::TimeoutVote, }; use crate::{ @@ -19,7 +21,6 @@ use crate::{ vote::ViewSyncVoteInternal, }; -use crate::vote::Accumulator2; use crate::{ data::LeafType, traits::{ @@ -28,10 +29,7 @@ use crate::{ signature_key::SignatureKey, state::ConsensusTime, }, - vote::{ - DAVote, QuorumVote, TimeoutVote, ViewSyncData, ViewSyncVote, VoteAccumulator, VoteType, - YesOrNoVote, - }, + vote::{Accumulator, DAVote, QuorumVote, ViewSyncData, ViewSyncVote, VoteType, YesOrNoVote}, }; use bincode::Options; use commit::{Commitment, CommitmentBounds, Committable}; @@ -176,16 +174,13 @@ where type Vote: VoteType; /// `Accumulator` type to accumulate votes. - type VoteAccumulator: Accumulator2; + type VoteAccumulator: Accumulator; /// Build a QC from the threshold signature and commitment // TODO ED Rename this function and rework this function parameters // Assumes last vote was valid since it caused a QC to form. // Removes need for relay on other cert specific fields - fn from_signatures_and_commitment( - signatures: AssembledSignature, - vote: Self::Vote, - ) -> Self; + fn create_certificate(signatures: AssembledSignature, vote: Self::Vote) -> Self; /// Get the view number. fn view_number(&self) -> TIME; @@ -199,9 +194,6 @@ where /// Get the leaf commitment. fn leaf_commitment(&self) -> COMMITMENT; - /// Set the leaf commitment. - fn set_leaf_commitment(&mut self, commitment: COMMITMENT); - /// Get whether the certificate is for the genesis block. fn is_genesis(&self) -> bool; @@ -337,9 +329,6 @@ pub trait ConsensusExchange: Send + Sync { .make_vote_token(view_number, self.private_key()) } - /// The contents of a vote on `commit`. - fn vote_data(&self, commit: Self::Commitment) -> VoteData; - /// Validate a certificate. 
fn is_valid_cert(&self, qc: &Self::Certificate) -> bool { if qc.is_genesis() && qc.view_number() == TYPES::Time::genesis() { @@ -372,6 +361,10 @@ pub trait ConsensusExchange: Send + Sync { ); ::check(&real_qc_pp, real_commit.as_ref(), &qc) } + AssembledSignature::Timeout(_) => { + error!("QC type should not be timeout here"); + false + } AssembledSignature::Genesis() => true, AssembledSignature::ViewSyncPreCommit(_) | AssembledSignature::ViewSyncCommit(_) @@ -384,31 +377,6 @@ pub trait ConsensusExchange: Send + Sync { /// Validate a vote by checking its signature and token. fn is_valid_vote( - &self, - encoded_key: &EncodedPublicKey, - encoded_signature: &EncodedSignature, - data: VoteData, - vote_token: Checked, - ) -> bool { - let mut is_valid_vote_token = false; - let mut is_valid_signature = false; - if let Some(key) = ::from_bytes(encoded_key) { - is_valid_signature = key.validate(encoded_signature, data.commit().as_ref()); - let valid_vote_token = self.membership().validate_vote_token(key, vote_token); - is_valid_vote_token = match valid_vote_token { - Err(_) => { - error!("Vote token was invalid"); - false - } - Ok(Checked::Valid(_)) => true, - Ok(Checked::Inval(_) | Checked::Unchecked(_)) => false, - }; - } - is_valid_signature && is_valid_vote_token - } - - /// Validate a vote by checking its signature and token. - fn is_valid_vote_2( &self, key: &TYPES::SignatureKey, encoded_signature: &EncodedSignature, @@ -431,38 +399,12 @@ pub trait ConsensusExchange: Send + Sync { is_valid_signature && is_valid_vote_token } - #[doc(hidden)] - - fn accumulate_internal( - &self, - _vota_meta: VoteMetaData, - _accumulator: VoteAccumulator, - ) -> Either, Self::Certificate> - { - todo!() // TODO ED Remove this function - } - - /// Add a vote to the accumulating signature. Return The certificate if the vote - /// brings us over the threshould, Else return the accumulator. - #[allow(clippy::too_many_arguments)] - fn accumulate_vote( - &self, - encoded_key: &EncodedPublicKey, - encoded_signature: &EncodedSignature, - leaf_commitment: Self::Commitment, - vote_data: VoteData, - vote_token: TYPES::VoteTokenType, - view_number: TYPES::Time, - accumlator: VoteAccumulator, - relay: Option, - ) -> Either, Self::Certificate>; - // TODO ED Depending on what we do in the future with the exchanges trait, we can move the accumulator out of the `SignedCertificate` // trait. Logically, I feel it makes sense to accumulate on the certificate rather than the exchange, however. 
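// A minimal sketch of the validate -> append -> create_certificate pipeline that the
// reworked accumulate_vote below follows. Every type here is a toy stand-in for the
// generic vote, accumulator, and certificate types in this trait.
enum Progress<A, C> {
    Accumulating(A),
    Formed(C),
}

struct ToyVote { view: u64, stake: u64, valid: bool }
struct ToyAccumulator { stake_casted: u64, success_threshold: u64 }
struct ToyCertificate { view: u64 }

fn accumulate_vote(acc: ToyAccumulator, vote: &ToyVote) -> Progress<ToyAccumulator, ToyCertificate> {
    // An invalid vote is rejected up front and the accumulator comes back unchanged.
    if !vote.valid {
        return Progress::Accumulating(acc);
    }
    // A valid vote's stake is folded into the running total...
    let stake_casted = acc.stake_casted + vote.stake;
    if stake_casted >= acc.success_threshold {
        // ...and once the threshold is crossed, the certificate is minted from the
        // assembled signatures plus the most recent vote, mirroring create_certificate.
        Progress::Formed(ToyCertificate { view: vote.view })
    } else {
        Progress::Accumulating(ToyAccumulator { stake_casted, ..acc })
    }
}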
/// Accumulate vote /// Returns either the accumulator if no threshold was reached, or a `SignedCertificate` if the threshold was reached #[allow(clippy::type_complexity)] - fn accumulate_vote_2( + fn accumulate_vote( &self, accumulator: <>::Certificate as SignedCertificate< TYPES, @@ -486,13 +428,14 @@ >>::VoteAccumulator, Self::Certificate, > { - if !self.is_valid_vote_2( + if !self.is_valid_vote( &vote.get_key(), &vote.get_signature(), &vote.get_data(), - // TODO ED We've had this comment for a while: Ignoring deserialization errors below since we are getting rid of it soon &Checked::Unchecked(vote.get_vote_token()), ) { + error!("Vote data is {:?}", vote.get_data()); + error!("Invalid vote!"); return Either::Left(accumulator); } @@ -513,13 +456,10 @@ self.membership().get_committee_qc_stake_table(), ) { Either::Left(accumulator) => Either::Left(accumulator), - Either::Right(signatures) => { - // TODO ED Update this function to just take in the signatures and most recent vote - Either::Right(Self::Certificate::from_signatures_and_commitment( - signatures, - vote.clone(), - )) - } + Either::Right(signatures) => Either::Right(Self::Certificate::create_certificate( + signatures, + vote.clone(), + )), } } @@ -717,35 +657,6 @@ impl< .make_vote_token(view_number, &self.private_key) } - fn vote_data(&self, commit: Self::Commitment) -> VoteData { - VoteData::DA(commit) - } - - /// Add a vote to the accumulating signature. Return The certificate if the vote - /// brings us over the threshould, Else return the accumulator. - fn accumulate_vote( - &self, - encoded_key: &EncodedPublicKey, - encoded_signature: &EncodedSignature, - leaf_commitment: Self::Commitment, - vote_data: VoteData, - vote_token: TYPES::VoteTokenType, - view_number: TYPES::Time, - accumlator: VoteAccumulator, - _relay: Option, - ) -> Either, Self::Certificate> - { - let meta = VoteMetaData { - encoded_key: encoded_key.clone(), - encoded_signature: encoded_signature.clone(), - commitment: leaf_commitment, - data: vote_data, - vote_token, - view_number, - relay: None, - }; - self.accumulate_internal(meta, accumlator) - } fn membership(&self) -> &Self::Membership { &self.membership } @@ -762,6 +673,7 @@ pub trait QuorumExchangeType, ConsensusExchange { /// Create a message with a positive vote on validating or commitment proposal. + // TODO ED This returns just a general message type, it's not even bound to a proposal, and this is just a function on the QC. Make proposal doesn't really apply to all cert types. fn create_yes_message>( &self, justify_qc_commitment: Commitment, @@ -806,15 +718,6 @@ pub trait QuorumExchangeType, leaf_commitment: Commitment, ) -> (EncodedPublicKey, EncodedSignature); - /// Sign a timeout vote. - /// - /// We only sign the view number, which is the minimum amount of information necessary for - /// checking that this node timed out on that view. - /// - /// This also allows for the high QC included with the vote to be spoofed in a MITM scenario, - /// but it is outside our threat model. - fn sign_timeout_vote(&self, view_number: TYPES::Time) -> (EncodedPublicKey, EncodedSignature); - /// Create a message with a negative vote on validating or commitment proposal. fn create_no_message>( &self, @@ -825,16 +728,6 @@ pub trait QuorumExchangeType, ) -> GeneralConsensusMessage where I::Exchanges: ExchangesType>; - - /// Create a message with a timeout vote on validating or commitment proposal.
- fn create_timeout_message>( - &self, - justify_qc: QuorumCertificate>, - current_view: TYPES::Time, - vote_token: TYPES::VoteTokenType, - ) -> GeneralConsensusMessage - where - I::Exchanges: ExchangesType>; } /// Standard implementation of [`QuorumExchangeType`] based on Hot Stuff consensus. @@ -946,21 +839,6 @@ impl< (self.public_key.to_bytes(), signature) } - /// Sign a timeout vote. - /// - /// We only sign the view number, which is the minimum amount of information necessary for - /// checking that this node timed out on that view. - /// - /// This also allows for the high QC included with the vote to be spoofed in a MITM scenario, - /// but it is outside our threat model. - /// TODO GG: why return the pubkey? Some other `sign_xxx` methods do not return the pubkey. - fn sign_timeout_vote(&self, view_number: TYPES::Time) -> (EncodedPublicKey, EncodedSignature) { - let signature = TYPES::SignatureKey::sign( - &self.private_key, - VoteData::Timeout(view_number.commit()).commit().as_ref(), - ); - (self.public_key.to_bytes(), signature) - } /// Create a message with a negative vote on validating or commitment proposal. fn create_no_message>( &self, @@ -982,26 +860,6 @@ impl< vote_data: VoteData::No(leaf_commitment), })) } - - /// Create a message with a timeout vote on validating or commitment proposal. - fn create_timeout_message>( - &self, - high_qc: QuorumCertificate>, - current_view: TYPES::Time, - vote_token: TYPES::VoteTokenType, - ) -> GeneralConsensusMessage - where - I::Exchanges: ExchangesType>, - { - let signature = self.sign_timeout_vote(current_view); - GeneralConsensusMessage::::Vote(QuorumVote::Timeout(TimeoutVote { - high_qc, - signature, - current_view, - vote_token, - vote_data: VoteData::Timeout(current_view.commit()), - })) - } } impl< @@ -1045,35 +903,6 @@ impl< &self.network } - fn vote_data(&self, commit: Self::Commitment) -> VoteData { - VoteData::Yes(commit) - } - - /// Add a vote to the accumulating signature. Return The certificate if the vote - /// brings us over the threshould, Else return the accumulator. - fn accumulate_vote( - &self, - encoded_key: &EncodedPublicKey, - encoded_signature: &EncodedSignature, - leaf_commitment: Commitment, - vote_data: VoteData, - vote_token: TYPES::VoteTokenType, - view_number: TYPES::Time, - accumlator: VoteAccumulator, TYPES>, - _relay: Option, - ) -> Either, TYPES>, Self::Certificate> - { - let meta = VoteMetaData { - encoded_key: encoded_key.clone(), - encoded_signature: encoded_signature.clone(), - commitment: leaf_commitment, - data: vote_data, - vote_token, - view_number, - relay: None, - }; - self.accumulate_internal(meta, accumlator) - } fn membership(&self) -> &Self::Membership { &self.membership } @@ -1403,34 +1232,146 @@ impl< &self.network } - fn vote_data(&self, _commit: Self::Commitment) -> VoteData { - unimplemented!() + fn membership(&self) -> &Self::Membership { + &self.membership + } + fn public_key(&self) -> &TYPES::SignatureKey { + &self.public_key } + fn private_key(&self) -> &<::SignatureKey as SignatureKey>::PrivateKey { + &self.private_key + } +} - fn accumulate_vote( +// TODO ED All the exchange structs are the same. We could just consolidate them into one struct +/// Standard implementation of a Timeout Exchange based on Hot Stuff consensus. +#[derive(Derivative)] +#[derivative(Clone, Debug)] +pub struct TimeoutExchange< + TYPES: NodeType, + PROPOSAL: ProposalType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, +> { + /// The network being used by this exchange.
+ network: NETWORK, + /// The committee which votes on proposals. + membership: MEMBERSHIP, + /// This participant's public key. + public_key: TYPES::SignatureKey, + /// Entry with public key and staking value for certificate aggregation in the stake table. + entry: ::StakeTableEntry, + /// This participant's private key. + #[derivative(Debug = "ignore")] + private_key: ::PrivateKey, + #[doc(hidden)] + _pd: PhantomData<(PROPOSAL, MEMBERSHIP, M)>, +} + +impl< + TYPES: NodeType, + PROPOSAL: ProposalType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, + > TimeoutExchange +{ +} + +/// Trait defining functions for a `TimeoutExchange` +pub trait TimeoutExchangeType: ConsensusExchange { + /// Create and sign a timeout message + fn create_timeout_message>( &self, - encoded_key: &EncodedPublicKey, - encoded_signature: &EncodedSignature, - leaf_commitment: Commitment>, - vote_data: VoteData, + view: TYPES::Time, vote_token: TYPES::VoteTokenType, - view_number: TYPES::Time, - accumlator: VoteAccumulator>, TYPES>, - relay: Option, - ) -> Either< - VoteAccumulator>, TYPES>, - Self::Certificate, - > { - let meta = VoteMetaData { - encoded_key: encoded_key.clone(), - encoded_signature: encoded_signature.clone(), - commitment: leaf_commitment, - data: vote_data, + ) -> GeneralConsensusMessage + where + I::Exchanges: ExchangesType>, + { + let signature = TYPES::SignatureKey::sign( + self.private_key(), + VoteData::>::Timeout(view.commit()) + .commit() + .as_ref(), + ); + + GeneralConsensusMessage::::TimeoutVote(TimeoutVote { + signature: (self.public_key().to_bytes(), signature), + current_view: view, vote_token, - view_number, - relay, - }; - self.accumulate_internal(meta, accumlator) + }) + } + + /// Validate a timeout certificate. + /// This is separate from other certificate verification functions because we also need to + /// verify the certificate is signed over the view we expect + fn is_valid_timeout_cert(&self, qc: &Self::Certificate, view_number: TYPES::Time) -> bool { + let comparison_commitment = view_number.commit(); + + if let AssembledSignature::Timeout(qc) = qc.signatures() { + let real_commit = VoteData::Timeout(comparison_commitment).commit(); + let real_qc_pp = ::get_public_parameter( + self.membership().get_committee_qc_stake_table(), + U256::from(self.membership().success_threshold().get()), + ); + ::check(&real_qc_pp, real_commit.as_ref(), &qc) + } else { + error!("Expected TimeoutCertificate, received another certificate variant"); + false + } + } +} + +impl< + TYPES: NodeType, + PROPOSAL: ProposalType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, + > TimeoutExchangeType for TimeoutExchange +{ +} + +// TODO ED Get rid of ProposalType as generic, it's debt left over from Validating Consensus +impl< + TYPES: NodeType, + PROPOSAL: ProposalType, + MEMBERSHIP: Membership, + NETWORK: CommunicationChannel, + M: NetworkMsg, + > ConsensusExchange for TimeoutExchange +{ + type Proposal = PROPOSAL; + type Vote = TimeoutVote; + type Certificate = TimeoutCertificate; + type Membership = MEMBERSHIP; + type Networking = NETWORK; + type Commitment = Commitment; + + fn create( + entries: Vec<::StakeTableEntry>, + config: TYPES::ElectionConfigType, + network: Self::Networking, + pk: TYPES::SignatureKey, + entry: ::StakeTableEntry, + sk: ::PrivateKey, + ) -> Self { + let membership = + >::Membership::create_election(entries, config); + Self { + network, + membership, + public_key: pk, + entry, + private_key: sk, + _pd: PhantomData, + } + } + +
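// A minimal sketch of the signing/verification symmetry in the timeout exchange above:
// create_timeout_message and is_valid_timeout_cert both derive the signed bytes from the
// view number alone, so a certificate can be checked against the exact view the verifier
// expects. The hash below is an illustrative stand-in for
// VoteData::Timeout(view.commit()).commit(), not the real commitment scheme.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn timeout_commitment(view: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    ("timeout", view).hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // The replica commits to the view it timed out on...
    let signed = timeout_commitment(7);
    // ...and the verifier recomputes the commitment for the view it expects,
    // so a certificate signed over any other view fails the check.
    assert_eq!(signed, timeout_commitment(7));
    assert_ne!(signed, timeout_commitment(8));
}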
fn network(&self) -> &NETWORK { + &self.network } fn membership(&self) -> &Self::Membership { diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index f229a9af9b..6d27d70103 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -7,7 +7,7 @@ use super::{ block_contents::Transaction, election::{ CommitteeExchangeType, ConsensusExchange, ElectionConfig, QuorumExchangeType, - ViewSyncExchangeType, VoteToken, + TimeoutExchange, TimeoutExchangeType, ViewSyncExchangeType, VoteToken, }, network::{CommunicationChannel, NetworkMsg, TestableNetworkingImplementation}, state::{ConsensusTime, TestableBlock, TestableState}, @@ -153,15 +153,21 @@ pub trait ExchangesType, MESSA /// Protocol for exchanging data availability proposals and votes. type CommitteeExchange: CommitteeExchangeType + Clone + Debug; - /// Get the committee exchange. + /// Get the committee exchange fn committee_exchange(&self) -> &Self::CommitteeExchange; + /// Get the timeout exchange + fn timeout_exchange(&self) -> &Self::TimeoutExchange; + /// Protocol for exchanging quorum proposals and votes. type QuorumExchange: QuorumExchangeType + Clone + Debug; /// Protocol for exchanging view sync proposals and votes. type ViewSyncExchange: ViewSyncExchangeType + Clone + Debug; + /// Protocol for receiving timeout votes + type TimeoutExchange: TimeoutExchangeType + Clone + Debug; + /// Election configurations for exchanges type ElectionConfigs; @@ -219,9 +225,9 @@ pub trait TestableExchange, ME pub struct SequencingExchanges< TYPES: NodeType, MESSAGE: NetworkMsg, - QUORUMEXCHANGE: QuorumExchangeType, MESSAGE>, - COMMITTEEEXCHANGE: CommitteeExchangeType, - VIEWSYNCEXCHANGE: ViewSyncExchangeType, + QUORUMEXCHANGE: QuorumExchangeType, MESSAGE> + Clone + Debug, + COMMITTEEEXCHANGE: CommitteeExchangeType + Clone + Debug, + VIEWSYNCEXCHANGE: ViewSyncExchangeType + Clone + Debug, > { /// Quorum exchange. quorum_exchange: QUORUMEXCHANGE, @@ -232,7 +238,15 @@ pub struct SequencingExchanges< /// Committee exchange. committee_exchange: COMMITTEEEXCHANGE, - /// Phantom data. 
+ /// Timeout exchange + // This type can be simplified once we rework the exchanges trait + // It is here to avoid needing to instantiate it where all the other exchanges are instantiated + // https://github.com/EspressoSystems/HotShot/issues/1799 + #[allow(clippy::type_complexity)] + + pub timeout_exchange: TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE>, + + /// Phantom data _phantom: PhantomData<(TYPES, MESSAGE)>, } @@ -250,12 +264,18 @@ where type CommitteeExchange = COMMITTEEEXCHANGE; type QuorumExchange = QUORUMEXCHANGE; type ViewSyncExchange = VIEWSYNCEXCHANGE; + #[allow(clippy::type_complexity)] + type TimeoutExchange = TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE>; type ElectionConfigs = (TYPES::ElectionConfigType, TYPES::ElectionConfigType); fn committee_exchange(&self) -> &COMMITTEEEXCHANGE { &self.committee_exchange } + fn timeout_exchange(&self) -> &Self::TimeoutExchange { + &self.timeout_exchange + } + fn create( entries: Vec<::StakeTableEntry>, configs: Self::ElectionConfigs, @@ -269,6 +289,15 @@ where sk: ::PrivateKey, ) -> Self { let quorum_exchange = QUORUMEXCHANGE::create( + entries.clone(), + configs.0.clone(), + networks.0.clone(), + pk.clone(), + entry.clone(), + sk.clone(), + ); + #[allow(clippy::type_complexity)] + let timeout_exchange: TimeoutExchange as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Proposal, < as ExchangesType, MESSAGE>>::QuorumExchange as ConsensusExchange>::Membership, >::Networking, MESSAGE> = TimeoutExchange::create( entries.clone(), configs.0.clone(), networks.0, @@ -276,6 +305,7 @@ where entry.clone(), sk.clone(), ); + let view_sync_exchange = VIEWSYNCEXCHANGE::create( entries.clone(), configs.0, @@ -291,6 +321,7 @@ where quorum_exchange, committee_exchange, view_sync_exchange, + timeout_exchange, _phantom: PhantomData, } } @@ -329,6 +360,14 @@ pub type SequencingQuorumEx = Message, >>::QuorumExchange; +/// Alias for `TimeoutExchange` type +pub type SequencingTimeoutEx = + <>::Exchanges as ExchangesType< + TYPES, + >::Leaf, + Message, + >>::TimeoutExchange; + /// Alias for the [`CommitteeExchange`] type. pub type CommitteeEx = <>::Exchanges as ExchangesType< TYPES, diff --git a/crates/types/src/vote.rs b/crates/types/src/vote.rs index 9e232b755a..bb4542ebe3 100644 --- a/crates/types/src/vote.rs +++ b/crates/types/src/vote.rs @@ -79,20 +79,38 @@ pub struct YesOrNoVote { pub vote_data: VoteData, } -/// A timeout vote. 
+/// A timeout vote #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] #[serde(bound(deserialize = ""))] -pub struct TimeoutVote { - /// The highest valid QC this node knows about - pub high_qc: QuorumCertificate, +pub struct TimeoutVote { /// The signature share associated with this vote pub signature: (EncodedPublicKey, EncodedSignature), /// The view this vote was cast for pub current_view: TYPES::Time, /// The vote token generated by this replica pub vote_token: TYPES::VoteTokenType, - /// The vote data this vote is signed over - pub vote_data: VoteData>, +} + +impl VoteType> for TimeoutVote { + fn get_view(&self) -> ::Time { + self.current_view + } + + fn get_key(&self) -> ::SignatureKey { + ::from_bytes(&self.signature.0).unwrap() + } + + fn get_signature(&self) -> EncodedSignature { + self.signature.1.clone() + } + + fn get_data(&self) -> VoteData> { + VoteData::Timeout(self.get_view().commit()) + } + + fn get_vote_token(&self) -> ::VoteTokenType { + self.vote_token.clone() + } } /// The internals of a view sync vote @@ -192,8 +210,6 @@ pub enum QuorumVote { Yes(YesOrNoVote), /// Negative vote. No(YesOrNoVote), - /// Timeout vote. - Timeout(TimeoutVote), } impl VoteType> for DAVote { @@ -229,7 +245,6 @@ impl VoteType fn get_view(&self) -> TYPES::Time { match self { QuorumVote::Yes(v) | QuorumVote::No(v) => v.current_view, - QuorumVote::Timeout(v) => v.current_view, } } @@ -242,13 +257,11 @@ impl VoteType fn get_data(&self) -> VoteData { match self { QuorumVote::Yes(v) | QuorumVote::No(v) => v.vote_data.clone(), - QuorumVote::Timeout(_) => unimplemented!(), } } fn get_vote_token(&self) -> ::VoteTokenType { match self { QuorumVote::Yes(v) | QuorumVote::No(v) => v.vote_token.clone(), - QuorumVote::Timeout(_) => unimplemented!(), } } } @@ -259,17 +272,14 @@ impl QuorumVote EncodedSignature { match &self { Self::Yes(vote) | Self::No(vote) => vote.signature.1.clone(), - Self::Timeout(vote) => vote.signature.1.clone(), } } /// Get the signature key. /// # Panics /// If the deserialization fails. - pub fn signature_key(&self) -> TYPES::SignatureKey { let encoded = match &self { Self::Yes(vote) | Self::No(vote) => vote.signature.0.clone(), - Self::Timeout(vote) => vote.signature.0.clone(), }; ::from_bytes(&encoded).unwrap() } @@ -307,17 +317,8 @@ impl VoteType>> for ViewS } } -/// The aggreation of votes, implemented by `VoteAccumulator`. -pub trait Accumulator: Sized { - /// Accumate the `val` to the current state. - /// - /// If a threshold is reached, returns `U` (e.g., a certificate). Else, returns `Self` and - /// continues accumulating items. 
- fn append(self, val: T) -> Either; -} - /// Accumulator trait used to accumulate votes into an `AssembledSignature` -pub trait Accumulator2< +pub trait Accumulator< TYPES: NodeType, COMMITMENT: CommitmentBounds, VOTE: VoteType, @@ -334,8 +335,10 @@ ) -> Either>; } -/// Accumulates DA votes -pub struct DAVoteAccumulator< +// TODO Make a default accumulator +// https://github.com/EspressoSystems/HotShot/issues/1797 +/// Accumulator for `TimeoutVote`s +pub struct TimeoutVoteAccumulator< TYPES: NodeType, COMMITMENT: CommitmentBounds, VOTE: VoteType, @@ -352,8 +355,11 @@ pub phantom: PhantomData, } -impl> - Accumulator2 for DAVoteAccumulator +impl< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone + Copy + PartialEq + Eq + Hash, + VOTE: VoteType, + > Accumulator for DAVoteAccumulator { fn append( mut self, @@ -403,7 +409,6 @@ impl= u64::from(self.success_threshold) { // Assemble QC let real_qc_pp = ::get_public_parameter( - // TODO ED Something about stake table entries. Might be easier to just pass in membership? stake_table_entries.clone(), U256::from(self.success_threshold.get()), ); @@ -422,6 +427,96 @@ impl, + > Accumulator for TimeoutVoteAccumulator +{ + fn append( + mut self, + vote: VOTE, + vote_node_id: usize, + stake_table_entries: Vec<::StakeTableEntry>, + ) -> Either> { + let VoteData::Timeout(vote_commitment) = vote.get_data() else { + return Either::Left(self); + }; + + let encoded_key = vote.get_key().to_bytes(); + + // Deserialize the signature so that it can be assembled into a QC + // TODO ED Update this once we've gotten rid of EncodedSignature + let original_signature: ::PureAssembledSignatureType = + bincode_opts() + .deserialize(&vote.get_signature().0) + .expect("Deserialization on the signature shouldn't be able to fail."); + + let (da_stake_casted, da_vote_map) = self + .da_vote_outcomes + .entry(vote_commitment) + .or_insert_with(|| (0, BTreeMap::new())); + + // Check for duplicate vote + // TODO ED Re-encoding signature key to bytes until we get rid of EncodedKey + // Have to do this because SignatureKey is not hashable + if da_vote_map.contains_key(&encoded_key) { + return Either::Left(self); + } + + if self.signers.get(vote_node_id).as_deref() == Some(&true) { + error!("Node id is already in signers list"); + return Either::Left(self); + } + self.signers.set(vote_node_id, true); + self.sig_lists.push(original_signature); + + // Already checked that the vote data was for a timeout vote above + *da_stake_casted += u64::from(vote.get_vote_token().vote_count()); + da_vote_map.insert( + encoded_key, + (vote.get_signature(), vote.get_data(), vote.get_vote_token()), + ); + + if *da_stake_casted >= u64::from(self.success_threshold) { + // Assemble QC + let real_qc_pp = ::get_public_parameter( + stake_table_entries.clone(), + U256::from(self.success_threshold.get()), + ); + + let real_qc_sig = ::assemble( + &real_qc_pp, + self.signers.as_bitslice(), + &self.sig_lists[..], + ); + + self.da_vote_outcomes.remove(&vote_commitment); + + return Either::Right(AssembledSignature::Timeout(real_qc_sig)); + } + Either::Left(self) + } +} + +/// Accumulates DA votes +pub struct DAVoteAccumulator< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone, + VOTE: VoteType, +> { + /// Map of all da signatures accumulated so far + pub da_vote_outcomes: VoteMap, + /// A quorum's worth of stake, generally 2f + 1 + pub success_threshold: NonZeroU64, + /// A list of valid signatures for certificate aggregation + pub sig_lists:
Vec<::PureAssembledSignatureType>, + /// A bitvec to indicate which node is active and send out a valid signature for certificate aggregation, this automatically does a uniqueness check + pub signers: BitVec, + /// Phantom data to specify the vote this accumulator is for + pub phantom: PhantomData, +} + /// Accumulate quorum votes pub struct QuorumVoteAccumulator< TYPES: NodeType, COMMITMENT: CommitmentBounds, VOTE: VoteType, @@ -447,8 +542,11 @@ pub phantom: PhantomData, } -impl> - Accumulator2 for QuorumVoteAccumulator +impl< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone + Copy + PartialEq + Eq + Hash, + VOTE: VoteType, + > Accumulator for QuorumVoteAccumulator { fn append( mut self, @@ -499,7 +597,6 @@ impl= u64::from(self.success_threshold) { // Assemble QC let real_qc_pp = ::get_public_parameter( - // TODO ED Something about stake table entries. Might be easier to just pass in membership? stake_table_entries.clone(), U256::from(self.success_threshold.get()), ); @@ -575,8 +671,11 @@ pub struct ViewSyncVoteAccumulator< pub phantom: PhantomData, } -impl> - Accumulator2 for ViewSyncVoteAccumulator +impl< + TYPES: NodeType, + COMMITMENT: CommitmentBounds + Clone + Copy + PartialEq + Eq + Hash, + VOTE: VoteType, + > Accumulator for ViewSyncVoteAccumulator { #[allow(clippy::too_many_lines)] fn append( @@ -721,31 +820,7 @@ impl, -> { - /// Phantom data to make compiler happy - pub phantom: PhantomData<(TYPES, VOTE, COMMITMENT)>, -} - -impl> - Accumulator2 for AccumulatorPlaceholder -{ - fn append( - self, - _vote: VOTE, - _vote_node_id: usize, - _stake_table_entries: Vec<::StakeTableEntry>, - ) -> Either> { - either::Left(self) - } -} - /// Mapping of commitments to vote tokens by key. -// TODO ED Remove this whole token generic type VoteMap = HashMap< COMMITMENT, ( BTreeMap, TOKEN)>, ), >; - -/// Describe the process of collecting signatures on block or leaf commitment, to form a DAC or QC, -/// respectively. -/// -/// TODO GG used only in election.rs; move this to there and make it private?
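// A minimal sketch of the duplicate handling shared by the accumulators above: one
// signature share per key, a signers bitvec keyed by node id for uniqueness, and
// assembly once a quorum's worth of stake is cast. All types are toy stand-ins for
// the generic accumulator machinery in this file.
use std::collections::BTreeMap;

struct ToyShareAccumulator {
    stake_casted: u64,
    success_threshold: u64,
    signers: Vec<bool>,             // stands in for the BitVec of active signers
    shares: BTreeMap<u64, Vec<u8>>, // key id -> signature share bytes
}

impl ToyShareAccumulator {
    fn append(&mut self, key: u64, node_id: usize, share: Vec<u8>, stake: u64) -> Option<Vec<Vec<u8>>> {
        // Votes from an already-seen key or node id are ignored rather than double-counted.
        if self.shares.contains_key(&key) || self.signers[node_id] {
            return None;
        }
        self.signers[node_id] = true;
        self.shares.insert(key, share);
        self.stake_casted += stake;
        // Once the success threshold is reached, "assemble" the collected shares,
        // which is where the real code builds an AssembledSignature.
        (self.stake_casted >= self.success_threshold).then(|| self.shares.values().cloned().collect())
    }
}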
-pub struct VoteAccumulator { - /// Map of all signatures accumlated so far - pub total_vote_outcomes: VoteMap, - /// Map of all da signatures accumlated so far - pub da_vote_outcomes: VoteMap, - /// Map of all yes signatures accumlated so far - pub yes_vote_outcomes: VoteMap, - /// Map of all no signatures accumlated so far - pub no_vote_outcomes: VoteMap, - /// Map of all view sync precommit votes accumulated thus far - pub viewsync_precommit_vote_outcomes: VoteMap, - /// Map of all view sync commit votes accumulated thus far - pub viewsync_commit_vote_outcomes: VoteMap, - /// Map of all view sync finalize votes accumulated thus far - pub viewsync_finalize_vote_outcomes: VoteMap, - /// A quorum's worth of stake, generall 2f + 1 - pub success_threshold: NonZeroU64, - /// Enough stake to know that we cannot possibly get a quorum, generally f + 1 - pub failure_threshold: NonZeroU64, - /// A list of valid signatures for certificate aggregation - pub sig_lists: Vec<::PureAssembledSignatureType>, - /// A bitvec to indicate which node is active and send out a valid signature for certificate aggregation, this automatically do uniqueness check - pub signers: BitVec, -} - -impl - Accumulator< - ( - COMMITMENT, - ( - EncodedPublicKey, - ( - EncodedSignature, - Vec<::StakeTableEntry>, - usize, - VoteData, - TOKEN, - ), - ), - ), - AssembledSignature, - > for VoteAccumulator -where - TOKEN: Clone + VoteToken, -{ - #![allow(clippy::too_many_lines)] - fn append( - mut self, - val: ( - COMMITMENT, - ( - EncodedPublicKey, - ( - EncodedSignature, - Vec<::StakeTableEntry>, - usize, - VoteData, - TOKEN, - ), - ), - ), - ) -> Either> { - let (commitment, (key, (sig, entries, node_id, vote_data, token))) = val; - - // Desereialize the sig so that it can be assembeld into a QC - let original_signature: ::PureAssembledSignatureType = - bincode_opts() - .deserialize(&sig.0) - .expect("Deserialization on the signature shouldn't be able to fail."); - - let (total_stake_casted, total_vote_map) = self - .total_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - // Check for duplicate vote - if total_vote_map.contains_key(&key) { - return Either::Left(self); - } - let (da_stake_casted, da_vote_map) = self - .da_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - let (yes_stake_casted, yes_vote_map) = self - .yes_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - let (no_stake_casted, no_vote_map) = self - .no_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - let (viewsync_precommit_stake_casted, viewsync_precommit_vote_map) = self - .viewsync_precommit_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - let (viewsync_commit_stake_casted, viewsync_commit_vote_map) = self - .viewsync_commit_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - let (viewsync_finalize_stake_casted, viewsync_finalize_vote_map) = self - .viewsync_finalize_vote_outcomes - .entry(commitment) - .or_insert_with(|| (0, BTreeMap::new())); - - // Accumulate the stake for each leaf commitment rather than the total - // stake of all votes, in case they correspond to inconsistent - // commitments. 
- - // update the active_keys and sig_lists - if self.signers.get(node_id).as_deref() == Some(&true) { - error!("node id already in signers"); - return Either::Left(self); - } - self.signers.set(node_id, true); - self.sig_lists.push(original_signature); - - *total_stake_casted += u64::from(token.vote_count()); - total_vote_map.insert(key.clone(), (sig.clone(), vote_data.clone(), token.clone())); - - match vote_data { - VoteData::DA(_) => { - *da_stake_casted += u64::from(token.vote_count()); - da_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::Yes(_) => { - *yes_stake_casted += u64::from(token.vote_count()); - yes_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::No(_) => { - *no_stake_casted += u64::from(token.vote_count()); - no_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::ViewSyncPreCommit(_) => { - *viewsync_precommit_stake_casted += u64::from(token.vote_count()); - viewsync_precommit_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::ViewSyncCommit(_) => { - *viewsync_commit_stake_casted += u64::from(token.vote_count()); - viewsync_commit_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::ViewSyncFinalize(_) => { - *viewsync_finalize_stake_casted += u64::from(token.vote_count()); - viewsync_finalize_vote_map.insert(key, (sig, vote_data, token)); - } - VoteData::Timeout(_) => { - unimplemented!() - } - } - - // This is a messy way of accounting for the different vote types, but we will be replacing this code very soon - if *total_stake_casted >= u64::from(self.success_threshold) { - // Do assemble for QC here - let real_qc_pp = ::get_public_parameter( - entries.clone(), - U256::from(self.success_threshold.get()), - ); - - let real_qc_sig = ::assemble( - &real_qc_pp, - self.signers.as_bitslice(), - &self.sig_lists[..], - ); - - if *yes_stake_casted >= u64::from(self.success_threshold) { - self.yes_vote_outcomes.remove(&commitment); - return Either::Right(AssembledSignature::Yes(real_qc_sig)); - } else if *no_stake_casted >= u64::from(self.failure_threshold) { - self.total_vote_outcomes.remove(&commitment); - return Either::Right(AssembledSignature::No(real_qc_sig)); - } else if *da_stake_casted >= u64::from(self.success_threshold) { - self.da_vote_outcomes.remove(&commitment); - return Either::Right(AssembledSignature::DA(real_qc_sig)); - } else if *viewsync_commit_stake_casted >= u64::from(self.success_threshold) { - self.viewsync_commit_vote_outcomes - .remove(&commitment) - .unwrap(); - return Either::Right(AssembledSignature::ViewSyncCommit(real_qc_sig)); - } else if *viewsync_finalize_stake_casted >= u64::from(self.success_threshold) { - self.viewsync_finalize_vote_outcomes - .remove(&commitment) - .unwrap(); - return Either::Right(AssembledSignature::ViewSyncFinalize(real_qc_sig)); - } - } - if *viewsync_precommit_stake_casted >= u64::from(self.failure_threshold) { - let real_qc_pp = ::get_public_parameter( - entries, - U256::from(self.failure_threshold.get()), - ); - - let real_qc_sig = ::assemble( - &real_qc_pp, - self.signers.as_bitslice(), - &self.sig_lists[..], - ); - - self.viewsync_precommit_vote_outcomes - .remove(&commitment) - .unwrap(); - return Either::Right(AssembledSignature::ViewSyncPreCommit(real_qc_sig)); - } - Either::Left(self) - } -} diff --git a/crates/web_server/src/lib.rs b/crates/web_server/src/lib.rs index e29ce036e8..c295993c68 100644 --- a/crates/web_server/src/lib.rs +++ b/crates/web_server/src/lib.rs @@ -5,6 +5,7 @@ use async_compatibility_layer::channel::OneShotReceiver; use 
async_lock::RwLock; use clap::Args; use futures::FutureExt; +use tracing::error; use hotshot_types::traits::signature_key::{EncodedPublicKey, SignatureKey}; use rand::{distributions::Alphanumeric, rngs::StdRng, thread_rng, Rng, SeedableRng}; @@ -322,7 +323,7 @@ impl WebServerDataSource for WebServerState { } /// Stores a received proposal in the `WebServerState` fn post_proposal(&mut self, view_number: u64, mut proposal: Vec) -> Result<(), Error> { - debug!("Received proposal for view {}", view_number); + error!("Received proposal for view {}", view_number); if view_number > self.recent_proposal { self.recent_proposal = view_number; diff --git a/justfile b/justfile index 691406381f..bcc9c424dd 100644 --- a/justfile +++ b/justfile @@ -29,7 +29,7 @@ test_basic: test_success test_with_failures test_network_task test_consensus_tas test_catchup: echo Testing with async std executor - cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_catchup -- --test-threads=1 --nocapture + ASYNC_STD_THREAD_COUNT=1 cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_catchup -- --test-threads=1 --nocapture test_success: echo Testing success test @@ -37,7 +37,7 @@ test_success: test_timeout: echo Testing timeout test - cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_timeout -- --test-threads=1 --nocapture --ignored + ASYNC_STD_THREAD_COUNT=1 cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_timeout -- --test-threads=1 --nocapture test_web_server: echo Testing web server diff --git a/scripts/test.sh b/scripts/test.sh index 9f3da6d864..019eb0ca1d 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -7,8 +7,8 @@ counter=0 while true; do ((counter++)) echo "Iteration: $counter" - rm "test_log.txt" || true - just test_async_std_pkg_test hotshot-testing ten_tx_seven_nodes >> "test_log.txt" 2>&1 + rm "output.json" || true + just async_std test_basic >> "output.json" 2>&1 error_code=$? if [ "$error_code" -ne 0 ]; then break