Skip to content

Commit

Permalink
feat(pool): handle UREP-020 and track reputation of unstaked entities
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-miao committed Dec 4, 2023
1 parent 76125e0 commit ba528c8
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 58 deletions.
10 changes: 5 additions & 5 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ pub struct PoolArgs {
pub max_size_in_bytes: usize,

#[arg(
long = "pool.max_userops_per_sender",
name = "pool.max_userops_per_sender",
env = "POOL_MAX_USEROPS_PER_SENDER",
long = "pool.same_sender_mempool_count",
name = "pool.same_sender_mempool_count",
env = "SAME_SENDER_MEMPOOL_COUNT",
default_value = "4"
)]
pub max_userops_per_sender: usize,
pub same_sender_mempool_count: usize,

#[arg(
long = "pool.min_replacement_fee_increase_percentage",
Expand Down Expand Up @@ -156,7 +156,7 @@ impl PoolArgs {
chain_id: common.chain_id,
// Currently use the same shard count as the number of builders
num_shards: common.num_builders,
max_userops_per_sender: self.max_userops_per_sender,
same_sender_mempool_count: self.same_sender_mempool_count,
min_replacement_fee_increase_percentage: self
.min_replacement_fee_increase_percentage,
max_size_of_pool_bytes: self.max_size_in_bytes,
Expand Down
2 changes: 1 addition & 1 deletion crates/pool/proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ message ReplacementUnderpricedError {

message MaxOperationsReachedError {
uint64 num_ops = 1;
bytes sender_address = 2;
bytes entity_address = 2;
}

message EntityThrottledError {
Expand Down
4 changes: 2 additions & 2 deletions crates/pool/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub enum MempoolError {
/// and the replacement operation has lower gas price.
#[error("Replacement operation underpriced. Existing priority fee: {0}. Existing fee: {1}")]
ReplacementUnderpriced(U256, U256),
/// Max operations reached for this sender
#[error("Max operations ({0}) reached for sender {1}")]
/// Max operations reached for unstaked sender [UREP-010] or unstaked non-sender entity [UREP-020]
#[error("Max operations ({0}) reached for entity {1}")]
MaxOperationsReached(usize, Address),
/// An entity associated with the operation is throttled/banned.
#[error("Entity {0} is throttled/banned")]
Expand Down
34 changes: 31 additions & 3 deletions crates/pool/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct PoolConfig {
/// Chain ID this pool targets
pub chain_id: u64,
/// The maximum number of operations an unstaked sender can have in the mempool
pub max_userops_per_sender: usize,
pub same_sender_mempool_count: usize,
/// The minimum fee bump required to replace an operation in the mempool
/// Applies to both priority fee and fee. Expressed as an integer percentage value
pub min_replacement_fee_increase_percentage: u64,
Expand Down Expand Up @@ -196,8 +196,8 @@ impl PoolOperation {
})
}

/// Returns an iterator over all staked entities that are included in this operation.
pub fn staked_entities(&'_ self) -> impl Iterator<Item = Entity> + '_ {
/// Returns an iterator over all entities that need stake in this operation.
pub fn entities_requiring_stake(&'_ self) -> impl Iterator<Item = Entity> + '_ {
EntityType::iter()
.filter(|entity| self.requires_stake(*entity))
.filter_map(|entity| {
Expand All @@ -206,6 +206,34 @@ impl PoolOperation {
})
}

/// Return all the unstaked entities that are used in this operation.
pub fn unstaked_entities(&'_ self) -> impl Iterator<Item = Entity> + '_ {
let mut unstaked_entities = vec![];
if !self.entity_infos.sender.is_staked {
unstaked_entities.push(Entity::new(
EntityType::Account,
self.entity_infos.sender.address,
))
}
if let Some(factory) = self.entity_infos.factory {
if !factory.is_staked {
unstaked_entities.push(Entity::new(EntityType::Factory, factory.address))
}
}
if let Some(paymaster) = self.entity_infos.paymaster {
if !paymaster.is_staked {
unstaked_entities.push(Entity::new(EntityType::Paymaster, paymaster.address))
}
}
if let Some(aggregator) = self.entity_infos.aggregator {
if !aggregator.is_staked {
unstaked_entities.push(Entity::new(EntityType::Aggregator, aggregator.address))
}
}

unstaked_entities.into_iter()
}

/// Compute the amount of heap memory the PoolOperation takes up.
pub fn mem_size(&self) -> usize {
std::mem::size_of::<Self>()
Expand Down
32 changes: 2 additions & 30 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::chain::MinedOp;
pub(crate) struct PoolInnerConfig {
entry_point: Address,
chain_id: u64,
max_userops_per_sender: usize,
max_size_of_pool_bytes: usize,
min_replacement_fee_increase_percentage: u64,
throttled_entity_mempool_count: u64,
Expand All @@ -49,7 +48,6 @@ impl From<PoolConfig> for PoolInnerConfig {
Self {
entry_point: config.entry_point,
chain_id: config.chain_id,
max_userops_per_sender: config.max_userops_per_sender,
max_size_of_pool_bytes: config.max_size_of_pool_bytes,
min_replacement_fee_increase_percentage: config.min_replacement_fee_increase_percentage,
throttled_entity_mempool_count: config.throttled_entity_mempool_count,
Expand Down Expand Up @@ -299,17 +297,6 @@ impl PoolInner {
self.remove_operation_by_hash(hash);
}

// Check sender count in mempool. If sender has too many operations, must be staked
if *self.count_by_address.get(&op.uo.sender).unwrap_or(&0)
>= self.config.max_userops_per_sender
&& !op.account_is_staked
{
return Err(MempoolError::MaxOperationsReached(
self.config.max_userops_per_sender,
op.uo.sender,
));
}

let pool_op = OrderedPoolOperation {
po: op,
submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()),
Expand Down Expand Up @@ -673,20 +660,6 @@ mod tests {
assert!(pool.best.is_empty());
}

#[test]
fn too_many_ops() {
let args = conf();
let mut pool = PoolInner::new(args.clone());
let addr = Address::random();
for i in 0..args.max_userops_per_sender {
let op = create_op(addr, i, 1);
pool.add_operation(op).unwrap();
}

let op = create_op(addr, args.max_userops_per_sender, 1);
assert!(pool.add_operation(op).is_err());
}

#[test]
fn address_count() {
let mut pool = PoolInner::new(conf());
Expand Down Expand Up @@ -747,11 +720,11 @@ mod tests {
pool.add_operation(op).unwrap();
}

let op = create_op(Address::random(), args.max_userops_per_sender, 1);
let op = create_op(Address::random(), 4, 1);
assert!(pool.add_operation(op).is_err());

// on equal gas, worst should remain because it came first
let op = create_op(Address::random(), args.max_userops_per_sender, 2);
let op = create_op(Address::random(), 4, 2);
let result = pool.add_operation(op);
assert!(result.is_ok(), "{:?}", result.err());
}
Expand Down Expand Up @@ -837,7 +810,6 @@ mod tests {
PoolInnerConfig {
entry_point: Address::random(),
chain_id: 1,
max_userops_per_sender: 16,
min_replacement_fee_increase_percentage: 10,
max_size_of_pool_bytes: 20 * mem_size_of_ordered_pool_op(),
throttled_entity_mempool_count: 4,
Expand Down
2 changes: 1 addition & 1 deletion crates/pool/src/mempool/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl AddressReputation {
// make sure we aren't dividing by 0
0
} else {
included * self.params.inclusion_rate_factor / seen + std::cmp::min(included, 10_000)
self.params.inclusion_rate_factor * included / seen + std::cmp::min(included, 10_000)
};

// return ops allowed, as defined by UREP-020
Expand Down
58 changes: 46 additions & 12 deletions crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ where
state.throttled_ops.remove(&op.hash);

if let Some(op) = state.pool.mine_operation(op, update.latest_block_number) {
// Only account for a staked entity once
for entity_addr in op.staked_entities().map(|e| e.address).unique() {
// Only account for an entity once
for entity_addr in op.entities().map(|e| e.address).unique() {
self.reputation.add_included(entity_addr);
}
mined_op_count += 1;
Expand All @@ -167,9 +167,9 @@ where
}

if let Some(op) = state.pool.unmine_operation(op.hash) {
// Only account for a staked entity once
for entity_addr in op.staked_entities().map(|e| e.address).unique() {
self.reputation.add_included(entity_addr);
// Only account for an entity once
for entity_addr in op.entities().map(|e| e.address).unique() {
self.reputation.remove_included(entity_addr);
}
unmined_op_count += 1;
}
Expand Down Expand Up @@ -321,6 +321,34 @@ where
hash
};

// Check sender count in mempool. If sender has too many operations, must be staked
{
let state = self.state.read();
if !pool_op.account_is_staked
&& state.pool.address_count(pool_op.uo.sender)
>= self.config.same_sender_mempool_count
{
return Err(MempoolError::MaxOperationsReached(
self.config.same_sender_mempool_count,
pool_op.uo.sender,
));
}

// Check unstaked non-sender entity counts in the mempool
for entity in pool_op
.unstaked_entities()
.filter(|e| e.address != pool_op.entity_infos.sender.address)
{
let ops_allowed = self.reputation.get_ops_allowed(entity.address);
if state.pool.address_count(entity.address) >= ops_allowed as usize {
return Err(MempoolError::MaxOperationsReached(
ops_allowed as usize,
entity.address,
));
}
}
}

// Update reputation
pool_op.entities().unique().for_each(|e| {
self.reputation.add_seen(e.address);
Expand Down Expand Up @@ -468,9 +496,9 @@ mod tests {
use std::collections::HashMap;

use rundler_sim::{
MockPrechecker, MockSimulator, PrecheckError, PrecheckSettings, PrecheckViolation,
SimulationError, SimulationSettings, SimulationSuccess, SimulationViolation,
ViolationError,
EntityInfo, EntityInfos, MockPrechecker, MockSimulator, PrecheckError, PrecheckSettings,
PrecheckViolation, SimulationError, SimulationSettings, SimulationSuccess,
SimulationViolation, ViolationError,
};
use rundler_types::{EntityType, GasFees};

Expand Down Expand Up @@ -685,7 +713,6 @@ mod tests {
#[tokio::test]
async fn test_throttled_account() {
let address = Address::random();

let mut ops = Vec::new();
for i in 0..5 {
ops.push(create_op_with_errors(address, i, 2, None, None, true));
Expand Down Expand Up @@ -922,6 +949,13 @@ mod tests {
Ok(SimulationSuccess {
account_is_staked: op.staked,
block_number: Some(0),
entity_infos: EntityInfos {
sender: EntityInfo {
address: op.op.sender,
is_staked: false,
},
..EntityInfos::default()
},
..SimulationSuccess::default()
})
}
Expand All @@ -931,7 +965,6 @@ mod tests {
let args = PoolConfig {
entry_point: Address::random(),
chain_id: 1,
max_userops_per_sender: 16,
min_replacement_fee_increase_percentage: 10,
max_size_of_pool_bytes: 10000,
blocklist: None,
Expand All @@ -940,6 +973,7 @@ mod tests {
sim_settings: SimulationSettings::default(),
mempool_channel_configs: HashMap::new(),
num_shards: 1,
same_sender_mempool_count: 16,
throttled_entity_mempool_count: 4,
throttled_entity_live_blocks: 10,
};
Expand Down Expand Up @@ -1092,8 +1126,8 @@ mod tests {

fn get_ops_allowed(&self, address: Address) -> u64 {
let counts = self.counts.read();
let seen = *counts.seen.get(&address).unwrap();
let included = *counts.included.get(&address).unwrap();
let seen = *counts.seen.get(&address).unwrap_or(&0);
let included = *counts.included.get(&address).unwrap_or(&0);
let inclusion_based_count = if seen == 0 {
// make sure we aren't dividing by 0
0
Expand Down
4 changes: 2 additions & 2 deletions crates/pool/src/server/remote/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl TryFrom<ProtoMempoolError> for MempoolError {
Some(mempool_error::Error::MaxOperationsReached(e)) => {
MempoolError::MaxOperationsReached(
e.num_ops as usize,
from_bytes(&e.sender_address)?,
from_bytes(&e.entity_address)?,
)
}
Some(mempool_error::Error::EntityThrottled(e)) => MempoolError::EntityThrottled(
Expand Down Expand Up @@ -136,7 +136,7 @@ impl From<MempoolError> for ProtoMempoolError {
error: Some(mempool_error::Error::MaxOperationsReached(
MaxOperationsReachedError {
num_ops: ops as u64,
sender_address: addr.as_bytes().to_vec(),
entity_address: addr.as_bytes().to_vec(),
},
)),
},
Expand Down
4 changes: 2 additions & 2 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ List of command line options for configuring the Pool.
- *Only required when running in distributed mode*
- `--pool.max_size_in_bytes`: Maximum size in bytes for the pool (default: `500000000`, `0.5 GB`)
- env: *POOL_MAX_SIZE_IN_BYTES*
- `--pool.max_userops_per_sender`: Maximum number of user operations per sender (default: `4`)
- env: *POOL_MAX_USEROPS_PER_SENDER*
- `--pool.same_sender_mempool_count`: Maximum number of user operations for an unstaked sender (default: `4`)
- env: *POOL_SAME_SENDER_MEMPOOL_COUNT*
- `--pool.min_replacement_fee_increase_percentage`: Minimum replacement fee increase percentage (default: `10`)
- env: *POOL_MIN_REPLACEMENT_FEE_INCREASE_PERCENTAGE*
- `--pool.blocklist_path`: Path to a blocklist file (e.g `blocklist.json`, `s3://my-bucket/blocklist.json`)
Expand Down

0 comments on commit ba528c8

Please sign in to comment.