Skip to content

Commit

Permalink
Merge pull request #3003 from autonomys/dsn-filter-map
Browse files Browse the repository at this point in the history
Add a RPC subscription which filters DSN mappings by object hash
  • Loading branch information
teor2345 authored Sep 4, 2024
2 parents 5c484f3 + eb25ff9 commit 129f35f
Showing 1 changed file with 75 additions and 4 deletions.
79 changes: 75 additions & 4 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use sp_core::H256;
use sp_objects::ObjectsApi;
use sp_runtime::traits::Block as BlockT;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
Expand All @@ -60,8 +60,8 @@ use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::objects::GlobalObjectMapping;
use subspace_core_primitives::{
BlockHash, HistorySize, Piece, PieceIndex, PublicKey, SegmentHeader, SegmentIndex, SlotNumber,
Solution,
Blake3Hash, Blake3HashHex, BlockHash, HistorySize, Piece, PieceIndex, PublicKey, SegmentHeader,
SegmentIndex, SlotNumber, Solution,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::FarmerProtocolInfo;
Expand All @@ -86,6 +86,12 @@ const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);
// TODO: make this into a CLI option, or calculate this from other CLI options
const OBJECT_MAPPING_BATCH_SIZE: usize = 10_000;

/// The maximum number of object hashes allowed in a subscription filter.
///
/// Each hash takes up 64 bytes in JSON, and 32 bytes in memory.
// TODO: make this into a CLI option, or calculate this from other CLI options
const MAX_OBJECT_HASHES_PER_SUBSCRIPTION: usize = 1000;

// TODO: More specific errors instead of `StringError`
/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -173,7 +179,13 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_archived_object_mappings(&self);

// TODO: add a method for recent/any object mappings based on a list of IDs, piece indexes, or segment indexes
/// Filtered block/transaction archived object mappings subscription
#[subscription(
name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings",
unsubscribe = "subspace_unsubscribeFilteredObjectMappings",
item = GlobalObjectMapping,
)]
fn subscribe_filtered_object_mappings(&self, hashes: Vec<Blake3HashHex>);
}

#[derive(Default)]
Expand Down Expand Up @@ -855,4 +867,63 @@ where
pipe_from_stream(pending, mapping_stream).boxed(),
);
}

fn subscribe_filtered_object_mappings(
&self,
pending: PendingSubscriptionSink,
hashes: Vec<Blake3HashHex>,
) {
if hashes.len() > MAX_OBJECT_HASHES_PER_SUBSCRIPTION {
error!(
"Request hash count ({}) exceed the server limit: {} ",
hashes.len(),
MAX_OBJECT_HASHES_PER_SUBSCRIPTION
);

let err_fut = pending.reject(Error::StringError(format!(
"Request hash count ({}) exceed the server limit: {} ",
hashes.len(),
MAX_OBJECT_HASHES_PER_SUBSCRIPTION
)));

self.subscription_executor.spawn(
"subspace-filtered-object-mappings-subscription",
Some("rpc"),
err_fut.boxed(),
);

return;
};

let mut hashes = HashSet::<Blake3Hash>::from_iter(hashes.into_iter().map(|hash| *hash));
let hash_count = hashes.len();

// The genesis segment isn't included in this stream, see
// `subscribe_archived_object_mappings` for details.
let mapping_stream = self
.archived_segment_notification_stream
.subscribe()
.flat_map(move |archived_segment_notification| {
let objects = archived_segment_notification
.archived_segment
.global_object_mappings()
.filter(|object| hashes.remove(&object.hash))
.collect::<Vec<_>>();

stream::iter(objects)
})
// Stop when we've returned mappings for all the hashes. Since we only yield each hash
// once, we don't need to check if hashes is empty here.
.take(hash_count)
// Typically batches will be larger than the hash limit, but we want to allow CLI
// options to change that.
.ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
.map(|objects| GlobalObjectMapping::V0 { objects });

self.subscription_executor.spawn(
"subspace-filtered-object-mappings-subscription",
Some("rpc"),
pipe_from_stream(pending, mapping_stream).boxed(),
);
}
}

0 comments on commit 129f35f

Please sign in to comment.