From 347a7711ca36b30459e2bae523d8b0d7b0c52335 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 12 Oct 2023 13:30:13 +0500 Subject: [PATCH 1/2] use non blocking solana rpc client --- consumer/src/backend.rs | 15 ++++--- consumer/src/events.rs | 16 +++++++- consumer/src/solana.rs | 87 +++++++++++++++++++++-------------------- 3 files changed, 68 insertions(+), 50 deletions(-) diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs index 9df5e3b..d24ff60 100644 --- a/consumer/src/backend.rs +++ b/consumer/src/backend.rs @@ -103,31 +103,32 @@ impl From> for SolanaPendingTransaction { } } +#[async_trait] pub trait CollectionBackend { - fn create( + async fn create( &self, txn: MetaplexMasterEditionTransaction, ) -> Result>; - fn update( + async fn update( &self, collection: &collections::Model, txn: MetaplexMasterEditionTransaction, ) -> Result>; - fn update_mint( + async fn update_mint( &self, collection: &collections::Model, mint: &collection_mints::Model, txn: UpdateSolanaMintPayload, ) -> Result>; - fn retry_update_mint( + async fn retry_update_mint( &self, revision: &update_revisions::Model, ) -> Result>; - fn switch( + async fn switch( &self, mint: &collection_mints::Model, collection: &collections::Model, @@ -135,8 +136,10 @@ pub trait CollectionBackend { ) -> Result>; } +#[async_trait] pub trait MintBackend { - fn mint(&self, collection: &collections::Model, txn: T) -> Result>; + async fn mint(&self, collection: &collections::Model, txn: T) + -> Result>; } #[async_trait] diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 642ccda..714d73a 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -251,7 +251,7 @@ impl EventKind { let address = if let Some(compression_leaf) = compression_leafs { let signature = Signature::from_str(&signature)?; - let nonce = solana.extract_compression_nonce(&signature)?; + let nonce = solana.extract_compression_nonce(&signature).await?; let asset_id = mpl_bubblegum::utils::get_asset_id( &Pubkey::from_str(&compression_leaf.merkle_tree)?, @@ -765,7 +765,7 @@ impl Processor { .map_err(|k| ProcessorError::new(k, kind, ErrorSource::TreasuryStatus)); } - let res = match self.solana().submit_transaction(&res) { + let res = match self.solana().submit_transaction(&res).await { Ok(sig) => self .event_submitted(kind, &key, sig) .await @@ -834,6 +834,7 @@ impl Processor { let conn = self.db.get(); let tx = backend .create(payload.clone()) + .await .map_err(ProcessorErrorKind::Solana)?; let MasterEditionAddresses { @@ -880,6 +881,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let compression_leaf = compression_leafs::Model { @@ -910,6 +912,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let collection_mint = collection_mints::Model { @@ -949,6 +952,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let collection_mint = collection_mints::Model { @@ -979,6 +983,7 @@ impl Processor { let tx = backend .update(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1000,6 +1005,7 @@ impl Processor { let tx = backend .update_mint(&collection, &mint, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let UpdateCollectionMintAddresses { @@ -1035,6 +1041,7 @@ impl Processor { let tx = backend .retry_update_mint(&revision) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1083,6 +1090,7 @@ impl Processor { let conn = self.db.get(); let tx = backend .create(payload.clone()) + .await .map_err(ProcessorErrorKind::Solana)?; let MasterEditionAddresses { @@ -1133,6 +1141,7 @@ impl Processor { let tx = backend .switch(&mint, &collection, &new_collection) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1157,6 +1166,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let MintEditionAddresses { @@ -1196,6 +1206,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let leaf_model = CompressionLeaf::find_by_id(conn, id) @@ -1218,6 +1229,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let MintMetaplexAddresses { diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 2095e94..27fb5b5 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -9,7 +9,7 @@ use holaplex_hub_nfts_solana_entity::{ }; use hub_core::{ anyhow::Result, - backon::{BlockingRetryable, ExponentialBuilder}, + backon::{ExponentialBuilder, Retryable}, bs58, clap, prelude::*, thiserror, @@ -26,8 +26,7 @@ use mpl_token_metadata::{ state::{Creator, DataV2, EDITION, PREFIX}, }; use solana_client::{ - client_error::{ClientError, ClientErrorKind}, - rpc_client::RpcClient as SolanaRpcClient, + client_error::ClientErrorKind, nonblocking::rpc_client::RpcClient as SolanaRpcClient, rpc_request::RpcError, }; use solana_program::{ @@ -65,16 +64,12 @@ use crate::{ macro_rules! with_retry { ($expr:expr) => {{ - (|| $expr) - .retry( - &ExponentialBuilder::default() - .with_jitter() - .with_min_delay(Duration::from_millis(30)) - .with_max_times(15), - ) - .notify(|err: &ClientError, dur: Duration| { - error!("retrying error {:?} in {:?}", err, dur); - }) + (|| async { $expr.await }).retry( + &ExponentialBuilder::default() + .with_jitter() + .with_min_delay(Duration::from_millis(30)) + .with_max_times(15), + ) }}; } @@ -203,15 +198,14 @@ impl Solana { /// /// # Errors /// This function fails if unable to query or pull the asset id from the instruction data of the transaction - pub fn extract_compression_nonce( + pub async fn extract_compression_nonce( &self, signature: &Signature, ) -> Result { - let response = with_retry!( - self.rpc() - .get_transaction(signature, UiTransactionEncoding::Json) - ) - .call()?; + let response = with_retry!(self + .rpc() + .get_transaction(signature, UiTransactionEncoding::Json)) + .await?; let meta = response .transaction @@ -251,7 +245,10 @@ impl Solana { /// /// # Errors /// This function fails if unable to submit transaction to Solana - pub fn submit_transaction(&self, transaction: &SolanaTransactionResult) -> Result { + pub async fn submit_transaction( + &self, + transaction: &SolanaTransactionResult, + ) -> Result { let signatures = transaction .signed_message_signatures .iter() @@ -277,7 +274,7 @@ impl Solana { .. })) }) - .call() + .await .map_err(|e| { let msg = format!("failed to send transaction: {e}"); error!(msg); @@ -295,8 +292,9 @@ pub struct CompressedRef<'a>(pub &'a Solana); #[repr(transparent)] pub struct EditionRef<'a>(pub &'a Solana); +#[async_trait] impl<'a> CollectionBackend for UncompressedRef<'a> { - fn create( + async fn create( &self, txn: MetaplexMasterEditionTransaction, ) -> hub_core::prelude::Result> { @@ -336,8 +334,8 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { ); let len = spl_token::state::Mint::LEN; - let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let create_account_ins = solana_program::system_instruction::create_account( &payer, @@ -437,7 +435,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn update( + async fn update( &self, collection: &collections::Model, txn: MetaplexMasterEditionTransaction, @@ -486,7 +484,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { None, ); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash(&[ins], Some(&payer), &blockhash); @@ -506,7 +504,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn update_mint( + async fn update_mint( &self, collection: &collections::Model, collection_mint: &collection_mints::Model, @@ -538,7 +536,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { &mpl_token_metadata::ID, ); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let update_ins: Instruction = mpl_token_metadata::instruction::update_metadata_accounts_v2( mpl_token_metadata::ID, @@ -588,7 +586,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn retry_update_mint( + async fn retry_update_mint( &self, revision: &update_revisions::Model, ) -> Result> { @@ -601,7 +599,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let mut message: solana_program::message::Message = bincode::deserialize(&revision.serialized_message)?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; message.recent_blockhash = blockhash; Ok(TransactionResponse { @@ -618,7 +616,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn switch( + async fn switch( &self, mint: &collection_mints::Model, collection: &collections::Model, @@ -686,7 +684,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let instructions = vec![unverify_ins, verify_ins]; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -710,8 +708,10 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { } } +#[async_trait] + impl<'a> MintBackend for EditionRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexEditionTransaction, @@ -757,7 +757,7 @@ impl<'a> MintBackend for E let (metadata_key, _) = Pubkey::find_program_address(metadata_seeds, &program_pubkey); let rent = - with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).call()?; + with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).await?; let mut instructions = vec![ create_account( @@ -795,7 +795,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -843,7 +843,7 @@ impl<'a> TransferBackend for Un let recipient: Pubkey = recipient_address.parse()?; let mint_address: Pubkey = collection_mint.mint.parse()?; let payer: Pubkey = self.0.treasury_wallet_address; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let source_ata = get_associated_token_address(&sender, &mint_address); let destination_ata = get_associated_token_address(&recipient, &mint_address); @@ -972,7 +972,7 @@ impl<'a> TransferBackend TransferBackend MintBackend for CompressedRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexMetadataTransaction, @@ -1082,7 +1084,7 @@ impl<'a> MintBackend MintBackend MintBackend for UncompressedRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexMetadataTransaction, @@ -1140,8 +1143,8 @@ impl<'a> MintBackend ); let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; - let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let create_account_ins = solana_program::system_instruction::create_account( &payer, From a69cd387bd25b44878461b27832825e65c2cc3e9 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Thu, 12 Oct 2023 13:31:34 +0500 Subject: [PATCH 2/2] add notify --- consumer/src/solana.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 27fb5b5..0ce1681 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -26,7 +26,8 @@ use mpl_token_metadata::{ state::{Creator, DataV2, EDITION, PREFIX}, }; use solana_client::{ - client_error::ClientErrorKind, nonblocking::rpc_client::RpcClient as SolanaRpcClient, + client_error::{ClientError, ClientErrorKind}, + nonblocking::rpc_client::RpcClient as SolanaRpcClient, rpc_request::RpcError, }; use solana_program::{ @@ -64,12 +65,16 @@ use crate::{ macro_rules! with_retry { ($expr:expr) => {{ - (|| async { $expr.await }).retry( - &ExponentialBuilder::default() - .with_jitter() - .with_min_delay(Duration::from_millis(30)) - .with_max_times(15), - ) + (|| async { $expr.await }) + .retry( + &ExponentialBuilder::default() + .with_jitter() + .with_min_delay(Duration::from_millis(30)) + .with_max_times(15), + ) + .notify(|err: &ClientError, dur: Duration| { + error!("retrying error {:?} in {:?}", err, dur); + }) }}; } @@ -202,9 +207,10 @@ impl Solana { &self, signature: &Signature, ) -> Result { - let response = with_retry!(self - .rpc() - .get_transaction(signature, UiTransactionEncoding::Json)) + let response = with_retry!( + self.rpc() + .get_transaction(signature, UiTransactionEncoding::Json) + ) .await?; let meta = response