Skip to content

Commit

Permalink
Update hub-core.
Browse files Browse the repository at this point in the history
  • Loading branch information
ray-kast committed Aug 17, 2023
1 parent 3928cb1 commit a39c0d3
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 386 deletions.
509 changes: 212 additions & 297 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions consumer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ mpl-bubblegum = "0.7.0"
mpl-token-metadata = "1.8.3"
holaplex-hub-nfts-solana-core = { path = "../core" }
holaplex-hub-nfts-solana-entity = { path = "../entity" }
jsonrpsee = { version = "0.18.2", features = ["macros", "http-client"] }
bs58 = "0.5.0"
rand = "0.8.5"
tokio-retry = "0.3.0"
jsonrpsee = { version = "0.19.0", features = ["macros", "http-client"] }
rand = "0.8.5"

[dependencies.hub-core]
package = "holaplex-hub-core"
version = "0.2.0"
version = "0.5.0"
git = "https://github.com/holaplex/hub-core"
branch = "stable"
features = ["kafka"]
features = ["jsonrpsee-core", "kafka", "sea-orm", "solana"]
6 changes: 4 additions & 2 deletions consumer/src/asset_api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use core::fmt;
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use hub_core::bs58;
use solana_program::pubkey::Pubkey;

mod b58 {
use hub_core::bs58;
use serde::{de::Visitor, Deserializer, Serializer};

pub fn serialize<S: Serializer>(bytes: &[u8], ser: S) -> Result<S::Ok, S::Error> {
Expand Down
6 changes: 4 additions & 2 deletions consumer/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ use crate::{
solana::{CompressedRef, EditionRef, Solana, SolanaAssetIdError, UncompressedRef},
};

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Triage)]
pub enum ProcessorErrorKind {
#[error("Associated record not found in database")]
#[transient]
RecordNotFound,
#[error("Transaction status not found in treasury event payload")]
TransactionStatusNotFound,

#[error("Error processing Solana operation")]
#[transient]
Solana(#[source] Error),
#[error("Error sending message")]
SendError(#[from] SendError),
Expand All @@ -59,7 +61,7 @@ pub enum ProcessorErrorKind {
AssetId(#[from] SolanaAssetIdError),
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Triage)]
#[error("Error handling {} of {}", src.name(), evt.name())]
pub struct ProcessorError {
#[source]
Expand Down
103 changes: 72 additions & 31 deletions consumer/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ use holaplex_hub_nfts_solana_core::{
CollectionImport, File, Metadata, SolanaCollectionPayload, SolanaCreator,
SolanaMintPayload, SolanaNftEventKey, SolanaNftEvents,
},
sea_orm::{EntityTrait, ModelTrait, Set},
sea_orm::{DbErr, EntityTrait, ModelTrait, Set},
Collection, Services,
};
use holaplex_hub_nfts_solana_entity::{collection_mints, collections, prelude::CollectionMints};
use hub_core::{
chrono::Utc, futures_util::stream, prelude::*, producer::Producer, reqwest, util::DebugShim,
uuid::Uuid,
backon::{ExponentialBuilder, Retryable},
chrono::Utc,
futures_util::stream,
prelude::*,
producer::{Producer, SendError},
reqwest, thiserror,
util::DebugShim,
uuid::{self, Uuid},
};
use mpl_token_metadata::pda::{find_master_edition_account, find_metadata_account};
use spl_associated_token_account::get_associated_token_address;
use tokio_retry::{strategy::ExponentialBackoff, Retry};

use crate::{
asset_api::{self, Asset, RpcClient},
Expand All @@ -24,6 +29,29 @@ use crate::{

const CONCURRENT_REQUESTS: usize = 64;

#[derive(Debug, thiserror::Error, Triage)]
pub enum ProcessorError {
#[error("Missing update authority (index 0) on asset")]
#[transient]
MissingUpdateAuthority,

#[error("Error fetching metadata JSON")]
JsonFetch(#[source] reqwest::Error),
#[error("JSONRPC error")]
JsonRpc(#[from] jsonrpsee::core::Error),
#[error("Invalid UUID")]
InvalidUuid(#[from] uuid::Error),
#[error("Invalid conversion from byte slice to public key")]
#[permanent]
InvalidPubkey(#[source] std::array::TryFromSliceError),
#[error("Database error")]
DbError(#[from] DbErr),
#[error("Error sending message")]
SendError(#[from] SendError),
}

type Result<T> = std::result::Result<T, ProcessorError>;

// TODO: could this just be a newtype over events::Processor?
#[derive(Debug, Clone)]
pub struct Processor {
Expand Down Expand Up @@ -119,11 +147,8 @@ impl Processor {
}

let mut buffered = stream::iter(futures).buffer_unordered(CONCURRENT_REQUESTS);
while let Some(result) = buffered.next().await {
match result {
Ok(model) => mints.push(model),
Err(e) => bail!("Error: {}", e),
}
while let Some(model) = buffered.next().await {
mints.push(model?);
}

CollectionMints::insert_many(mints).exec(conn).await?;
Expand All @@ -145,8 +170,15 @@ impl Processor {
) -> Result<collections::Model> {
let conn = self.db.get();
let producer = &self.producer;
let owner = collection.ownership.owner.try_into()?;
let mint = collection.id.try_into()?;
let owner = collection
.ownership
.owner
.try_into()
.map_err(ProcessorError::InvalidPubkey)?;
let mint = collection
.id
.try_into()
.map_err(ProcessorError::InvalidPubkey)?;
let seller_fee_basis_points = collection.royalty.basis_points;

let json_uri = collection.content.json_uri.clone();
Expand Down Expand Up @@ -179,22 +211,25 @@ impl Processor {
let update_authority = &collection
.authorities
.get(0)
.context("Invalid index")?
.ok_or(ProcessorError::MissingUpdateAuthority)?
.address;

let ata = get_associated_token_address(&owner, &mint);
let (metadata_pubkey, _) = find_metadata_account(&mint);

let (master_edition, _) = find_master_edition_account(&mint);
let collection_model = Collection::create(conn, collections::ActiveModel {
master_edition: Set(master_edition.to_string()),
update_authority: Set(update_authority.to_string()),
associated_token_account: Set(ata.to_string()),
owner: Set(owner.to_string()),
mint: Set(mint.to_string()),
metadata: Set(metadata_pubkey.to_string()),
..Default::default()
})
let collection_model = Collection::create(
conn,
collections::ActiveModel {
master_edition: Set(master_edition.to_string()),
update_authority: Set(update_authority.to_string()),
associated_token_account: Set(ata.to_string()),
owner: Set(owner.to_string()),
mint: Set(mint.to_string()),
metadata: Set(metadata_pubkey.to_string()),
..Default::default()
},
)
.await?;

producer
Expand Down Expand Up @@ -231,18 +266,20 @@ impl Processor {
}

async fn get_metadata_json(uri: String) -> Result<asset_api::Metadata> {
let json_metadata_fut = || async {
let json_metadata = (|| async {
reqwest::get(uri.clone())
.await?
.json::<asset_api::Metadata>()
.await
};

let json_metadata = Retry::spawn(
ExponentialBackoff::from_millis(20).take(10),
json_metadata_fut,
})
.retry(
&ExponentialBuilder::default()
.with_jitter()
.with_min_delay(Duration::from_millis(200))
.with_max_times(10),
)
.await?;
.await
.map_err(ProcessorError::JsonFetch)?;

Ok(json_metadata)
}
Expand All @@ -255,15 +292,19 @@ impl Processor {
asset: Asset,
) -> Result<collection_mints::ActiveModel> {
let producer = self.producer.clone();
let owner = asset.ownership.owner.try_into()?;
let mint = asset.id.try_into()?;
let owner = asset
.ownership
.owner
.try_into()
.map_err(ProcessorError::InvalidPubkey)?;
let mint = asset.id.try_into().map_err(ProcessorError::InvalidPubkey)?;
let ata = get_associated_token_address(&owner, &mint);
let seller_fee_basis_points = asset.royalty.basis_points;

let update_authority = asset
.authorities
.get(0)
.context("Invalid index")?
.ok_or(ProcessorError::MissingUpdateAuthority)?
.address
.clone();

Expand Down
56 changes: 25 additions & 31 deletions consumer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use holaplex_hub_nfts_solana::{events, import, solana::Solana, Args};
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents, Services};
use hub_core::{prelude::*, tokio};
use hub_core::{prelude::*, triage};

pub fn main() {
let opts = hub_core::StartConfig {
Expand All @@ -25,36 +25,30 @@ pub fn main() {
import::Processor::new(solana.clone(), connection.clone(), producer.clone());
let event_processor = events::Processor::new(solana, connection, producer);

let mut stream = cons.stream();
loop {
let import_processor = import_processor.clone();
let event_processor = event_processor.clone();

match stream.next().await {
Some(Ok(msg)) => {
info!(?msg, "message received");

tokio::spawn(async move {
if let Some(()) = import_processor
.process(&msg)
.await
.map_err(|e| error!("Error processing import: {e:?}"))?
{
return Ok(());
}

event_processor
.process(msg)
.await
.map_err(|e| error!("Error processing event: {:?}", Error::new(e)))
});
},
None => (),
Some(Err(e)) => {
warn!("failed to get message {:?}", e);
},
}
}
cons.consume::<_, _, _, triage::BoxedSync>(
|b| {
b.with_jitter()
.with_min_delay(Duration::from_millis(500))
.with_max_delay(Duration::from_secs(90))
},
move |e| async move {
if let Some(()) = import_processor
.process(&e)
.await
.map_err(|e| Box::new(e) as triage::BoxedSync)?
{
return Ok(());
}

event_processor
.process(e)
.await
.map_err(|e| Box::new(e) as triage::BoxedSync)
},
)
.await;

Ok(())
})
});
}
8 changes: 4 additions & 4 deletions consumer/src/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use holaplex_hub_nfts_solana_core::proto::{
use holaplex_hub_nfts_solana_entity::{
collection_mints, collections, compression_leafs, update_revisions,
};
use hub_core::{anyhow::Result, clap, prelude::*, thiserror, uuid::Uuid};
use hub_core::{anyhow::Result, bs58, clap, prelude::*, thiserror, uuid::Uuid};
use mpl_bubblegum::state::metaplex_adapter::{
Collection, Creator as BubblegumCreator, TokenProgramVersion,
};
Expand Down Expand Up @@ -103,13 +103,13 @@ enum SolanaErrorNotFoundMessage {
Metadata,
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Triage)]
pub enum SolanaAssetIdError {
#[error("The transaction has no meta field")]
NoTransactionMeta,
#[error("no inner instruction found")]
#[error("No inner instruction found")]
NoInnerInstruction,
#[error("Solana rpc error")]
#[error("Solana RPC error")]
Rpc(#[from] solana_client::client_error::ClientError),
#[error("No nonce found in log messages")]
NoNonce,
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mpl-bubblegum = "0.7.0"

[dependencies.hub-core]
package = "holaplex-hub-core"
version = "0.2.0"
version = "0.5.0"
git = "https://github.com/holaplex/hub-core"
branch = "stable"
features = ["kafka"]
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod proto {
include!(concat!(env!("OUT_DIR"), "/treasury.proto.rs"));
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Services {
Nfts(proto::NftEventKey, proto::NftEvents),
Treasury(proto::TreasuryEventKey, proto::TreasuryEvents),
Expand Down
5 changes: 2 additions & 3 deletions indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ keywords = ["hub", "holaplex", "web3"]
categories = ["cryptography::cryptocurrencies", "web-programming"]

[dependencies]
sea-orm = { version = "^0.10.0", features = [
sea-orm = { version = "^0.11.0", features = [
"debug-print",
"runtime-tokio-rustls",
"sqlx-postgres",
] }
bs58 = "0.5.0"
futures = "0.3.24"
hex = "0.4.3"
solana-sdk = "1.14"
Expand All @@ -34,7 +33,7 @@ backoff = { version = "0.4.0", features = ["tokio"] }

[dependencies.hub-core]
package = "holaplex-hub-core"
version = "0.2.0"
version = "0.5.0"
git = "https://github.com/holaplex/hub-core"
branch = "stable"

Expand Down
19 changes: 11 additions & 8 deletions indexer/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ impl GeyserGrpcConnector {
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});

let mut transactions = HashMap::new();
transactions.insert("client".to_string(), SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()],
account_exclude: Vec::new(),
account_required: Vec::new(),
});
transactions.insert(
"client".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()],
account_exclude: Vec::new(),
account_required: Vec::new(),
},
);

SubscribeRequest {
accounts: HashMap::new(),
Expand Down

0 comments on commit a39c0d3

Please sign in to comment.