Skip to content

Commit

Permalink
Merge branch 'dev' into old_wallet_loader_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zancas authored Aug 2, 2024
2 parents 8f4dc35 + e279a1b commit 44b66eb
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 15 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion zingo-netutils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tonic::Status;
use tower::util::BoxCloneService;
use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;

type UnderlyingService = BoxCloneService<
/// TODO: add doc-comment
pub type UnderlyingService = BoxCloneService<
http::Request<UnsyncBoxBody<prost::bytes::Bytes, Status>>,
http::Response<hyper::Body>,
hyper::Error,
Expand Down
8 changes: 8 additions & 0 deletions zingo-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,12 @@ version = "0.1.0"
edition = "2021"

[dependencies]
zingo-netutils = { path = "../zingo-netutils" }

zcash_client_backend.workspace = true
zcash_primitives.workspace = true

futures.workspace = true
tokio.workspace = true
tonic.workspace = true
tracing = "0.1.40"
49 changes: 49 additions & 0 deletions zingo-sync/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Module for handling all connections to the server

use std::ops::Range;

use zcash_client_backend::proto::{compact_formats::CompactBlock, service::BlockId};
use zcash_primitives::consensus::BlockHeight;

use tokio::sync::{mpsc::UnboundedSender, oneshot};

pub mod fetcher;

/// Fetch requests are created and sent to the [`crate::client::fetcher::fetcher`] task when a connection to the server is required.
///
/// Each variant includes a [`tokio::sync::mpsc::oneshot::Sender`] for returning the fetched data to the requester.
pub enum FetchRequest {
ChainTip(oneshot::Sender<BlockId>),
CompactBlockRange(oneshot::Sender<Vec<CompactBlock>>, Range<BlockHeight>),
}

/// Gets the height of the blockchain from the server.
///
/// Requires [`crate::client::fetcher::fetcher`] to be running concurrently, connected via the `fetch_request` channel.
pub async fn get_chain_height(
fetch_request_sender: UnboundedSender<FetchRequest>,
) -> Result<BlockHeight, ()> {
let (sender, receiver) = oneshot::channel::<BlockId>();
fetch_request_sender
.send(FetchRequest::ChainTip(sender))
.unwrap();
let chain_tip = receiver.await.unwrap();

Ok(BlockHeight::from_u32(chain_tip.height as u32))
}
/// Gets the height of the blockchain from the server.
///
/// Requires [`crate::client::fetcher::fetcher`] to be running concurrently, connected via the `fetch_request` channel.
#[allow(dead_code)]
pub async fn get_compact_block_range(
fetch_request_sender: UnboundedSender<FetchRequest>,
block_range: Range<BlockHeight>,
) -> Result<Vec<CompactBlock>, ()> {
let (sender, receiver) = oneshot::channel::<Vec<CompactBlock>>();
fetch_request_sender
.send(FetchRequest::CompactBlockRange(sender, block_range))
.unwrap();
let compact_blocks = receiver.await.unwrap();

Ok(compact_blocks)
}
143 changes: 143 additions & 0 deletions zingo-sync/src/client/fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! Queue and prioritise fetch requests to fetch data from the server

use std::ops::Range;

use tokio::sync::mpsc::UnboundedReceiver;

use zcash_client_backend::proto::{
compact_formats::CompactBlock,
service::{
compact_tx_streamer_client::CompactTxStreamerClient, BlockId, BlockRange, ChainSpec,
},
};
use zcash_primitives::consensus::BlockHeight;

use crate::client::FetchRequest;

/// Receives [`self::FetchRequest`]'s via an [`tokio::sync::mpsc::UnboundedReceiver`] for queueing,
/// prioritisation and fetching from the server.
/// Returns the data specified in the [`self::FetchRequest`] variant via the provided [`tokio::sync::mpsc::oneshot::Sender`].
///
/// Allows all requests to the server to be handled from a single task for efficiency and also enables
/// request prioritisation for further performance enhancement
pub async fn fetcher(
mut fetch_request_receiver: UnboundedReceiver<FetchRequest>,
mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> Result<(), ()> {
let mut fetch_request_queue: Vec<FetchRequest> = Vec::new();

loop {
// `fetcher` returns `Ok` here when all requests have successfully been fetched and the
// fetch_request channel is closed on sync completion.
if receive_fetch_requests(&mut fetch_request_receiver, &mut fetch_request_queue).await {
return Ok(());
}

let fetch_request = select_fetch_request(&mut fetch_request_queue);

if let Some(request) = fetch_request {
fetch_from_server(&mut client, request).await.unwrap();
}
}
}

// receives fetch requests and populates the fetch request queue
//
// returns `true` if the fetch request channel is closed and all fetch requests have been completed,
// signalling sync is complete and no longer needs to fetch data from the server.
async fn receive_fetch_requests(
receiver: &mut UnboundedReceiver<FetchRequest>,
fetch_request_queue: &mut Vec<FetchRequest>,
) -> bool {
// if there are no fetch requests to process, sleep until the next fetch request is received
// or channel is closed
if fetch_request_queue.is_empty() {
while let Some(fetch_request) = receiver.recv().await {
fetch_request_queue.push(fetch_request);
}
}
// receive all remaining fetch requests from channel
// when channel is empty return `false` to continue fetching data from the server
// when channel is closed and all fetch requests are processed, return `true`
loop {
match receiver.try_recv() {
Ok(fetch_request) => fetch_request_queue.push(fetch_request),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
if fetch_request_queue.is_empty() {
return true;
} else {
break;
}
}
}
}

false
}

// TODO: placeholder for algorythm that selects the next fetch request to be processed
// return `None` if a fetch request could not be selected
fn select_fetch_request(fetch_request_queue: &mut Vec<FetchRequest>) -> Option<FetchRequest> {
// TODO: add other fetch requests with priorities
let fetch_request_index = fetch_request_queue
.iter()
.enumerate()
.find(|(_, request)| matches!(request, FetchRequest::ChainTip(_)))
.map(|(index, _)| index);

fetch_request_index.map(|index| fetch_request_queue.remove(index))
}

//
async fn fetch_from_server(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
fetch_request: FetchRequest,
) -> Result<(), ()> {
match fetch_request {
FetchRequest::ChainTip(sender) => {
let block_id = get_latest_block(client).await;
sender.send(block_id).unwrap();
}
FetchRequest::CompactBlockRange(sender, block_range) => {
let compact_blocks = get_block_range(client, block_range).await;
sender.send(compact_blocks).unwrap();
}
}

Ok(())
}

async fn get_latest_block(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> BlockId {
let request = tonic::Request::new(ChainSpec {});

client.get_latest_block(request).await.unwrap().into_inner()
}

async fn get_block_range(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
block_range: Range<BlockHeight>,
) -> Vec<CompactBlock> {
let mut compact_blocks: Vec<CompactBlock> =
Vec::with_capacity(u64::from(block_range.end - block_range.start) as usize);

let request = tonic::Request::new(BlockRange {
start: Some(BlockId {
height: u64::from(block_range.start),
hash: Vec::new(),
}),
end: Some(BlockId {
height: u64::from(block_range.end) - 1,
hash: Vec::new(),
}),
});
let mut block_stream = client.get_block_range(request).await.unwrap().into_inner();

while let Some(compact_block) = block_stream.message().await.unwrap() {
compact_blocks.push(compact_block);
}

compact_blocks
}
16 changes: 2 additions & 14 deletions zingo-sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,2 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}
mod client;
mod sync;
46 changes: 46 additions & 0 deletions zingo-sync/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::ops::Range;

use crate::client::{fetcher::fetcher, get_chain_height};

use zcash_client_backend::{
data_api::scanning::{ScanPriority, ScanRange},
proto::service::compact_tx_streamer_client::CompactTxStreamerClient,
};

use futures::future::try_join_all;
use tokio::sync::mpsc::unbounded_channel;
use zcash_primitives::consensus::BlockHeight;

/// Syncs a wallet to the latest state of the blockchain
#[allow(dead_code)]
pub async fn sync(
client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
// TODO: add wallet data field
) -> Result<(), ()> {
// create channel for sending fetch requests and launch fetcher task
let (fetch_request_sender, fetch_request_receiver) = unbounded_channel();
let fetcher_handle = tokio::spawn(fetcher(fetch_request_receiver, client));

let chain_height = get_chain_height(fetch_request_sender).await.unwrap();
update_scan_ranges(chain_height);

try_join_all(vec![fetcher_handle]).await.unwrap();

Ok(())
}

fn update_scan_ranges(chain_height: BlockHeight) {
// TODO: load scan ranges from wallet data
let mut scan_ranges: Vec<ScanRange> = Vec::new();

let latest_scan_range = ScanRange::from_parts(
Range {
start: BlockHeight::from_u32(0), // TODO: add logic to replace with wallet height
end: chain_height,
},
ScanPriority::Historic,
);
scan_ranges.push(latest_scan_range);

// TODO: add logic to combine latest scan range with the tip of wallet scan ranges
}

0 comments on commit 44b66eb

Please sign in to comment.