
Commit: cleanup

lukeiannucci committed Sep 25, 2024
1 parent 6198525 commit 77424b0
Showing 4 changed files with 101 additions and 68 deletions.
3 changes: 2 additions & 1 deletion crates/hotshot/src/tasks/task_state.rs
@@ -6,7 +6,7 @@

use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
sync::{atomic::AtomicBool, Arc},
};

use async_trait::async_trait;
@@ -55,6 +55,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
id: handle.hotshot.id,
shutdown_flag: Arc::new(AtomicBool::new(false)),
spawned_tasks: BTreeMap::new(),
}
}
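The new `shutdown_flag` field threads a shared `AtomicBool` from task construction into every task the state later spawns. A minimal sketch of the pattern, using `std::thread` in place of the crate's async runtime (`RequestState` here is a hypothetical stand-in, not HotShot's type):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

struct RequestState {
    // Created `false`; flipped to `true` exactly once at shutdown.
    shutdown_flag: Arc<AtomicBool>,
}

fn main() {
    let state = RequestState {
        shutdown_flag: Arc::new(AtomicBool::new(false)),
    };

    // Each spawned task clones the `Arc`, sharing the same flag.
    let flag = Arc::clone(&state.shutdown_flag);
    let worker = std::thread::spawn(move || {
        // Poll the flag between units of work and exit once it is set.
        while !flag.load(Ordering::Relaxed) {
            std::thread::yield_now();
        }
    });

    state.shutdown_flag.store(true, Ordering::Relaxed);
    worker.join().unwrap();
}
```

Relaxed ordering suffices here because the flag carries no data dependency; spawned tasks only need to observe the store eventually.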
19 changes: 11 additions & 8 deletions crates/task-impls/src/consensus/mod.rs
@@ -151,17 +151,20 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
let payload_commitment = disperse.data.payload_commitment;

// Check whether the data satisfies one of the following.
// * From the right leader for this view.
// * Calculated and signed by the current node.
// * Signed by one of the staked DA committee members.
if !sender.validate(&disperse.signature, payload_commitment.as_ref())
// * Signed by the sender of the VID share, with the sender verified to be in the DA committee
let sender_sig_validated =
sender.validate(&disperse.signature, payload_commitment.as_ref());

if (sender_sig_validated
&& !self
.quorum_membership
.leader(view)
.validate(&disperse.signature, payload_commitment.as_ref())
&& !self
.public_key
.validate(&disperse.signature, payload_commitment.as_ref())
.committee_members(view)
.contains(sender))
|| (!sender_sig_validated
&& !self
.public_key
.validate(&disperse.signature, payload_commitment.as_ref()))
{
return false;
}
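The rewritten condition accepts a disperse in exactly two cases: the sender's signature over the payload commitment verifies and the sender sits on the DA committee for the view, or the sender's signature does not verify but the share was calculated and signed by this node itself. Reduced to a boolean sketch, where the three flags stand in for the real `validate` and `committee_members` calls:

```rust
/// Illustrative reduction of the new check; the flags stand in for the
/// real signature and membership calls on `sender`, `quorum_membership`,
/// and `public_key`.
fn accept_vid_disperse(
    sender_sig_valid: bool,
    sender_in_da: bool,
    self_sig_valid: bool,
) -> bool {
    // Mirrors the negation of `(A && !B) || (!A && !C)` from the diff above.
    (sender_sig_valid && sender_in_da) || (!sender_sig_valid && self_sig_valid)
}

fn main() {
    assert!(accept_vid_disperse(true, true, false));   // signed by a DA member
    assert!(accept_vid_disperse(false, false, true));  // calculated and signed locally
    assert!(!accept_vid_disperse(true, false, false)); // valid sig, but not in DA
    assert!(!accept_vid_disperse(false, true, false)); // no valid signature at all
}
```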
23 changes: 14 additions & 9 deletions crates/task-impls/src/quorum_vote/mod.rs
@@ -590,22 +590,27 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS

// Validate the VID share.
let payload_commitment = disperse.data.payload_commitment;

// Check whether the data satisfies one of the following.
// * From the right leader for this view.
// * Calculated and signed by the current node.
// * Signed by one of the staked DA committee members.
if !sender.validate(&disperse.signature, payload_commitment.as_ref())
// * Signed by the sender of the VID share, with the sender verified to be in the DA committee
let sender_sig_validated =
sender.validate(&disperse.signature, payload_commitment.as_ref());

if (sender_sig_validated
&& !self
.quorum_membership
.leader(view)
.validate(&disperse.signature, payload_commitment.as_ref())
&& !self
.public_key
.validate(&disperse.signature, payload_commitment.as_ref())
.committee_members(view)
.contains(sender))
|| (!sender_sig_validated
&& !self
.public_key
.validate(&disperse.signature, payload_commitment.as_ref()))
{
tracing::error!("Recieved VID share from sender not in DA");
tracing::warn!("Failed to validated the VID dispersal/share sig.");
return;
}

// NOTE: `verify_share` returns a nested `Result`, so we must check both the inner
// and outer results
#[allow(clippy::no_effect)]
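The NOTE about `verify_share` returning a nested `Result` is worth spelling out: one layer reports whether verification could run at all, the other whether the share itself checked out, and collapsing them would silently drop one failure mode. A hypothetical sketch of the shape (the signature and error types are placeholders, not the crate's):

```rust
// Hypothetical stand-in for a share-verification call whose outer Result
// signals an operational failure and whose inner Result signals an
// invalid share.
fn verify_share(ok_outer: bool, ok_inner: bool) -> Result<Result<(), String>, String> {
    if !ok_outer {
        return Err("could not run verification".into());
    }
    if !ok_inner {
        return Ok(Err("share failed verification".into()));
    }
    Ok(Ok(()))
}

fn main() {
    // Both layers must be matched; checking `Ok(_)` alone would hide the inner error.
    match verify_share(true, false) {
        Ok(Ok(())) => println!("share is valid"),
        Ok(Err(e)) => println!("invalid share: {e}"),
        Err(e) => println!("verification error: {e}"),
    }
}
```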
124 changes: 74 additions & 50 deletions crates/task-impls/src/request.rs
@@ -4,7 +4,14 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{
collections::BTreeMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use anyhow::{ensure, Result};
use async_broadcast::{Receiver, Sender};
@@ -63,6 +70,8 @@ pub struct NetworkRequestState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// The node's id
pub id: u64,
/// A flag indicating that `HotShotEvent::Shutdown` has been received
pub shutdown_flag: Arc<AtomicBool>,
/// Tasks spawned for each view, retained so they can be awaited or cancelled on shutdown
pub spawned_tasks: BTreeMap<TYPES::Time, Vec<JoinHandle<()>>>,
}

@@ -143,6 +152,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequest
}

async fn cancel_subtasks(&mut self) {
self.shutdown_flag.store(true, Ordering::Relaxed);
while !self.spawned_tasks.is_empty() {
let Some((_, handles)) = self.spawned_tasks.pop_first() else {
break;
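`cancel_subtasks` now sets the flag before draining the per-view handles, so any request loop that wakes up mid-drain observes the flag and exits on its own. A compact sketch of that sequence, again with `std::thread` handles instead of the async runtime's:

```rust
use std::{
    collections::BTreeMap,
    sync::atomic::{AtomicBool, Ordering},
    thread::JoinHandle,
};

fn cancel_subtasks(flag: &AtomicBool, tasks: &mut BTreeMap<u64, Vec<JoinHandle<()>>>) {
    // Publish the shutdown signal first so still-running loops stop retrying.
    flag.store(true, Ordering::Relaxed);
    // Then drain the map and wait for every spawned task to finish.
    while let Some((_view, handles)) = tasks.pop_first() {
        for handle in handles {
            let _ = handle.join();
        }
    }
}

fn main() {
    let flag = AtomicBool::new(false);
    let mut tasks: BTreeMap<u64, Vec<JoinHandle<()>>> = BTreeMap::new();
    tasks.entry(1).or_default().push(std::thread::spawn(|| {}));
    cancel_subtasks(&flag, &mut tasks);
    assert!(tasks.is_empty());
}
```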
Expand Down Expand Up @@ -197,6 +207,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
let network = Arc::clone(&self.network);
let delay = self.delay;
let pub_key = self.public_key.clone();
let shutdown_flag = Arc::clone(&self.shutdown_flag);
async_spawn(async move {
// Do the delay only if the primary network is up, then start sending
if !network.is_primary_down() {
@@ -205,9 +216,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I

let mut request_vid = true;

// start waiting
while !Self::cancel_vid(&state, &sender, &pub_key, &view).await {
// we broadcast to all DA nodes, no need to resend
while !Self::cancel_vid(&state, &sender, &pub_key, &view, &shutdown_flag).await {
// We send the request to all DA nodes, so only broadcast this event once per view
if request_vid {
broadcast_event(
HotShotEvent::VidRequestSend(request.clone(), signature.clone()).into(),
@@ -216,57 +226,68 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
.await;
request_vid = false;
}
let result = async_timeout(REQUEST_TIMEOUT, async {
let mut response = None;
while response.is_none() {
let event = EventDependency::new(
receiver.clone(),
Box::new(move |event: &Arc<HotShotEvent<TYPES>>| {
let event = event.as_ref();
if let HotShotEvent::VidResponseRecv(_sender_key, proposal) = event
{
proposal.data.view_number() == view
} else {
false
}
}),
)
.completed()
.await;

if let Some(hs_event) = event.as_ref() {
if let HotShotEvent::VidResponseRecv(sender_pub_key, proposal) =
hs_event.as_ref()
{
// Validate that the sender is in the DA committee for this view and that the signature is valid
if membership.committee_members(view).contains(sender_pub_key)
&& sender_pub_key.validate(
&proposal.signature,
proposal.data.payload_commitment.as_ref(),
)
{
response = Some((sender_pub_key.clone(), proposal.clone()));
}
}
}
}
if Self::handle_response(&receiver, &sender, &membership, view).await {
return;
}
}
});
}

response
})
async fn handle_response(
receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
membership: &TYPES::Membership,
view: TYPES::Time,
) -> bool {
let result = async_timeout(REQUEST_TIMEOUT, async {
let mut response = None;
while response.is_none() {
let event = EventDependency::new(
receiver.clone(),
Box::new(move |event: &Arc<HotShotEvent<TYPES>>| {
let event = event.as_ref();
if let HotShotEvent::VidResponseRecv(_sender_key, proposal) = event {
proposal.data.view_number() == view
} else {
false
}
}),
)
.completed()
.await;

// Check for success; otherwise retry until the max number of attempts is reached
if let Ok(Some(response)) = result {
broadcast_event(
Arc::new(HotShotEvent::VidShareRecv(response.0, response.1)),
&sender,
)
.await;
return;
if let Some(hs_event) = event.as_ref() {
if let HotShotEvent::VidResponseRecv(sender_pub_key, proposal) =
hs_event.as_ref()
{
// Validate that the sender is in the DA committee for this view and that the signature is valid
if membership.committee_members(view).contains(sender_pub_key)
&& sender_pub_key.validate(
&proposal.signature,
proposal.data.payload_commitment.as_ref(),
)
{
response = Some((sender_pub_key.clone(), proposal.clone()));
}
}
}
}
});
// self.spawned_tasks.entry(view).or_default().push(handle);

response
})
.await;

// Check for success; otherwise retry until the max number of attempts is reached
if let Ok(Some(response)) = result {
broadcast_event(
Arc::new(HotShotEvent::VidShareRecv(response.0, response.1)),
sender,
)
.await;
return true;
}
false
}

/// Returns true if we got the data we wanted, or the view has moved on.
@@ -275,10 +296,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> NetworkRequestState<TYPES, I
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
public_key: &<TYPES as NodeType>::SignatureKey,
view: &TYPES::Time,
shutdown_flag: &Arc<AtomicBool>,
) -> bool {
let state = state.read().await;

let cancel = state.vid_shares().contains_key(view) || state.cur_view() > *view;
let cancel = shutdown_flag.load(Ordering::Relaxed)
|| state.vid_shares().contains_key(view)
|| state.cur_view() > *view;
if cancel {
if let Some(Some(vid_share)) = state
.vid_shares()
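Pulling the response wait out into `handle_response` leaves `run_delayed_request` with a simple shape: loop until `cancel_vid` says stop, broadcast the request once, and hand each timeout-bounded wait to the helper, which returns `true` only when a validated response was re-broadcast. The control flow, reduced to a synchronous sketch with illustrative closures in place of the event machinery:

```rust
use std::time::{Duration, Instant};

/// Illustrative reduction of the request loop: `cancelled` stands in for
/// `cancel_vid` (shutdown flag set, share already stored, or view moved on)
/// and `handle_response` for the timeout-bounded wait on `VidResponseRecv`.
fn run_request_loop(
    mut cancelled: impl FnMut() -> bool,
    mut handle_response: impl FnMut() -> bool,
) -> u32 {
    let mut attempts = 0;
    while !cancelled() {
        attempts += 1;
        if handle_response() {
            break; // validated response received and re-broadcast
        }
        // On timeout, fall through and re-check the cancel condition.
    }
    attempts
}

fn main() {
    let deadline = Instant::now() + Duration::from_secs(1);
    let mut tries = 0;
    let attempts = run_request_loop(
        || Instant::now() > deadline,
        || {
            tries += 1;
            tries >= 3 // simulate success on the third attempt
        },
    );
    println!("stopped after {attempts} attempts");
}
```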
