Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jun 30, 2023
1 parent c1583fa commit 3d1d7ca
Show file tree
Hide file tree
Showing 18 changed files with 169 additions and 232 deletions.
300 changes: 124 additions & 176 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ http = "0.2.9"
tower-http = { version = "0.4", features = ["cors"] }

# P2P related
libp2p = { version = "0.51", default-features = false, features = ["noise"]}
# libp2p = { version = "0.52", default-features = false, features = ["noise"]}
libp2p = { branch = "bump-yamux", git = "https://github.com/libp2p/rust-libp2p.git", default-features = false, features = ["noise"]}

# Serialization & Deserialization
bincode = { version = "1.3", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ tokio-stream.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["attributes"] }

libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "tokio", "request-response", "identify", "mplex", "kad", "serde", "yamux"] }
libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "tokio", "request-response", "identify", "kad", "serde", "yamux"] }
void = "1"

topos-metrics = { path = "../topos-metrics/" }
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) mod topos;
pub(crate) mod transmission;

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "ComposedEvent")]
#[behaviour(to_swarm = "ComposedEvent")]
pub(crate) struct Behaviour {
/// All the topos-specific protocols.
// pub(crate) topos: ToposBehaviour,
Expand Down
7 changes: 4 additions & 3 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::{borrow::Cow, collections::HashMap, num::NonZeroUsize, time::Duration};

use crate::{
config::DiscoveryConfig,
constant::TRANSMISSION_PROTOCOL,
error::{CommandExecutionError, P2PError},
};
use libp2p::kad::KademliaEvent;
use libp2p::{
identity::Keypair,
kad::{store::MemoryStore, Kademlia, KademliaBucketInserts, KademliaConfig},
swarm::{behaviour, NetworkBehaviour},
Multiaddr, PeerId,
};
use libp2p::{kad::KademliaEvent, StreamProtocol};
use tokio::sync::oneshot;
use tracing::{debug, info, warn};

Expand All @@ -19,7 +20,7 @@ pub type PendingRecordRequest = oneshot::Sender<Result<Vec<Multiaddr>, CommandEx

/// DiscoveryBehaviour is responsible to discover and manage connections with peers
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "KademliaEvent")]
#[behaviour(to_swarm = "KademliaEvent")]
pub(crate) struct DiscoveryBehaviour {
pub(crate) inner: Kademlia<MemoryStore>,
}
Expand All @@ -34,7 +35,7 @@ impl DiscoveryBehaviour {
) -> Self {
let local_peer_id = peer_key.public().to_peer_id();
let kademlia_config = KademliaConfig::default()
.set_protocol_names(vec![discovery_protocol])
.set_protocol_names(vec![StreamProtocol::new(TRANSMISSION_PROTOCOL)])
.set_replication_factor(config.replication_factor)
.set_kbucket_inserts(KademliaBucketInserts::Manual)
.set_replication_interval(config.replication_interval)
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/behaviour/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use libp2p::{
};

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "IdentifyEvent")]
#[behaviour(to_swarm = "IdentifyEvent")]
pub struct PeerInfoBehaviour {
identify: Identify,
}
Expand Down
14 changes: 10 additions & 4 deletions crates/topos-p2p/src/behaviour/transmission.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::error::CommandExecutionError;
use crate::{constant::TRANSMISSION_PROTOCOL, error::CommandExecutionError};

use self::{codec::TransmissionCodec, protocol::TransmissionProtocol};

use libp2p::request_response::{Behaviour, Config, ProtocolSupport, RequestId};
use libp2p::{
request_response::{Behaviour, Config, ProtocolSupport, RequestId},
StreamProtocol,
};
use std::{collections::HashMap, iter, time::Duration};
use tokio::sync::oneshot;

Expand All @@ -21,9 +24,12 @@ impl TransmissionBehaviour {
cfg.set_connection_keep_alive(Duration::from_secs(60));
cfg.set_request_timeout(Duration::from_secs(30));

Behaviour::new(
Behaviour::with_codec(
TransmissionCodec(),
iter::once((TransmissionProtocol(), ProtocolSupport::Full)),
iter::once((
StreamProtocol::new(TRANSMISSION_PROTOCOL),
ProtocolSupport::Full,
)),
cfg,
)
}
Expand Down
5 changes: 4 additions & 1 deletion crates/topos-p2p/src/behaviour/transmission/codec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::io;

use crate::constant::TRANSMISSION_PROTOCOL;

use super::protocol::TransmissionProtocol;
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use libp2p::{
core::upgrade::{read_length_prefixed, write_length_prefixed},
request_response::Codec,
StreamProtocol,
};

#[derive(Clone)]
Expand All @@ -18,7 +21,7 @@ pub struct TransmissionResponse(pub(crate) Vec<u8>);

#[async_trait::async_trait]
impl Codec for TransmissionCodec {
type Protocol = TransmissionProtocol;
type Protocol = StreamProtocol;
type Request = TransmissionRequest;
type Response = TransmissionResponse;

Expand Down
8 changes: 0 additions & 8 deletions crates/topos-p2p/src/behaviour/transmission/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
use libp2p::core::ProtocolName;

use crate::constant::TRANSMISSION_PROTOCOL;

#[derive(Debug, Clone)]
pub(crate) struct TransmissionProtocol();

impl ProtocolName for TransmissionProtocol {
fn protocol_name(&self) -> &[u8] {
TRANSMISSION_PROTOCOL.as_bytes()
}
}
14 changes: 4 additions & 10 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use libp2p::{
core::either,
multiaddr::Protocol,
swarm::{derive_prelude::Either, ConnectionHandlerUpgrErr, NetworkBehaviour, SwarmEvent},
swarm::{derive_prelude::Either, NetworkBehaviour, SwarmEvent},
};
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -48,10 +48,7 @@ impl
SwarmEvent<
ComposedEvent,
Either<
Either<
Either<Either<io::Error, io::Error>, ConnectionHandlerUpgrErr<io::Error>>,
void::Void,
>,
Either<Either<Either<io::Error, io::Error>, void::Void>, void::Void>,
void::Void,
>,
>,
Expand All @@ -62,10 +59,7 @@ impl
event: SwarmEvent<
ComposedEvent,
Either<
Either<
Either<Either<io::Error, io::Error>, ConnectionHandlerUpgrErr<io::Error>>,
void::Void,
>,
Either<Either<Either<io::Error, io::Error>, void::Void>, void::Void>,
void::Void,
>,
>,
Expand Down Expand Up @@ -148,7 +142,7 @@ impl
}
}

SwarmEvent::Dialing(peer_id) => {}
SwarmEvent::Dialing { peer_id, .. } => {}

SwarmEvent::Behaviour(event) => {
self.handle(event).await;
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ impl EventHandler<Box<GossipsubEvent>> for Runtime {

if let Err(e) = self
.event_sender
.try_send(Event::Gossip { from: source, data })
.send(Event::Gossip { from: source, data })
.await
{
tracing::error!("Failed to send gossip event to runtime: {:?}", e);
}
Expand Down
14 changes: 7 additions & 7 deletions crates/topos-p2p/src/runtime/handle_event/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::borrow::Cow;

use libp2p::{
identify::{Event as IdentifyEvent, Info as IdentifyInfo},
request_response::ProtocolName,
};
use libp2p::identify::{Event as IdentifyEvent, Info as IdentifyInfo};
use tracing::info;

use crate::{behaviour::transmission::protocol::TransmissionProtocol, Runtime};
use crate::{
behaviour::transmission::protocol::TransmissionProtocol, constant::TRANSMISSION_PROTOCOL,
Runtime,
};

use super::EventHandler;

Expand All @@ -22,14 +22,14 @@ impl EventHandler<Box<IdentifyEvent>> for Runtime {
} = info;

if !self.peer_set.contains(&peer_id)
&& protocol_version.as_bytes() == TransmissionProtocol().protocol_name()
&& protocol_version.as_bytes() == TRANSMISSION_PROTOCOL.as_bytes()
&& protocols.iter().any(|p| {
self.swarm
.behaviour()
.discovery
.inner
.protocol_names()
.contains(&Cow::Borrowed(p.as_bytes()))
.contains(&Cow::Borrowed(p))
})
{
self.peer_set.insert(peer_id);
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl Runtime {
)) => {}

SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::Dialing(_) => {}
SwarmEvent::Dialing { .. } => {}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::NewListenAddr { .. } => {}
SwarmEvent::Behaviour(ComposedEvent::Gossipsub(_)) => {}
Expand All @@ -235,6 +235,7 @@ impl Runtime {
local_addr,
send_back_addr,
error,
..
} => {
warn!("IncomingConnectionError: local_addr: {local_addr:?}, send_back_addr: {send_back_addr:?}, error: {error:?}");
}
Expand Down
9 changes: 0 additions & 9 deletions tools/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,3 @@ services:

volumes:
shared:
prometheus_data: {}
grafana_data: {}


networks:
monitoring:
driver: bridge


2 changes: 1 addition & 1 deletion tools/env/base.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
RUST_LOG=info
RUST_LOG=warn
TOOLCHAIN_VERSION=stable
RUST_BACKTRACE=full
2 changes: 1 addition & 1 deletion tools/env/node.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RUST_LOG=warn,topos=debug
RUST_LOG=warn,topos=warn
LOCAL_TEST_NET=true

TCE_DB_PATH=/tmp/default-db
Expand Down
4 changes: 2 additions & 2 deletions tools/env/spammer.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
RUST_LOG=info
TOPOS_NETWORK_SPAMMER_CERT_PER_BATCH=50
TOPOS_NETWORK_SPAMMER_CERT_PER_BATCH=20
TOPOS_NETWORK_SPAMMER_BATCH_INTERVAL=1000
TOPOS_NETWORK_SPAMMER_TARGET_NODES_PATH=/tmp/shared/peer_nodes.json
TOPOS_NETWORK_SPAMMER_NUMBER_OF_SUBNETS=1
TOPOS_NETWORK_SPAMMER_NUMBER_OF_BATCHES=10
TOPOS_NETWORK_SPAMMER_NUMBER_OF_BATCHES=60
TOPOS_OTLP_SERVICE_NAME=simon-spammer
7 changes: 3 additions & 4 deletions tools/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
- job_name: 'peers'
scrape_interval: 1s
static_configs:
- targets: ['otel-collector:8889']
- targets: ['otel-collector:8888']
- targets: ['boot:3000', 'peer_1:3000']

0 comments on commit 3d1d7ca

Please sign in to comment.