diff --git a/src/client.rs b/src/client.rs index 1bb4770..9695ab1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{hash_map, VecDeque}; use std::fmt; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -81,6 +81,7 @@ where #[derive(Debug)] struct PeerState { + established_connections_num: usize, sending: Arc>, wantlist: WantlistState, send_full: bool, @@ -124,14 +125,14 @@ where } pub(crate) fn new_connection_handler(&mut self, peer: PeerId) -> ClientConnectionHandler { - self.peers.insert( - peer, - PeerState { - sending: Arc::new(Mutex::new(SendingState::Ready)), - wantlist: WantlistState::new(), - send_full: true, - }, - ); + let peer = self.peers.entry(peer).or_insert_with(|| PeerState { + established_connections_num: 0, + sending: Arc::new(Mutex::new(SendingState::Ready)), + wantlist: WantlistState::new(), + send_full: true, + }); + + peer.established_connections_num += 1; ClientConnectionHandler { protocol: self.protocol.clone(), @@ -143,6 +144,16 @@ where } } + pub(crate) fn on_connection_closed(&mut self, peer: PeerId) { + if let hash_map::Entry::Occupied(mut entry) = self.peers.entry(peer) { + entry.get_mut().established_connections_num -= 1; + + if entry.get_mut().established_connections_num == 0 { + entry.remove(); + } + } + } + fn next_query_id(&mut self) -> BitswapQueryId { let id = BitswapQueryId(self.next_query_id); self.next_query_id += 1; diff --git a/src/lib.rs b/src/lib.rs index f5aeb9b..5fc3aa2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use blockstore::{Blockstore, BlockstoreError}; use cid::CidGeneric; use client::SendingState; use futures::{stream::SelectAll, StreamExt}; +use libp2p::swarm::ConnectionClosed; use libp2p::{ core::{upgrade::ReadyUpgrade, Endpoint}, swarm::{ @@ -127,7 +128,15 @@ where }) } - fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_swarm_event(&mut self, event: FromSwarm) { + #[allow(clippy::single_match)] + match event { + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { + self.client.on_connection_closed(peer_id); + } + _ => {} + } + } fn on_connection_handler_event( &mut self,