Skip to content

Commit

Permalink
186725623 - Integrate transport with rest of codebase (#1079)
Browse files Browse the repository at this point in the history
Co-authored-by: Hector Santos <[email protected]>
  • Loading branch information
iduartgomez and netsirius authored May 13, 2024
1 parent d0c30da commit 18b5e07
Show file tree
Hide file tree
Showing 49 changed files with 2,277 additions and 4,157 deletions.
1,479 changes: 67 additions & 1,412 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ directories = "5"
either = { features = ["serde"], workspace = true }
flatbuffers = "24.3"
futures = "0.3"
semver = { version = "1.0.14", features = ["serde"] }
headers = "0.4"
itertools = "0.12"
libp2p = { default-features = false, features = ["autonat", "dns", "ed25519", "identify", "macros", "noise", "ping", "tcp", "tokio", "yamux"], version = "0.52.3" }
libp2p-identity = { features = ["ed25519", "rand"], version = "0.2.7" }
notify = "6"
once_cell = "1"
ordered-float = "4.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ pub(crate) mod test {
async {
loop {
if self.signal.changed().await.is_ok() {
let (ev_id, pk) = *self.signal.borrow();
let (ev_id, pk) = self.signal.borrow().clone();
if self.rng.is_some() && pk == self.id {
let res = OpenRequest {
client_id: ClientId::FIRST,
Expand Down
8 changes: 1 addition & 7 deletions crates/core/src/client_events/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
Box::pin(async {
let mut futs_opt = [(); N].map(|_| None);
let pend_futs = &mut self.pend_futs;
eprintln!(
"pending futs: {}",
pend_futs.iter().filter(|a| a.is_some()).count()
);
for (i, pend) in pend_futs.iter_mut().enumerate() {
let fut = &mut futs_opt[i];
if let Some(pend_fut) = pend.take() {
Expand Down Expand Up @@ -216,9 +212,7 @@ impl<Fut: Future + Unpin, const N: usize> Future for SelectAll<Fut, N> {
let rest = std::mem::replace(&mut self.inner, [(); N].map(|_| None));
return Poll::Ready((res, idx, rest));
}
None => {
eprintln!("not found");
}
None => {}
}
};
}
Expand Down
65 changes: 8 additions & 57 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ use std::{
io::Read,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
pin::Pin,
str::FromStr,
sync::atomic::AtomicBool,
time::Duration,
};

use directories::ProjectDirs;
use libp2p::{identity, PeerId};
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

use crate::local_node::OperationMode;
use crate::{local_node::OperationMode, transport::TransportKeypair};

/// Default maximum number of connections for the peer.
pub const DEFAULT_MAX_CONNECTIONS: usize = 20;
Expand All @@ -31,12 +29,10 @@ pub const DEFAULT_RANDOM_PEER_CONN_THRESHOLD: usize = 7;
/// Default maximum number of hops to live for any operation
/// (if it applies, e.g. connect requests).
pub const DEFAULT_MAX_HOPS_TO_LIVE: usize = 10;
const DEFAULT_BOOTSTRAP_PORT: u16 = 7800;
const DEFAULT_WEBSOCKET_API_PORT: u16 = 55008;

static CONFIG: std::sync::OnceLock<Config> = std::sync::OnceLock::new();

pub(crate) const PEER_TIMEOUT: Duration = Duration::from_secs(60);
pub(crate) const OPERATION_TTL: Duration = Duration::from_secs(60);

// Initialize the executor once.
Expand All @@ -47,10 +43,7 @@ const ORGANIZATION: &str = "The Freenet Project Inc";
const APPLICATION: &str = "Freenet";

pub struct Config {
pub bootstrap_ip: IpAddr,
pub bootstrap_port: u16,
pub bootstrap_id: Option<PeerId>,
pub local_peer_keypair: identity::Keypair,
pub transport_keypair: TransportKeypair,
pub log_level: tracing::log::LevelFilter,
config_paths: ConfigPaths,
local_mode: AtomicBool,
Expand Down Expand Up @@ -221,12 +214,13 @@ impl Config {
})
}

fn load_conf() -> std::io::Result<Config> {
fn load_conf() -> anyhow::Result<Config> {
let settings: config::Config = config::Config::builder()
.add_source(config::Environment::with_prefix("FREENET"))
.build()
.unwrap();
let local_peer_keypair = if let Ok(path_to_key) = settings

let transport_keypair: Option<TransportKeypair> = if let Ok(path_to_key) = settings
.get_string("local_peer_key_file")
.map(PathBuf::from)
{
Expand All @@ -238,69 +232,32 @@ impl Config {
});
let mut buf = Vec::new();
key_file.read_to_end(&mut buf).unwrap();
Some(
identity::Keypair::from_protobuf_encoding(&buf)
.map_err(|_| std::io::ErrorKind::InvalidData)?,
)
todo!("get an rsa private key from the file and create a TransportKeypair")
} else {
None
};

let log_level = settings
.get_string("log")
.map(|lvl| lvl.parse().ok())
.ok()
.flatten()
.unwrap_or(tracing::log::LevelFilter::Info);
let (bootstrap_ip, bootstrap_port, bootstrap_id) = Config::get_bootstrap_host(&settings)?;

let data_dir = settings.get_string("data_dir").ok().map(PathBuf::from);
let config_paths = ConfigPaths::new(data_dir)?;

let local_mode = settings.get_string("network_mode").is_err();

Ok(Config {
bootstrap_ip,
bootstrap_port,
bootstrap_id,
local_peer_keypair: local_peer_keypair
.unwrap_or_else(identity::Keypair::generate_ed25519),
transport_keypair: transport_keypair.unwrap_or_else(|| TransportKeypair::new()),
log_level,
config_paths,
local_mode: AtomicBool::new(local_mode),
#[cfg(feature = "websocket")]
ws: WebSocketApiConfig::from_config(&settings),
})
}

fn get_bootstrap_host(
settings: &config::Config,
) -> std::io::Result<(IpAddr, u16, Option<PeerId>)> {
let bootstrap_ip = IpAddr::from_str(
&settings
.get_string("bootstrap_host")
.unwrap_or_else(|_| format!("{}", Ipv4Addr::LOCALHOST)),
)
.map_err(|_err| std::io::ErrorKind::InvalidInput)?;

let bootstrap_port = settings
.get_int("bootstrap_port")
.ok()
.map(u16::try_from)
.unwrap_or(Ok(DEFAULT_BOOTSTRAP_PORT))
.map_err(|_err| std::io::ErrorKind::InvalidInput)?;

let id_str = if let Some(id_str) = settings
.get_string("bootstrap_id")
.ok()
.map(|id| id.parse().map_err(|_err| std::io::ErrorKind::InvalidInput))
{
Some(id_str?)
} else {
None
};

Ok((bootstrap_ip, bootstrap_port, id_str))
}
}

pub(crate) struct GlobalExecutor;
Expand Down Expand Up @@ -335,12 +292,6 @@ impl GlobalExecutor {
}
}

impl libp2p::swarm::Executor for GlobalExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + 'static + Send>>) {
GlobalExecutor::spawn(future);
}
}

pub fn set_logger(level: Option<tracing::level_filters::LevelFilter>) {
#[cfg(feature = "trace")]
{
Expand Down
10 changes: 7 additions & 3 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ impl Executor<MockRuntime> {
std::fs::create_dir_all(&contracts_data_dir).expect("directory created");
let contract_store = ContractStore::new(contracts_data_dir, u16::MAX as i64)?;

let db_path = data_dir.join("db");
std::fs::create_dir_all(&db_path).expect("directory created");
let state_store = StateStore::new(Storage::new(&db_path).await?, u16::MAX as u32).unwrap();
// FIXME: if is sqlite it should be a dir, named <data_dir>/db
// let db_path = data_dir.join("db");
// let state_store = StateStore::new(Storage::new(&db_path).await?, u16::MAX as u32).unwrap();
tracing::debug!("creating state store at path: {data_dir:?}");
std::fs::create_dir_all(&data_dir).expect("directory created");
let state_store = StateStore::new(Storage::new(&data_dir).await?, u16::MAX as u32).unwrap();
tracing::debug!("state store created");

let executor = Executor::new(
state_store,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ pub(super) mod in_memory {
channel,
runtime: Executor::new_mock(identifier, executor_request_sender)
.await
.unwrap(),
.expect("should start mock executor"),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/contract/storages/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const STATE_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("state")
pub struct ReDb(Database);

impl ReDb {
pub async fn new(db_path: &Path) -> Result<Self, redb::Error> {
pub async fn new(data_dir: &Path) -> Result<Self, redb::Error> {
let db_path = data_dir.join("db");
tracing::info!("loading contract store from {db_path:?}");

Database::create(db_path).map(Self).map_err(Into::into)
}
}
Expand Down
21 changes: 12 additions & 9 deletions crates/core/src/generated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ impl PeerChange<'_> {
connections: impl Iterator<Item = &'a (PeerId, f64)>,
) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let to = buf.create_string(to.to_string().as_str());
let to = buf.create_vector(&bincode::serialize(&to).unwrap());
let connections = connections
.map(|(from, from_location)| {
let from = Some(buf.create_string(from.to_string().as_str()));
let from = Some(buf.create_vector(&bincode::serialize(from).unwrap()));
topology::AddedConnection::create(
&mut buf,
&topology::AddedConnectionArgs {
Expand Down Expand Up @@ -169,16 +169,16 @@ impl PeerChange<'_> {
(to, to_location): (PeerId, f64),
) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let from = Some(buf.create_string(from.to_string().as_str()));
let to = Some(buf.create_string(to.to_string().as_str()));
let from = buf.create_vector(&bincode::serialize(&from).unwrap());
let to = buf.create_vector(&bincode::serialize(&to).unwrap());
let transaction = transaction.map(|t| buf.create_string(t.as_ref()));
let add_conn = topology::AddedConnection::create(
&mut buf,
&topology::AddedConnectionArgs {
transaction,
from,
from: Some(from),
from_location,
to,
to: Some(to),
to_location,
},
);
Expand All @@ -196,11 +196,14 @@ impl PeerChange<'_> {

pub fn removed_connection_msg(at: PeerId, from: PeerId) -> Vec<u8> {
let mut buf = flatbuffers::FlatBufferBuilder::new();
let at = Some(buf.create_string(at.to_string().as_str()));
let from = Some(buf.create_string(from.to_string().as_str()));
let at = buf.create_vector(&bincode::serialize(&at).unwrap());
let from = buf.create_vector(&bincode::serialize(&from).unwrap());
let remove_conn = topology::RemovedConnection::create(
&mut buf,
&topology::RemovedConnectionArgs { at, from },
&topology::RemovedConnectionArgs {
at: Some(at),
from: Some(from),
},
);
let msg = topology::PeerChange::create(
&mut buf,
Expand Down
Loading

0 comments on commit 18b5e07

Please sign in to comment.