diff --git a/Cargo.lock b/Cargo.lock index b830817..211149d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5043,6 +5043,7 @@ dependencies = [ name = "solana-indexer" version = "0.1.0" dependencies = [ + "anchor-lang", "backoff", "bs58 0.5.0", "dashmap", @@ -5051,6 +5052,7 @@ dependencies = [ "holaplex-hub-core", "holaplex-hub-nfts-solana-core", "holaplex-hub-nfts-solana-entity", + "mpl-bubblegum", "sea-orm 0.10.7", "solana-client", "solana-program", diff --git a/consumer/src/asset_api.rs b/consumer/src/asset_api.rs index 6e611aa..30f0f27 100644 --- a/consumer/src/asset_api.rs +++ b/consumer/src/asset_api.rs @@ -76,7 +76,7 @@ pub struct Asset { pub struct AssetSupply { pub print_max_supply: u32, pub print_current_supply: u32, - pub edition_nonce: u64, + pub edition_nonce: Option, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs index 08d32a4..cbd656d 100644 --- a/consumer/src/backend.rs +++ b/consumer/src/backend.rs @@ -1,7 +1,7 @@ use holaplex_hub_nfts_solana_core::proto::{ MetaplexMasterEditionTransaction, SolanaPendingTransaction, TransferMetaplexAssetTransaction, }; -use holaplex_hub_nfts_solana_entity::{collection_mints, collections}; +use holaplex_hub_nfts_solana_entity::collections; use hub_core::prelude::*; use solana_program::pubkey::Pubkey; @@ -43,6 +43,11 @@ pub struct MintCompressedMintV1Addresses { pub leaf_owner: Pubkey, } +pub struct TransferCompressedMintV1Addresses { + pub owner: Pubkey, + pub recipient: Pubkey, +} + #[derive(Clone)] pub struct UpdateMasterEditionAddresses { pub metadata: Pubkey, @@ -103,10 +108,10 @@ pub trait MintBackend { } #[async_trait] -pub trait TransferBackend { +pub trait TransferBackend { async fn transfer( &self, - collection_mint: &collection_mints::Model, + collection_mint: &M, txn: TransferMetaplexAssetTransaction, - ) -> Result>; + ) -> Result>; } diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 8d7fe61..8034c39 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -10,7 +10,7 @@ use holaplex_hub_nfts_solana_core::{ SolanaFailedTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason, TransferMetaplexAssetTransaction, }, - sea_orm::{DbErr, Set}, + sea_orm::{DatabaseConnection, DbErr, Set}, Collection, CollectionMint, CompressionLeaf, Services, }; use holaplex_hub_nfts_solana_entity::{collection_mints, collections, compression_leafs}; @@ -117,7 +117,7 @@ impl EventKind { Self::CreateDrop => "drop creation", Self::MintDrop => "drop mint", Self::UpdateDrop => "drop update", - Self::TransferAsset => "drop asset transfer", + Self::TransferAsset => "asset transfer", Self::RetryCreateDrop => "drop creation retry", Self::RetryMintDrop => "drop mint retry", Self::CreateCollection => "collection creation", @@ -150,7 +150,7 @@ impl EventKind { async fn into_success( self, - db: &db::Connection, + conn: &DatabaseConnection, solana: &Solana, key: &SolanaNftEventKey, signature: String, @@ -160,7 +160,7 @@ impl EventKind { Ok(match self { Self::CreateDrop => { let id = id()?; - let collection = Collection::find_by_id(db, id) + let collection = Collection::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -172,7 +172,7 @@ impl EventKind { Self::CreateCollection => { let id = id()?; - let collection = Collection::find_by_id(db, id) + let collection = Collection::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -184,7 +184,7 @@ impl EventKind { Self::RetryCreateCollection => { let id = id()?; - let collection = Collection::find_by_id(db, id) + let collection = Collection::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -200,9 +200,9 @@ impl EventKind { }, Self::MintToCollection => { let id = id()?; - let collection_mint = CollectionMint::find_by_id(db, id).await?; + let collection_mint = CollectionMint::find_by_id(conn, id).await?; - let compression_leafs = CompressionLeaf::find_by_id(db, id).await?; + let compression_leafs = CompressionLeaf::find_by_id(conn, id).await?; let address = if let Some(compression_leaf) = compression_leafs { let signature = Signature::from_str(&signature)?; @@ -220,7 +220,7 @@ impl EventKind { compression_leaf.asset_id = Set(Some(asset_id.clone())); - CompressionLeaf::update(db, compression_leaf).await?; + CompressionLeaf::update(conn, compression_leaf).await?; asset_id } else { @@ -236,7 +236,7 @@ impl EventKind { }, Self::MintDrop => { let id = id()?; - let collection_mint = CollectionMint::find_by_id(db, id) + let collection_mint = CollectionMint::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -255,7 +255,7 @@ impl EventKind { }, Self::RetryCreateDrop => { let id = id()?; - let collection = Collection::find_by_id(db, id) + let collection = Collection::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -266,7 +266,7 @@ impl EventKind { }, Self::RetryMintDrop => { let id = id()?; - let collection_mint = CollectionMint::find_by_id(db, id) + let collection_mint = CollectionMint::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -277,7 +277,7 @@ impl EventKind { }, Self::RetryMintToCollection => { let id = id()?; - let collection_mint = CollectionMint::find_by_id(db, id) + let collection_mint = CollectionMint::find_by_id(conn, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -387,7 +387,7 @@ impl Processor { self.process_nft( EventKind::TransferAsset, &key, - self.transfer_asset(&UncompressedRef(self.solana()), &key, payload), + self.transfer_asset(&key, payload), ) .await }, @@ -562,10 +562,11 @@ impl Processor { key: &SolanaNftEventKey, sig: String, ) -> ProcessResult<()> { + let conn = self.db.get(); self.producer .send( Some(&SolanaNftEvents { - event: Some(kind.into_success(&self.db, self.solana(), key, sig).await?), + event: Some(kind.into_success(conn, self.solana(), key, sig).await?), }), Some(key), ) @@ -598,6 +599,7 @@ impl Processor { key: &SolanaNftEventKey, payload: MetaplexMasterEditionTransaction, ) -> ProcessResult { + let conn = self.db.get(); let tx = backend .create(payload.clone()) .map_err(ProcessorErrorKind::Solana)?; @@ -623,7 +625,7 @@ impl Processor { created_at: Utc::now().naive_utc(), }; - Collection::create(&self.db, collection.into()).await?; + Collection::create(conn, collection.into()).await?; Ok(tx.into()) } @@ -633,9 +635,10 @@ impl Processor { key: &SolanaNftEventKey, payload: MintMetaplexMetadataTransaction, ) -> ProcessResult { + let conn = self.db.get(); let id = Uuid::parse_str(&key.id.clone())?; let collection_id = Uuid::parse_str(&payload.collection_id)?; - let collection = Collection::find_by_id(&self.db, collection_id) + let collection = Collection::find_by_id(conn, collection_id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -657,7 +660,7 @@ impl Processor { ..Default::default() }; - CompressionLeaf::create(&self.db, compression_leaf).await?; + CompressionLeaf::create(conn, compression_leaf).await?; return Ok(tx.into()); } @@ -677,7 +680,7 @@ impl Processor { associated_token_account: tx.addresses.associated_token_account.to_string(), }; - CollectionMint::create(&self.db, collection_mint).await?; + CollectionMint::create(conn, collection_mint).await?; Ok(tx.into()) } @@ -688,9 +691,10 @@ impl Processor { key: &SolanaNftEventKey, payload: MintMetaplexEditionTransaction, ) -> ProcessResult { + let conn = self.db.get(); let id = Uuid::parse_str(&key.id.clone())?; let collection_id = Uuid::parse_str(&payload.collection_id)?; - let collection = Collection::find_by_id(&self.db, collection_id) + let collection = Collection::find_by_id(conn, collection_id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -707,7 +711,7 @@ impl Processor { created_at: Utc::now().naive_utc(), }; - CollectionMint::create(&self.db, collection_mint).await?; + CollectionMint::create(conn, collection_mint).await?; Ok(tx.into()) } @@ -718,8 +722,9 @@ impl Processor { key: &SolanaNftEventKey, payload: MetaplexMasterEditionTransaction, ) -> ProcessResult { + let conn = self.db.get(); let collection_id = Uuid::parse_str(&key.id.clone())?; - let collection = Collection::find_by_id(&self.db, collection_id) + let collection = Collection::find_by_id(conn, collection_id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -730,19 +735,34 @@ impl Processor { Ok(tx.into()) } - async fn transfer_asset( + async fn transfer_asset( &self, - backend: &B, _key: &SolanaNftEventKey, payload: TransferMetaplexAssetTransaction, ) -> ProcessResult { + let conn = self.db.get(); let collection_mint_id = Uuid::parse_str(&payload.collection_mint_id.clone())?; - let collection_mint = CollectionMint::find_by_id(&self.db, collection_mint_id) + let collection_mint = CollectionMint::find_by_id(conn, collection_mint_id).await?; + + if let Some(collection_mint) = collection_mint { + let backend = &UncompressedRef(self.solana()); + + let tx = backend + .transfer(&collection_mint, payload) + .await + .map_err(ProcessorErrorKind::Solana)?; + + return Ok(tx.into()); + } + + let compression_leaf = CompressionLeaf::find_by_id(conn, collection_mint_id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; + let backend = &CompressedRef(self.solana()); + let tx = backend - .transfer(&collection_mint, payload) + .transfer(&compression_leaf, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -755,6 +775,7 @@ impl Processor { key: &SolanaNftEventKey, payload: MetaplexMasterEditionTransaction, ) -> ProcessResult { + let conn = self.db.get(); let tx = backend .create(payload.clone()) .map_err(ProcessorErrorKind::Solana)?; @@ -769,7 +790,7 @@ impl Processor { } = tx.addresses; let collection_id = Uuid::parse_str(&key.id.clone())?; - let collection = Collection::find_by_id(&self.db, collection_id) + let collection = Collection::find_by_id(conn, collection_id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -782,7 +803,7 @@ impl Processor { collection.update_authority = Set(update_authority.to_string()); collection.owner = Set(owner.to_string()); - Collection::update(&self.db, collection).await?; + Collection::update(conn, collection).await?; Ok(tx.into()) } @@ -795,12 +816,12 @@ impl Processor { key: &SolanaNftEventKey, payload: MintMetaplexEditionTransaction, ) -> ProcessResult { + let conn = self.db.get(); let id = Uuid::parse_str(&key.id.clone())?; - let (collection_mint, collection) = - CollectionMint::find_by_id_with_collection(&self.db, id) - .await? - .ok_or(ProcessorErrorKind::RecordNotFound)?; + let (collection_mint, collection) = CollectionMint::find_by_id_with_collection(conn, id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound)?; let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -821,7 +842,7 @@ impl Processor { collection_mint.owner = Set(recipient.to_string()); collection_mint.associated_token_account = Set(associated_token_account.to_string()); - CollectionMint::update(&self.db, collection_mint).await?; + CollectionMint::update(conn, collection_mint).await?; Ok(tx.into()) } @@ -834,12 +855,12 @@ impl Processor { key: &SolanaNftEventKey, payload: MintMetaplexMetadataTransaction, ) -> ProcessResult { + let conn = self.db.get(); let id = Uuid::parse_str(&key.id.clone())?; - let (collection_mint, collection) = - CollectionMint::find_by_id_with_collection(&self.db, id) - .await? - .ok_or(ProcessorErrorKind::RecordNotFound)?; + let (collection_mint, collection) = CollectionMint::find_by_id_with_collection(conn, id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound)?; let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; @@ -860,7 +881,7 @@ impl Processor { collection_mint.owner = Set(recipient.to_string()); collection_mint.associated_token_account = Set(associated_token_account.to_string()); - CollectionMint::update(&self.db, collection_mint).await?; + CollectionMint::update(conn, collection_mint).await?; Ok(tx.into()) } diff --git a/consumer/src/import.rs b/consumer/src/import.rs index c71fbd6..469e13c 100644 --- a/consumer/src/import.rs +++ b/consumer/src/import.rs @@ -53,7 +53,7 @@ impl Processor { _ => Ok(None), } }, - _ => Ok(None), + Services::Treasury(..) => Ok(None), } } @@ -69,20 +69,20 @@ impl Processor { const MAX_LIMIT: u64 = 1000; let rpc = &self.solana.0.asset_rpc(); - let db = &self.db; + let conn = self.db.get(); let mut page = 1; let collection = rpc.get_asset(&mint_address).await?; - let collection_model = Collection::find_by_id(db, id.parse()?).await?; + let collection_model = Collection::find_by_id(conn, id.parse()?).await?; if let Some(collection_model) = collection_model { info!( "Deleting already indexed collection: {:?}", collection_model.id ); - collection_model.delete(db.get()).await?; + collection_model.delete(conn).await?; } info!("Importing collection: {:?}", collection.id.to_string()); @@ -126,9 +126,7 @@ impl Processor { } } - CollectionMints::insert_many(mints) - .exec(self.db.get()) - .await?; + CollectionMints::insert_many(mints).exec(conn).await?; if result.total < MAX_LIMIT { break; @@ -145,7 +143,7 @@ impl Processor { user_id: String, collection: Asset, ) -> Result { - let db = &self.db; + let conn = self.db.get(); let producer = &self.producer; let owner = collection.ownership.owner.try_into()?; let mint = collection.id.try_into()?; @@ -188,7 +186,7 @@ impl Processor { let (metadata_pubkey, _) = find_metadata_account(&mint); let (master_edition, _) = find_master_edition_account(&mint); - let collection_model = Collection::create(db, collections::ActiveModel { + let collection_model = Collection::create(conn, collections::ActiveModel { master_edition: Set(master_edition.to_string()), update_authority: Set(update_authority.to_string()), associated_token_account: Set(ata.to_string()), diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 2b8654a..d15231e 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -4,7 +4,7 @@ use holaplex_hub_nfts_solana_core::proto::{ MetaplexMetadata, MintMetaplexEditionTransaction, MintMetaplexMetadataTransaction, TransferMetaplexAssetTransaction, }; -use holaplex_hub_nfts_solana_entity::{collection_mints, collections}; +use holaplex_hub_nfts_solana_entity::{collection_mints, collections, compression_leafs}; use hub_core::{anyhow::Result, clap, prelude::*, thiserror, uuid::Uuid}; use mpl_bubblegum::state::metaplex_adapter::{ Collection, Creator as BubblegumCreator, TokenProgramVersion, @@ -13,7 +13,7 @@ use mpl_token_metadata::{ instruction::{mint_new_edition_from_master_edition_via_token, update_metadata_accounts_v2}, state::{Creator, DataV2, EDITION, PREFIX}, }; -use solana_client::rpc_client::RpcClient; +use solana_client::rpc_client::RpcClient as SolanaRpcClient; use solana_program::{ instruction::Instruction, program_pack::Pack, pubkey::Pubkey, system_instruction::create_account, system_program, @@ -37,11 +37,11 @@ use spl_token::{ }; use crate::{ - asset_api::RpcClient as _, + asset_api::RpcClient, backend::{ CollectionBackend, MasterEditionAddresses, MintBackend, MintCompressedMintV1Addresses, MintEditionAddresses, MintMetaplexAddresses, TransactionResponse, TransferAssetAddresses, - TransferBackend, UpdateMasterEditionAddresses, + TransferBackend, TransferCompressedMintV1Addresses, UpdateMasterEditionAddresses, }, }; @@ -114,11 +114,13 @@ pub enum SolanaAssetIdError { Base58(#[from] bs58::decode::Error), #[error("Borsh deserialization error")] BorshDeserialize(#[from] std::io::Error), + #[error("Asset id not found")] + NotFound, } #[derive(Clone)] pub struct Solana { - rpc_client: Arc, + rpc_client: Arc, treasury_wallet_address: Pubkey, bubblegum_tree_authority: Pubkey, bubblegum_merkle_tree: Pubkey, @@ -135,7 +137,7 @@ impl Solana { tree_authority, merkle_tree, } = args; - let rpc_client = Arc::new(RpcClient::new(solana_endpoint)); + let rpc_client = Arc::new(SolanaRpcClient::new(solana_endpoint)); let (bubblegum_cpi_address, _) = Pubkey::find_program_address( &[mpl_bubblegum::state::COLLECTION_CPI_PREFIX.as_bytes()], @@ -160,7 +162,7 @@ impl Solana { } #[must_use] - pub fn rpc(&self) -> Arc { + pub fn rpc(&self) -> Arc { self.rpc_client.clone() } @@ -581,7 +583,7 @@ impl<'a> MintBackend for E } #[async_trait] -impl<'a> TransferBackend for UncompressedRef<'a> { +impl<'a> TransferBackend for UncompressedRef<'a> { async fn transfer( &self, collection_mint: &collection_mints::Model, @@ -645,50 +647,81 @@ impl<'a> TransferBackend for UncompressedRef<'a> { } #[async_trait] -impl<'a> TransferBackend for CompressedRef<'a> { +impl<'a> TransferBackend + for CompressedRef<'a> +{ async fn transfer( &self, - collection_mint: &collection_mints::Model, + compression_leaf: &compression_leafs::Model, txn: TransferMetaplexAssetTransaction, - ) -> hub_core::prelude::Result> { + ) -> hub_core::prelude::Result> { let TransferMetaplexAssetTransaction { recipient_address, owner_address, - collection_mint_id, .. } = txn; let payer = self.0.treasury_wallet_address; let recipient = recipient_address.parse()?; let owner = owner_address.parse()?; - let asset_id = todo!("wait where's the asset address"); - let asset = self - .0 - .asset_rpc_client - .get_asset(asset_id) + let asset_api = &self.0.asset_rpc(); + + let tree_authority_address = Pubkey::from_str(&compression_leaf.tree_authority)?; + let merkle_tree_address = Pubkey::from_str(&compression_leaf.merkle_tree)?; + + let asset_id = compression_leaf + .asset_id + .clone() + .ok_or(SolanaAssetIdError::NotFound)?; + let asset = asset_api + .get_asset(&asset_id) .await - .context("Error getting asset data")?; + .context("fetching asset from DAA")?; + let asset_proof = asset_api + .get_asset_proof(&asset_id) + .await + .context("fetching asset proof from DAA")?; + + let root: Vec = asset_proof.root.into(); + let data_hash: Vec = asset.compression.data_hash.context("no data hash")?.into(); + let creator_hash: Vec = asset + .compression + .creator_hash + .context("no creator hash")? + .into(); + let leaf_id = asset.compression.leaf_id; + let proofs = asset_proof + .proof + .into_iter() + .map(|proof| Ok(AccountMeta::new_readonly(proof.try_into()?, false))) + .collect::>>()?; + + let mut accounts = vec![ + AccountMeta::new(tree_authority_address, false), + AccountMeta::new_readonly(owner, true), + AccountMeta::new_readonly(owner, false), + AccountMeta::new_readonly(recipient, false), + AccountMeta::new(merkle_tree_address, false), + AccountMeta::new_readonly(spl_noop::ID, false), + AccountMeta::new_readonly(spl_account_compression::ID, false), + AccountMeta::new_readonly(system_program::ID, false), + ]; + + accounts.extend(proofs); let instructions = [Instruction { program_id: mpl_bubblegum::ID, - accounts: [ - AccountMeta::new(self.0.bubblegum_tree_authority, false), - AccountMeta::new_readonly(owner, true), - AccountMeta::new_readonly(owner, false), - AccountMeta::new_readonly(recipient, false), - AccountMeta::new(self.0.bubblegum_merkle_tree, false), - AccountMeta::new_readonly(spl_noop::ID, false), - AccountMeta::new_readonly(spl_account_compression::ID, false), - AccountMeta::new_readonly(system_program::ID, false), - ] - .into_iter() - .collect(), + accounts, data: mpl_bubblegum::instruction::Transfer { - root: todo!("how does DAA work"), - data_hash: todo!("how does DAA work"), - creator_hash: todo!("how does DAA work"), - nonce: todo!("how does DAA work"), - index: todo!("how does DAA work"), + root: root.try_into().map_err(|_| anyhow!("Invalid root hash"))?, + data_hash: data_hash + .try_into() + .map_err(|_| anyhow!("Invalid data hash"))?, + creator_hash: creator_hash + .try_into() + .map_err(|_| anyhow!("Invalid creator hash"))?, + nonce: leaf_id.into(), + index: leaf_id, } .data(), }]; @@ -703,12 +736,7 @@ impl<'a> TransferBackend for CompressedRef<'a> { Ok(TransactionResponse { serialized_message, signatures_or_signers_public_keys: vec![payer.to_string(), owner.to_string()], - addresses: TransferAssetAddresses { - owner, - recipient, - recipient_associated_token_account: todo!("what"), - owner_associated_token_account: todo!("what"), - }, + addresses: TransferCompressedMintV1Addresses { owner, recipient }, }) } } diff --git a/core/src/collection_mints.rs b/core/src/collection_mints.rs index a7deac3..d10fa4a 100644 --- a/core/src/collection_mints.rs +++ b/core/src/collection_mints.rs @@ -1,45 +1,38 @@ use holaplex_hub_nfts_solana_entity::{ - collection_mints::{ActiveModel, Column, Entity, Model, Relation}, + collection_mints::{ActiveModel, Column, Entity, Model}, collections, }; -use sea_orm::{prelude::*, JoinType, QuerySelect, Set}; - -use crate::db::Connection; +use sea_orm::{prelude::*, Set}; pub struct CollectionMint; impl CollectionMint { - pub async fn create(db: &Connection, model: Model) -> Result { - let conn = db.get(); - + pub async fn create(conn: &DatabaseConnection, model: Model) -> Result { let active_model: ActiveModel = model.into(); active_model.insert(conn).await } - pub async fn find_by_id(db: &Connection, id: Uuid) -> Result, DbErr> { - let conn = db.get(); - + pub async fn find_by_id(conn: &DatabaseConnection, id: Uuid) -> Result, DbErr> { Entity::find().filter(Column::Id.eq(id)).one(conn).await } pub async fn update_owner_and_ata( - db: &Connection, + conn: &DatabaseConnection, model: &Model, owner: String, ata: String, ) -> Result { - let conn = db.get(); - let mut active_model: ActiveModel = model.clone().into(); active_model.owner = Set(owner); active_model.associated_token_account = Set(ata); active_model.update(conn).await } - pub async fn find_by_ata(db: &Connection, ata: String) -> Result, DbErr> { - let conn = db.get(); - + pub async fn find_by_ata( + conn: &DatabaseConnection, + ata: String, + ) -> Result, DbErr> { Entity::find() .filter(Column::AssociatedTokenAccount.eq(ata)) .one(conn) @@ -47,11 +40,9 @@ impl CollectionMint { } pub async fn find_by_id_with_collection( - db: &Connection, + conn: &DatabaseConnection, id: Uuid, ) -> Result)>, DbErr> { - let conn = db.get(); - Entity::find() .find_also_related(collections::Entity) .filter(Column::Id.eq(id)) @@ -59,9 +50,7 @@ impl CollectionMint { .await } - pub async fn update(db: &Connection, model: ActiveModel) -> Result { - let conn = db.get(); - + pub async fn update(conn: &DatabaseConnection, model: ActiveModel) -> Result { model.update(conn).await } } diff --git a/core/src/collections.rs b/core/src/collections.rs index 3c629b6..e510421 100644 --- a/core/src/collections.rs +++ b/core/src/collections.rs @@ -1,32 +1,25 @@ use holaplex_hub_nfts_solana_entity::collections::{ActiveModel, Column, Entity, Model}; use sea_orm::prelude::*; -use crate::db::Connection; - pub struct Collection; impl Collection { - pub async fn create(db: &Connection, am: ActiveModel) -> Result { - let conn = db.get(); - + pub async fn create(conn: &DatabaseConnection, am: ActiveModel) -> Result { am.insert(conn).await } - pub async fn find_by_id(db: &Connection, id: Uuid) -> Result, DbErr> { - let conn = db.get(); - + pub async fn find_by_id(conn: &DatabaseConnection, id: Uuid) -> Result, DbErr> { Entity::find().filter(Column::Id.eq(id)).one(conn).await } - pub async fn find_by_mint(db: &Connection, mint: String) -> Result, DbErr> { - let conn = db.get(); - + pub async fn find_by_mint( + conn: &DatabaseConnection, + mint: String, + ) -> Result, DbErr> { Entity::find().filter(Column::Mint.eq(mint)).one(conn).await } - pub async fn update(db: &Connection, model: ActiveModel) -> Result { - let conn = db.get(); - + pub async fn update(conn: &DatabaseConnection, model: ActiveModel) -> Result { model.update(conn).await } } diff --git a/core/src/compression_leafs.rs b/core/src/compression_leafs.rs index 5637aff..22b7399 100644 --- a/core/src/compression_leafs.rs +++ b/core/src/compression_leafs.rs @@ -1,28 +1,30 @@ use holaplex_hub_nfts_solana_entity::compression_leafs::{ActiveModel, Column, Entity, Model}; use sea_orm::prelude::*; -use crate::db::Connection; - pub struct CompressionLeaf; impl CompressionLeaf { - pub async fn create(db: &Connection, model: Model) -> Result { - let conn = db.get(); - + pub async fn create(conn: &DatabaseConnection, model: Model) -> Result { let active_model: ActiveModel = model.into(); active_model.insert(conn).await } - pub async fn find_by_id(db: &Connection, id: Uuid) -> Result, DbErr> { - let conn = db.get(); - + pub async fn find_by_id(conn: &DatabaseConnection, id: Uuid) -> Result, DbErr> { Entity::find().filter(Column::Id.eq(id)).one(conn).await } - pub async fn update(db: &Connection, model: ActiveModel) -> Result { - let conn = db.get(); + pub async fn find_by_asset_id( + conn: &DatabaseConnection, + address: String, + ) -> Result, DbErr> { + Entity::find() + .filter(Column::AssetId.eq(address)) + .one(conn) + .await + } + pub async fn update(conn: &DatabaseConnection, model: ActiveModel) -> Result { model.update(conn).await } } diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index b53e132..f0727fe 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -21,7 +21,9 @@ bs58 = "0.5.0" futures = "0.3.24" hex = "0.4.3" solana-sdk = "1.14" +mpl-bubblegum = "0.7.0" solana-program = "1.14" +anchor-lang = "0.26.0" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.0.0+solana.1.16.1" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.0.0+solana.1.16.1" } dashmap = "5.4.0" diff --git a/indexer/src/connector.rs b/indexer/src/connector.rs index e6d2fb3..423cd95 100644 --- a/indexer/src/connector.rs +++ b/indexer/src/connector.rs @@ -38,7 +38,7 @@ impl GeyserGrpcConnector { vote: Some(false), failed: Some(false), signature: None, - account_include: vec![spl_token::ID.to_string()], + account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()], account_exclude: Vec::new(), account_required: Vec::new(), }); diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index fe0291a..cdde0fc 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,5 +1,6 @@ use std::{convert::TryInto, sync::Arc}; +use anchor_lang::AnchorDeserialize; use backoff::ExponentialBackoff; use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; @@ -9,16 +10,21 @@ use holaplex_hub_nfts_solana_core::{ solana_nft_events::Event::UpdateMintOwner, MintOwnershipUpdate, SolanaNftEventKey, SolanaNftEvents, }, - CollectionMint, + sea_orm::Set, + CollectionMint, CompressionLeaf, }; +use holaplex_hub_nfts_solana_entity::compression_leafs; use hub_core::{prelude::*, producer::Producer, tokio::task}; +use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; use solana_program::program_pack::Pack; use solana_sdk::{pubkey::Pubkey, signature::Signature}; use spl_token::{instruction::TokenInstruction, state::Account}; use yellowstone_grpc_client::GeyserGrpcClientError; use yellowstone_grpc_proto::{ - prelude::{subscribe_update::UpdateOneof, SubscribeUpdate, SubscribeUpdateTransaction}, + prelude::{ + subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, + }, tonic::Status, }; @@ -106,23 +112,105 @@ impl MessageHandler { .context("Transaction not found")? .message .context("Message not found")?; + let sig = tx + .transaction + .as_ref() + .ok_or_else(|| anyhow!("failed to get transaction"))? + .signature + .clone(); - let mut i = 0; let keys = message.clone().account_keys; for (idx, key) in message.clone().account_keys.iter().enumerate() { - let k = Pubkey::new(key); + let key: &[u8] = key; + let k = Pubkey::try_from(key)?; if k == spl_token::ID { - i = idx; - break; + self.process_spl_token_transaction(idx, &keys, &sig, &message) + .await?; + } else if k == mpl_bubblegum::ID { + self.process_mpl_bubblegum_transaction(idx, &keys, &sig, &message) + .await?; } } + Ok(()) + } + + async fn process_mpl_bubblegum_transaction( + &self, + program_account_index: usize, + keys: &[Vec], + sig: &Vec, + message: &Message, + ) -> Result<()> { for ins in message.instructions.iter() { let account_indices = ins.accounts.clone(); let program_idx: usize = ins.program_id_index.try_into()?; - if program_idx == i { + if program_idx == program_account_index { + let conn = self.db.get(); + let data = ins.data.clone(); + let data = data.as_slice(); + + let tkn_instruction = + mpl_bubblegum::instruction::Transfer::try_from_slice(&data[8..])?; + let new_leaf_owner_account_index = account_indices[3]; + let merkle_tree_account_index = account_indices[4]; + let new_leaf_owner_bytes: &[u8] = &keys[new_leaf_owner_account_index as usize]; + let merkle_tree_bytes: &[u8] = &keys[merkle_tree_account_index as usize]; + let new_leaf_owner = Pubkey::try_from(new_leaf_owner_bytes)?; + let merkle_tree = Pubkey::try_from(merkle_tree_bytes)?; + + let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce); + + let compression_leaf = + CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()) + .await? + .context("compression leaf not found")?; + + let collection_mint_id = compression_leaf.id; + let leaf_owner = compression_leaf.leaf_owner.clone(); + let mut compression_leaf: compression_leafs::ActiveModel = compression_leaf.into(); + + compression_leaf.leaf_owner = Set(new_leaf_owner.to_string()); + + CompressionLeaf::update(conn, compression_leaf).await?; + + self.producer + .send( + Some(&SolanaNftEvents { + event: Some(UpdateMintOwner(MintOwnershipUpdate { + mint_address: asset_id.to_string(), + sender: leaf_owner, + recipient: new_leaf_owner.to_string(), + tx_signature: Signature::new(sig.as_slice()).to_string(), + })), + }), + Some(&SolanaNftEventKey { + id: collection_mint_id.to_string(), + ..Default::default() + }), + ) + .await?; + } + } + + Ok(()) + } + + async fn process_spl_token_transaction( + &self, + program_account_index: usize, + keys: &[Vec], + sig: &Vec, + message: &Message, + ) -> Result<()> { + let conn = self.db.get(); + for ins in message.instructions.iter() { + let account_indices = ins.accounts.clone(); + let program_idx: usize = ins.program_id_index.try_into()?; + + if program_idx == program_account_index { let data = ins.data.clone(); let data = data.as_slice(); let tkn_instruction = spl_token::instruction::TokenInstruction::unpack(data)?; @@ -138,27 +226,20 @@ impl MessageHandler { } if let Some((1, destination_ata_index)) = transfer_info { - let sig = tx - .transaction - .as_ref() - .ok_or_else(|| anyhow!("failed to get transaction"))? - .signature - .clone(); - let source_account_index = account_indices[0]; - let source_bytes = &keys[source_account_index as usize]; - let source = Pubkey::new(source_bytes); + let source_bytes: &[u8] = &keys[source_account_index as usize]; + let source = Pubkey::try_from(source_bytes)?; let collection_mint = - CollectionMint::find_by_ata(&self.db, source.to_string()).await?; + CollectionMint::find_by_ata(conn, source.to_string()).await?; if collection_mint.is_none() { return Ok(()); } let destination_account_index = account_indices[destination_ata_index]; - let destination_bytes = &keys[destination_account_index as usize]; - let destination = Pubkey::new(destination_bytes); + let destination_bytes: &[u8] = &keys[destination_account_index as usize]; + let destination = Pubkey::try_from(destination_bytes)?; let acct = fetch_account(&self.rpc, &destination).await?; let destination_tkn_act = Account::unpack(&acct.data)?; @@ -166,7 +247,7 @@ impl MessageHandler { let mint = collection_mint.context("No mint found")?; CollectionMint::update_owner_and_ata( - &self.db, + conn, &mint, new_owner.clone(), destination.to_string(),