fix(cosmos): validator checkpoint & agent indexing improvements (#3958)
- Propagates RPC errors to the indexing logic, so errors cause sleeps instead
of overwhelming the RPCs (see the sketch after this list). This also greatly
reduces cosmos log noise: from 9.5k logs per minute to 2.5k logs per minute.
- Checks for published checkpoints in reverse order, to prioritize recent
messages among those from before the validator was spun up.
- Fixes a validator bug where an error during checkpoint publishing caused
the entire checkpoint queue to be reiterated from scratch.
- Builds on top of
#3887 so we can
test this out on osmosis.
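As an illustration of the first bullet, here is a minimal sketch of the sleep-on-error pattern; the `LogIndexer` trait and `poll_logs` function are hypothetical stand-ins, not code from this PR:

```rust
use std::time::Duration;

use tokio::time::sleep;
use tracing::warn;

// Hypothetical stand-in for the cosmos indexer interface.
#[async_trait::async_trait]
trait LogIndexer {
    async fn fetch_logs(&self) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
}

// With RPC errors propagated instead of swallowed per block, the polling loop
// sees the failure and backs off, rather than immediately re-querying the RPC
// and emitting a warning line for every failed block.
async fn poll_logs(indexer: &(dyn LogIndexer + Sync)) {
    loop {
        match indexer.fetch_logs().await {
            Ok(logs) => {
                // Index the logs and advance the cursor.
                let _ = logs;
            }
            Err(err) => {
                warn!(?err, "Indexing error, sleeping before retrying");
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
}
```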


daniel-savu authored Jun 19, 2024
1 parent bbbe932 commit 3bb9d0a
Showing 13 changed files with 129 additions and 164 deletions.
131 changes: 64 additions & 67 deletions rust/agents/validator/src/submit.rs
@@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
use std::vec;

use hyperlane_core::rpc_clients::call_and_retry_indefinitely;
use hyperlane_core::{ChainCommunicationError, ChainResult, MerkleTreeHook};
use hyperlane_core::{ChainResult, MerkleTreeHook};
use prometheus::IntGauge;
use tokio::time::sleep;
use tracing::{debug, error, info};
@@ -61,17 +61,8 @@ impl ValidatorSubmitter {
/// Runs idly forever once the target checkpoint is reached to avoid exiting the task.
pub(crate) async fn backfill_checkpoint_submitter(self, target_checkpoint: Checkpoint) {
let mut tree = IncrementalMerkle::default();
call_and_retry_indefinitely(|| {
let target_checkpoint = target_checkpoint;
let self_clone = self.clone();
Box::pin(async move {
self_clone
.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
.await?;
Ok(())
})
})
.await;
self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
.await;

info!(
?target_checkpoint,
@@ -132,21 +123,8 @@ impl ValidatorSubmitter {
sleep(self.interval).await;
continue;
}

tree = call_and_retry_indefinitely(|| {
let mut tree = tree;
let self_clone = self.clone();
Box::pin(async move {
self_clone
.submit_checkpoints_until_correctness_checkpoint(
&mut tree,
&latest_checkpoint,
)
.await?;
Ok(tree)
})
})
.await;
self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &latest_checkpoint)
.await;

self.metrics
.latest_checkpoint_processed
@@ -162,7 +140,7 @@
&self,
tree: &mut IncrementalMerkle,
correctness_checkpoint: &Checkpoint,
) -> ChainResult<()> {
) {
// This should never be called with a tree that is ahead of the correctness checkpoint.
assert!(
!tree_exceeds_checkpoint(correctness_checkpoint, tree),
@@ -182,7 +160,14 @@
while tree.count() as u32 <= correctness_checkpoint.index {
if let Some(insertion) = self
.message_db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))?
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))
.unwrap_or_else(|err| {
panic!(
"Error fetching merkle tree insertion for leaf index {}: {}",
tree.count(),
err
)
})
{
debug!(
index = insertion.index(),
@@ -225,9 +210,7 @@
?correctness_checkpoint,
"Incorrect tree root, something went wrong"
);
return Err(ChainCommunicationError::CustomError(
"Incorrect tree root, something went wrong".to_string(),
));
panic!("Incorrect tree root, something went wrong");
}

if !checkpoint_queue.is_empty() {
Expand All @@ -236,57 +219,71 @@ impl ValidatorSubmitter {
queue_len = checkpoint_queue.len(),
"Reached tree consistency"
);

self.sign_and_submit_checkpoints(checkpoint_queue).await?;
self.sign_and_submit_checkpoints(checkpoint_queue).await;

info!(
index = checkpoint.index,
"Signed all queued checkpoints until index"
);
}

Ok(())
}

/// Signs and submits any previously unsubmitted checkpoints.
async fn sign_and_submit_checkpoints(
async fn sign_and_submit_checkpoint(
&self,
checkpoints: Vec<CheckpointWithMessageId>,
checkpoint: CheckpointWithMessageId,
) -> ChainResult<()> {
let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1];

for queued_checkpoint in checkpoints {
let existing = self
.checkpoint_syncer
.fetch_checkpoint(queued_checkpoint.index)
.await?;
if existing.is_some() {
debug!(
index = queued_checkpoint.index,
"Checkpoint already submitted"
);
continue;
}
let signed_checkpoint = self.signer.sign(queued_checkpoint).await?;
self.checkpoint_syncer
.write_checkpoint(&signed_checkpoint)
.await?;
debug!(
index = queued_checkpoint.index,
"Signed and submitted checkpoint"
);

// TODO: move these into S3 implementations
// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
let existing = self
.checkpoint_syncer
.fetch_checkpoint(checkpoint.index)
.await?;
if existing.is_some() {
debug!(index = checkpoint.index, "Checkpoint already submitted");
return Ok(());
}

let signed_checkpoint = self.signer.sign(checkpoint).await?;
self.checkpoint_syncer
.update_latest_index(last_checkpoint.index)
.write_checkpoint(&signed_checkpoint)
.await?;
debug!(index = checkpoint.index, "Signed and submitted checkpoint");

// TODO: move these into S3 implementations
// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
Ok(())
}

/// Signs and submits any previously unsubmitted checkpoints.
async fn sign_and_submit_checkpoints(&self, checkpoints: Vec<CheckpointWithMessageId>) {
let last_checkpoint = checkpoints.as_slice()[checkpoints.len() - 1];
// Submits checkpoints to the store in reverse order. This speeds up processing historic checkpoints (those before the validator is spun up),
// since those are the most likely to make messages become processable.
// A side effect is that new checkpoints will also be submitted in reverse order.
for queued_checkpoint in checkpoints.into_iter().rev() {
// certain checkpoint stores rate limit very aggressively, so we retry indefinitely
call_and_retry_indefinitely(|| {
let self_clone = self.clone();
Box::pin(async move {
self_clone
.sign_and_submit_checkpoint(queued_checkpoint)
.await?;
Ok(())
})
})
.await;
}

call_and_retry_indefinitely(|| {
let self_clone = self.clone();
Box::pin(async move {
self_clone
.checkpoint_syncer
.update_latest_index(last_checkpoint.index)
.await?;
Ok(())
})
})
.await;
}
}

/// Returns whether the tree exceeds the checkpoint.
29 changes: 6 additions & 23 deletions rust/chains/hyperlane-cosmos/src/interchain_gas.rs
@@ -1,6 +1,5 @@
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use futures::future;
use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneChain, HyperlaneContract,
HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, InterchainGasPaymaster,
@@ -9,12 +8,15 @@ use hyperlane_core::{
use once_cell::sync::Lazy;
use std::ops::RangeInclusive;
use tendermint::abci::EventAttribute;
use tracing::{instrument, warn};
use tracing::instrument;

use crate::{
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer},
signers::Signer,
utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64},
utils::{
execute_and_parse_log_futures, CONTRACT_ADDRESS_ATTRIBUTE_KEY,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
},
ConnectionConf, CosmosProvider, HyperlaneCosmosError,
};

@@ -223,26 +225,7 @@ impl Indexer<InterchainGasPayment> for CosmosInterchainGasPaymasterIndexer {
})
.collect();

// TODO: this can be refactored when we rework indexing, to be part of the block-by-block indexing
let result = future::join_all(logs_futures)
.await
.into_iter()
.flatten()
.map(|(logs, block_number)| {
if let Err(err) = &logs {
warn!(?err, ?block_number, "Failed to fetch logs for block");
}
logs
})
// Propagate errors from any of the queries. This will cause the entire range to be retried,
// including successful ones, but we don't have a way to handle partial failures in a range for now.
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.map(|(log, meta)| (Indexed::new(log), meta))
.collect();

Ok(result)
execute_and_parse_log_futures(logs_futures).await
}

async fn get_finalized_block_number(&self) -> ChainResult<u32> {
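The shared `execute_and_parse_log_futures` helper lives in the changed `utils.rs`, which is not shown in this excerpt. Below is a plausible reconstruction based on the inlined logic it replaces; the exact signature and trait bounds are assumptions:

```rust
use futures::future;
use hyperlane_core::{ChainResult, Indexed, LogMeta};
use tokio::task::JoinHandle;
use tracing::warn;

/// Hypothetical reconstruction: awaits the per-block log futures, warns about
/// any per-block RPC error, then propagates the first error so the indexing
/// logic retries (and sleeps) instead of silently dropping the block.
pub(crate) async fn execute_and_parse_log_futures<T>(
    logs_futures: Vec<JoinHandle<(ChainResult<Vec<(T, LogMeta)>>, u32)>>,
) -> ChainResult<Vec<(Indexed<T>, LogMeta)>>
where
    T: Into<Indexed<T>>,
{
    let result = future::join_all(logs_futures)
        .await
        .into_iter()
        // Drop join errors from panicked tasks, keeping (logs, block) pairs.
        .flatten()
        .map(|(logs, block_number)| {
            if let Err(err) = &logs {
                warn!(?err, ?block_number, "Failed to fetch logs for block");
            }
            logs
        })
        // Propagate errors from any of the queries. This causes the entire
        // range to be retried, including already-successful blocks, since
        // partial failures within a range are not handled for now.
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .flatten()
        .map(|(log, meta)| (log.into(), meta))
        .collect();
    Ok(result)
}
```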
26 changes: 5 additions & 21 deletions rust/chains/hyperlane-cosmos/src/mailbox.rs
@@ -1,5 +1,4 @@
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use futures::future;
use std::{
fmt::{Debug, Formatter},
io::Cursor,
@@ -8,14 +7,15 @@ use std::{
str::FromStr,
};

use crate::payloads::mailbox::{
GeneralMailboxQuery, ProcessMessageRequest, ProcessMessageRequestInner,
};
use crate::payloads::{general, mailbox};
use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer};
use crate::CosmosProvider;
use crate::{address::CosmosAddress, types::tx_response_to_outcome};
use crate::{grpc::WasmProvider, HyperlaneCosmosError};
use crate::{
payloads::mailbox::{GeneralMailboxQuery, ProcessMessageRequest, ProcessMessageRequestInner},
utils::execute_and_parse_log_futures,
};
use crate::{signers::Signer, utils::get_block_height_for_lag, ConnectionConf};
use async_trait::async_trait;
use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse;
@@ -371,23 +371,7 @@ impl Indexer<HyperlaneMessage> for CosmosMailboxIndexer {
})
.collect();

// TODO: this can be refactored when we rework indexing, to be part of the block-by-block indexing
let result = future::join_all(logs_futures)
.await
.into_iter()
.flatten()
.filter_map(|(logs_res, block_number)| match logs_res {
Ok(logs) => Some(logs),
Err(err) => {
warn!(?err, ?block_number, "Failed to fetch logs for block");
None
}
})
.flatten()
.map(|(log, meta)| (log.into(), meta))
.collect();

Ok(result)
execute_and_parse_log_futures(logs_futures).await
}

async fn get_finalized_block_number(&self) -> ChainResult<u32> {
28 changes: 4 additions & 24 deletions rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
@@ -2,25 +2,21 @@ use std::{fmt::Debug, num::NonZeroU64, ops::RangeInclusive, str::FromStr};

use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use futures::future;
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, ChainCommunicationError, ChainResult, Checkpoint,
ContractLocator, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneProvider,
Indexed, Indexer, LogMeta, MerkleTreeHook, MerkleTreeInsertion, SequenceAwareIndexer, H256,
};
use once_cell::sync::Lazy;
use tendermint::abci::EventAttribute;
use tracing::{instrument, warn};
use tracing::instrument;

use crate::{
grpc::WasmProvider,
payloads::{
general::{self},
merkle_tree_hook,
},
payloads::{general, merkle_tree_hook},
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer},
utils::{
get_block_height_for_lag, CONTRACT_ADDRESS_ATTRIBUTE_KEY,
execute_and_parse_log_futures, get_block_height_for_lag, CONTRACT_ADDRESS_ATTRIBUTE_KEY,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
},
ConnectionConf, CosmosProvider, HyperlaneCosmosError, Signer,
@@ -304,23 +300,7 @@ impl Indexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
})
.collect();

// TODO: this can be refactored when we rework indexing, to be part of the block-by-block indexing
let result = future::join_all(logs_futures)
.await
.into_iter()
.flatten()
.filter_map(|(logs_res, block_number)| match logs_res {
Ok(logs) => Some(logs),
Err(err) => {
warn!(?err, ?block_number, "Failed to fetch logs for block");
None
}
})
.flatten()
.map(|(log, meta)| (log.into(), meta))
.collect();

Ok(result)
execute_and_parse_log_futures(logs_futures).await
}

/// Get the chain's latest block number that has reached finality
15 changes: 6 additions & 9 deletions rust/chains/hyperlane-cosmos/src/providers/rpc.rs
@@ -1,6 +1,5 @@
use async_trait::async_trait;
use cosmrs::rpc::client::Client;
use hyperlane_core::rpc_clients::call_with_retry;
use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogMeta, H256, U256};
use sha256::digest;
use std::fmt::Debug;
@@ -216,9 +215,7 @@ impl CosmosWasmIndexer {
impl WasmIndexer for CosmosWasmIndexer {
#[instrument(err, skip(self))]
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_block =
call_with_retry(move || Box::pin(Self::get_latest_block(self.provider.rpc().clone())))
.await?;
let latest_block = Self::get_latest_block(self.provider.rpc().clone()).await?;
let latest_height: u32 = latest_block
.block
.header
@@ -242,11 +239,11 @@
let client = self.provider.rpc().clone();
debug!(?block_number, cursor_label, domain=?self.provider.domain, "Getting logs in block");

let (block, block_results) = tokio::join!(
call_with_retry(|| { Box::pin(Self::get_block(client.clone(), block_number)) }),
call_with_retry(|| { Box::pin(Self::get_block_results(client.clone(), block_number)) }),
);
// The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem
// than indexing latency, so we do them sequentially.
let block = Self::get_block(client.clone(), block_number).await?;
let block_results = Self::get_block_results(client.clone(), block_number).await?;

Ok(self.handle_txs(block?, block_results?, parser, cursor_label))
Ok(self.handle_txs(block, block_results, parser, cursor_label))
}
}
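For contrast, a self-contained sketch of the trade-off the comment above describes: `tokio::join!` keeps both requests in flight at once, while sequential awaits issue one request at a time. The stand-in functions below are illustrative, not the real RPC calls:

```rust
use std::time::Duration;
use tokio::time::sleep;

// Stand-ins for the two block queries; each takes roughly 50ms.
async fn get_block() -> Result<u64, String> {
    sleep(Duration::from_millis(50)).await;
    Ok(100)
}

async fn get_block_results() -> Result<u64, String> {
    sleep(Duration::from_millis(50)).await;
    Ok(100)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Concurrent (previous behavior): ~50ms total, but two simultaneous
    // requests against a rate-limited endpoint.
    let (block, results) = tokio::join!(get_block(), get_block_results());
    let (_block, _results) = (block?, results?);

    // Sequential (new behavior): ~100ms total, but only one request in
    // flight at a time. On cosmos, rate limiting is a bigger problem than
    // indexing latency, so the slower option wins.
    let _block = get_block().await?;
    let _results = get_block_results().await?;
    Ok(())
}
```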