Skip to content

Commit

Permalink
fix: Do not reset peer state on new established connection (#16)
Browse files Browse the repository at this point in the history
* fix: Do not reset peer state on new established connection

libp2p may establish multiple connections at the same time
to a peer and we shouldn't reset the state.

* Update src/client.rs

Co-authored-by: Maciej Zwoliński <[email protected]>

* fmt

---------

Co-authored-by: Maciej Zwoliński <[email protected]>
  • Loading branch information
oblique and zvolin committed Jan 26, 2024
1 parent 4b57f45 commit e737dbe
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
29 changes: 20 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{hash_map, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
Expand Down Expand Up @@ -81,6 +81,7 @@ where

#[derive(Debug)]
struct PeerState<const S: usize> {
established_connections_num: usize,
sending: Arc<Mutex<SendingState>>,
wantlist: WantlistState<S>,
send_full: bool,
Expand Down Expand Up @@ -124,14 +125,14 @@ where
}

pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler<S> {
self.peers.insert(
peer,
PeerState {
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
send_full: true,
},
);
let peer = self.peers.entry(peer).or_insert_with(|| PeerState {
established_connections_num: 0,
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
send_full: true,
});

peer.established_connections_num += 1;

ClientConnectionHandler {
protocol: self.protocol.clone(),
Expand All @@ -143,6 +144,16 @@ where
}
}

pub(crate) fn on_connection_closed(&mut self, peer: PeerId) {
if let hash_map::Entry::Occupied(mut entry) = self.peers.entry(peer) {
entry.get_mut().established_connections_num -= 1;

if entry.get_mut().established_connections_num == 0 {
entry.remove();
}
}
}

fn next_query_id(&mut self) -> BitswapQueryId {
let id = BitswapQueryId(self.next_query_id);
self.next_query_id += 1;
Expand Down
11 changes: 10 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use blockstore::{Blockstore, BlockstoreError};
use cid::CidGeneric;
use client::SendingState;
use futures::{stream::SelectAll, StreamExt};
use libp2p::swarm::ConnectionClosed;
use libp2p::{
core::{upgrade::ReadyUpgrade, Endpoint},
swarm::{
Expand Down Expand Up @@ -127,7 +128,15 @@ where
})
}

fn on_swarm_event(&mut self, _event: FromSwarm) {}
fn on_swarm_event(&mut self, event: FromSwarm) {
#[allow(clippy::single_match)]
match event {
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
self.client.on_connection_closed(peer_id);
}
_ => {}
}
}

fn on_connection_handler_event(
&mut self,
Expand Down

0 comments on commit e737dbe

Please sign in to comment.