diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs index 9df5e3b..d24ff60 100644 --- a/consumer/src/backend.rs +++ b/consumer/src/backend.rs @@ -103,31 +103,32 @@ impl From> for SolanaPendingTransaction { } } +#[async_trait] pub trait CollectionBackend { - fn create( + async fn create( &self, txn: MetaplexMasterEditionTransaction, ) -> Result>; - fn update( + async fn update( &self, collection: &collections::Model, txn: MetaplexMasterEditionTransaction, ) -> Result>; - fn update_mint( + async fn update_mint( &self, collection: &collections::Model, mint: &collection_mints::Model, txn: UpdateSolanaMintPayload, ) -> Result>; - fn retry_update_mint( + async fn retry_update_mint( &self, revision: &update_revisions::Model, ) -> Result>; - fn switch( + async fn switch( &self, mint: &collection_mints::Model, collection: &collections::Model, @@ -135,8 +136,10 @@ pub trait CollectionBackend { ) -> Result>; } +#[async_trait] pub trait MintBackend { - fn mint(&self, collection: &collections::Model, txn: T) -> Result>; + async fn mint(&self, collection: &collections::Model, txn: T) + -> Result>; } #[async_trait] diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 642ccda..714d73a 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -251,7 +251,7 @@ impl EventKind { let address = if let Some(compression_leaf) = compression_leafs { let signature = Signature::from_str(&signature)?; - let nonce = solana.extract_compression_nonce(&signature)?; + let nonce = solana.extract_compression_nonce(&signature).await?; let asset_id = mpl_bubblegum::utils::get_asset_id( &Pubkey::from_str(&compression_leaf.merkle_tree)?, @@ -765,7 +765,7 @@ impl Processor { .map_err(|k| ProcessorError::new(k, kind, ErrorSource::TreasuryStatus)); } - let res = match self.solana().submit_transaction(&res) { + let res = match self.solana().submit_transaction(&res).await { Ok(sig) => self .event_submitted(kind, &key, sig) .await @@ -834,6 +834,7 @@ impl Processor { let conn = self.db.get(); let tx = backend .create(payload.clone()) + .await .map_err(ProcessorErrorKind::Solana)?; let MasterEditionAddresses { @@ -880,6 +881,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let compression_leaf = compression_leafs::Model { @@ -910,6 +912,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let collection_mint = collection_mints::Model { @@ -949,6 +952,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let collection_mint = collection_mints::Model { @@ -979,6 +983,7 @@ impl Processor { let tx = backend .update(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1000,6 +1005,7 @@ impl Processor { let tx = backend .update_mint(&collection, &mint, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let UpdateCollectionMintAddresses { @@ -1035,6 +1041,7 @@ impl Processor { let tx = backend .retry_update_mint(&revision) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1083,6 +1090,7 @@ impl Processor { let conn = self.db.get(); let tx = backend .create(payload.clone()) + .await .map_err(ProcessorErrorKind::Solana)?; let MasterEditionAddresses { @@ -1133,6 +1141,7 @@ impl Processor { let tx = backend .switch(&mint, &collection, &new_collection) + .await .map_err(ProcessorErrorKind::Solana)?; Ok(tx.into()) @@ -1157,6 +1166,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let MintEditionAddresses { @@ -1196,6 +1206,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let leaf_model = CompressionLeaf::find_by_id(conn, id) @@ -1218,6 +1229,7 @@ impl Processor { let tx = backend .mint(&collection, payload) + .await .map_err(ProcessorErrorKind::Solana)?; let MintMetaplexAddresses { diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 2095e94..0ce1681 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -9,7 +9,7 @@ use holaplex_hub_nfts_solana_entity::{ }; use hub_core::{ anyhow::Result, - backon::{BlockingRetryable, ExponentialBuilder}, + backon::{ExponentialBuilder, Retryable}, bs58, clap, prelude::*, thiserror, @@ -27,7 +27,7 @@ use mpl_token_metadata::{ }; use solana_client::{ client_error::{ClientError, ClientErrorKind}, - rpc_client::RpcClient as SolanaRpcClient, + nonblocking::rpc_client::RpcClient as SolanaRpcClient, rpc_request::RpcError, }; use solana_program::{ @@ -65,7 +65,7 @@ use crate::{ macro_rules! with_retry { ($expr:expr) => {{ - (|| $expr) + (|| async { $expr.await }) .retry( &ExponentialBuilder::default() .with_jitter() @@ -203,7 +203,7 @@ impl Solana { /// /// # Errors /// This function fails if unable to query or pull the asset id from the instruction data of the transaction - pub fn extract_compression_nonce( + pub async fn extract_compression_nonce( &self, signature: &Signature, ) -> Result { @@ -211,7 +211,7 @@ impl Solana { self.rpc() .get_transaction(signature, UiTransactionEncoding::Json) ) - .call()?; + .await?; let meta = response .transaction @@ -251,7 +251,10 @@ impl Solana { /// /// # Errors /// This function fails if unable to submit transaction to Solana - pub fn submit_transaction(&self, transaction: &SolanaTransactionResult) -> Result { + pub async fn submit_transaction( + &self, + transaction: &SolanaTransactionResult, + ) -> Result { let signatures = transaction .signed_message_signatures .iter() @@ -277,7 +280,7 @@ impl Solana { .. })) }) - .call() + .await .map_err(|e| { let msg = format!("failed to send transaction: {e}"); error!(msg); @@ -295,8 +298,9 @@ pub struct CompressedRef<'a>(pub &'a Solana); #[repr(transparent)] pub struct EditionRef<'a>(pub &'a Solana); +#[async_trait] impl<'a> CollectionBackend for UncompressedRef<'a> { - fn create( + async fn create( &self, txn: MetaplexMasterEditionTransaction, ) -> hub_core::prelude::Result> { @@ -336,8 +340,8 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { ); let len = spl_token::state::Mint::LEN; - let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let create_account_ins = solana_program::system_instruction::create_account( &payer, @@ -437,7 +441,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn update( + async fn update( &self, collection: &collections::Model, txn: MetaplexMasterEditionTransaction, @@ -486,7 +490,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { None, ); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash(&[ins], Some(&payer), &blockhash); @@ -506,7 +510,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn update_mint( + async fn update_mint( &self, collection: &collections::Model, collection_mint: &collection_mints::Model, @@ -538,7 +542,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { &mpl_token_metadata::ID, ); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let update_ins: Instruction = mpl_token_metadata::instruction::update_metadata_accounts_v2( mpl_token_metadata::ID, @@ -588,7 +592,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn retry_update_mint( + async fn retry_update_mint( &self, revision: &update_revisions::Model, ) -> Result> { @@ -601,7 +605,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let mut message: solana_program::message::Message = bincode::deserialize(&revision.serialized_message)?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; message.recent_blockhash = blockhash; Ok(TransactionResponse { @@ -618,7 +622,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { }) } - fn switch( + async fn switch( &self, mint: &collection_mints::Model, collection: &collections::Model, @@ -686,7 +690,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let instructions = vec![unverify_ins, verify_ins]; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -710,8 +714,10 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { } } +#[async_trait] + impl<'a> MintBackend for EditionRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexEditionTransaction, @@ -757,7 +763,7 @@ impl<'a> MintBackend for E let (metadata_key, _) = Pubkey::find_program_address(metadata_seeds, &program_pubkey); let rent = - with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).call()?; + with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).await?; let mut instructions = vec![ create_account( @@ -795,7 +801,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -843,7 +849,7 @@ impl<'a> TransferBackend for Un let recipient: Pubkey = recipient_address.parse()?; let mint_address: Pubkey = collection_mint.mint.parse()?; let payer: Pubkey = self.0.treasury_wallet_address; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let source_ata = get_associated_token_address(&sender, &mint_address); let destination_ata = get_associated_token_address(&recipient, &mint_address); @@ -972,7 +978,7 @@ impl<'a> TransferBackend TransferBackend MintBackend for CompressedRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexMetadataTransaction, @@ -1082,7 +1090,7 @@ impl<'a> MintBackend MintBackend MintBackend for UncompressedRef<'a> { - fn mint( + async fn mint( &self, collection: &collections::Model, txn: MintMetaplexMetadataTransaction, @@ -1140,8 +1149,8 @@ impl<'a> MintBackend ); let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; - let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; let create_account_ins = solana_program::system_instruction::create_account( &payer,