Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add value check #153

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
787 changes: 470 additions & 317 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ sqlx = { version = "0.7.2", features = [
tracing = { version = "0.1.40", default-features = false }
bigdecimal = "0.4.3"
build-info = "0.0.38"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "ff856d9", default-features = false }
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3fe6bc2", default-features = false }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
Expand Down
2 changes: 2 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ build-info.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["fs", "tokio-macros"] }
cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" }
regex = "1.7.1"
axum-extra = { version = "0.9.3", features = [
"typed-header",
], default-features = false }
ttl_cache = "0.5.1"
autometrics = { version = "1.0.1", features = ["prometheus-exporter"] }
tower_governor = "0.3.2"
tower-http = { version = "0.5.2", features = [
Expand Down
6 changes: 6 additions & 0 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
use tracing::{info, info_span};

use crate::tap::{ValueCheckReceiver, ValueCheckSender};
use crate::{
address::public_key,
indexer_service::http::{
Expand Down Expand Up @@ -168,6 +169,8 @@ where
pub url_namespace: &'static str,
pub metrics_prefix: &'static str,
pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
pub value_check_receiver: ValueCheckReceiver,
pub value_check_sender: ValueCheckSender,
}

pub struct IndexerServiceState<I>
Expand All @@ -179,6 +182,7 @@ where
pub tap_manager: Manager<IndexerTapContext>,
pub service_impl: Arc<I>,
pub metrics: IndexerServiceMetrics,
pub value_check_sender: ValueCheckSender,
}

pub struct IndexerService {}
Expand Down Expand Up @@ -303,6 +307,7 @@ impl IndexerService {
domain_separator.clone(),
timestamp_error_tolerance,
receipt_max_value,
options.value_check_receiver,
)
.await;

Expand All @@ -314,6 +319,7 @@ impl IndexerService {
tap_manager,
service_impl: Arc::new(options.service_impl),
metrics,
value_check_sender: options.value_check_sender,
});

// Rate limits by allowing bursts of 10 requests and requiring 100ms of
Expand Down
28 changes: 25 additions & 3 deletions common/src/indexer_service/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ use axum::{
};
use axum_extra::TypedHeader;
use reqwest::StatusCode;
use tap_core::receipt::Context;
use thegraph_core::DeploymentId;
use tracing::trace;

use crate::indexer_service::http::IndexerServiceResponse;
use serde_json::value::RawValue;

use crate::{indexer_service::http::IndexerServiceResponse, tap::AgoraQuery};

use super::{
indexer_service::{AttestationOutput, IndexerServiceError, IndexerServiceState},
Expand All @@ -41,17 +44,36 @@ where
.with_label_values(&[&manifest_id.to_string()])
.inc();

#[derive(Debug, serde::Deserialize)]
pub struct QueryBody {
pub query: String,
pub variables: Option<Box<RawValue>>,
}

let request =
serde_json::from_slice(&body).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?;

let attestation_signer = if let Some(receipt) = receipt.into_signed_receipt() {
let allocation_id = receipt.message.allocation_id;

let request: QueryBody = serde_json::from_slice(&body)
.map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?;
let variables = request
.variables
.as_ref()
.map(ToString::to_string)
.unwrap_or_default();
let mut ctx = Context::new();
ctx.insert(AgoraQuery {
deployment_id: manifest_id,
query: request.query.clone(),
variables,
});

// Verify the receipt and store it in the database
// TODO update checks
state
.tap_manager
.verify_and_store_receipt(receipt)
.verify_and_store_receipt(&ctx, receipt)
.await
.map_err(IndexerServiceError::ReceiptError)?;

Expand Down
8 changes: 8 additions & 0 deletions common/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::tap::checks::deny_list_check::DenyListCheck;
use crate::tap::checks::receipt_max_val_check::ReceiptMaxValueCheck;
use crate::tap::checks::sender_balance_check::SenderBalanceCheck;
use crate::tap::checks::timestamp_check::TimestampCheck;
use crate::tap::checks::value_check::MinimumValue;
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
use alloy::dyn_abi::Eip712Domain;
use eventuals::Eventual;
Expand All @@ -23,6 +24,11 @@ use tracing::error;
mod checks;
mod receipt_store;

pub use checks::value_check::{
create_value_check, AgoraQuery, CostModelSource, ValueCheckReceiver, ValueCheckSender,
};

#[derive(Clone)]
pub struct IndexerTapContext {
domain_separator: Arc<Eip712Domain>,

Expand All @@ -44,6 +50,7 @@ impl IndexerTapContext {
domain_separator: Eip712Domain,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
value_check_receiver: ValueCheckReceiver,
) -> Vec<ReceiptCheck> {
vec![
Arc::new(AllocationEligible::new(indexer_allocations)),
Expand All @@ -54,6 +61,7 @@ impl IndexerTapContext {
Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
Arc::new(MinimumValue::new(value_check_receiver)),
]
}

Expand Down
1 change: 1 addition & 0 deletions common/src/tap/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod deny_list_check;
pub mod receipt_max_val_check;
pub mod sender_balance_check;
pub mod timestamp_check;
pub mod value_check;
6 changes: 5 additions & 1 deletion common/src/tap/checks/allocation_eligible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ impl AllocationEligible {
}
#[async_trait::async_trait]
impl Check for AllocationEligible {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let allocation_id = receipt.signed_receipt().message.allocation_id;
if !self
.indexer_allocations
Expand Down
28 changes: 22 additions & 6 deletions common/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ impl DenyListCheck {

#[async_trait::async_trait]
impl Check for DenyListCheck {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let receipt_signer = receipt
.signed_receipt()
.recover_signer(&self.domain_separator)
Expand Down Expand Up @@ -195,7 +199,7 @@ mod tests {
use std::str::FromStr;

use alloy::hex::ToHexExt;
use tap_core::receipt::ReceiptWithState;
use tap_core::receipt::{Context, ReceiptWithState};

use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER};

Expand Down Expand Up @@ -241,7 +245,10 @@ mod tests {
let checking_receipt = ReceiptWithState::new(signed_receipt);

// Check that the receipt is rejected
assert!(deny_list_check.check(&checking_receipt).await.is_err());
assert!(deny_list_check
.check(&Context::new(), &checking_receipt)
.await
.is_err());
}

#[sqlx::test(migrations = "../migrations")]
Expand All @@ -255,7 +262,10 @@ mod tests {
// Check that the receipt is valid
let checking_receipt = ReceiptWithState::new(signed_receipt);

deny_list_check.check(&checking_receipt).await.unwrap();
deny_list_check
.check(&Context::new(), &checking_receipt)
.await
.unwrap();

// Add the sender to the denylist
sqlx::query!(
Expand All @@ -271,7 +281,10 @@ mod tests {

// Check that the receipt is rejected
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(deny_list_check.check(&checking_receipt).await.is_err());
assert!(deny_list_check
.check(&Context::new(), &checking_receipt)
.await
.is_err());

// Remove the sender from the denylist
sqlx::query!(
Expand All @@ -287,6 +300,9 @@ mod tests {

// Check that the receipt is valid again
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
deny_list_check.check(&checking_receipt).await.unwrap();
deny_list_check
.check(&Context::new(), &checking_receipt)
.await
.unwrap();
}
}
22 changes: 18 additions & 4 deletions common/src/tap/checks/receipt_max_val_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ impl ReceiptMaxValueCheck {

#[async_trait::async_trait]
impl Check for ReceiptMaxValueCheck {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let receipt_value = receipt.signed_receipt().message.value;

if receipt_value < self.receipt_max_value {
Expand All @@ -45,6 +49,7 @@ mod tests {
use alloy::signers::local::MnemonicBuilder;
use alloy::signers::local::PrivateKeySigner;
use alloy::sol_types::eip712_domain;
use tap_core::receipt::Context;

use super::*;
use tap_core::{
Expand Down Expand Up @@ -96,20 +101,29 @@ mod tests {
async fn test_receipt_lower_than_limit() {
let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT - 1);
let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT);
assert!(timestamp_check.check(&signed_receipt).await.is_ok());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_ok());
}

#[tokio::test]
async fn test_receipt_higher_than_limit() {
let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT + 1);
let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT);
assert!(timestamp_check.check(&signed_receipt).await.is_err());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_err());
}

#[tokio::test]
async fn test_receipt_same_as_limit() {
let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT);
let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT);
assert!(timestamp_check.check(&signed_receipt).await.is_err());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_err());
}
}
6 changes: 5 additions & 1 deletion common/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ impl SenderBalanceCheck {

#[async_trait::async_trait]
impl Check for SenderBalanceCheck {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();

let receipt_signer = receipt
Expand Down
23 changes: 18 additions & 5 deletions common/src/tap/checks/timestamp_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ impl TimestampCheck {

#[async_trait::async_trait]
impl Check for TimestampCheck {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let timestamp_now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| CheckError::Failed(e.into()))?;
Expand Down Expand Up @@ -55,7 +59,7 @@ mod tests {

use super::*;
use tap_core::{
receipt::{checks::Check, state::Checking, Receipt, ReceiptWithState},
receipt::{checks::Check, state::Checking, Context, Receipt, ReceiptWithState},
signed_message::EIP712SignedMessage,
};

Expand Down Expand Up @@ -102,7 +106,10 @@ mod tests {
let timestamp_ns = timestamp as u64;
let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns);
let timestamp_check = TimestampCheck::new(Duration::from_secs(30));
assert!(timestamp_check.check(&signed_receipt).await.is_ok());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_ok());
}

#[tokio::test]
Expand All @@ -115,7 +122,10 @@ mod tests {
let timestamp_ns = timestamp as u64;
let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns);
let timestamp_check = TimestampCheck::new(Duration::from_secs(30));
assert!(timestamp_check.check(&signed_receipt).await.is_err());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_err());
}

#[tokio::test]
Expand All @@ -128,6 +138,9 @@ mod tests {
let timestamp_ns = timestamp as u64;
let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns);
let timestamp_check = TimestampCheck::new(Duration::from_secs(30));
assert!(timestamp_check.check(&signed_receipt).await.is_err());
assert!(timestamp_check
.check(&Context::new(), &signed_receipt)
.await
.is_err());
}
}
Loading
Loading