Skip to content

Commit

Permalink
fix(tap-agent): bulk insert of failed receipts (#329)
Browse files Browse the repository at this point in the history
* Bulk insert of failed receipts

* query macro

* sqlx bulk insert after running compiling codes

* sqlx bulk insert after running compiling codes

* added error logs

* minor changes
  • Loading branch information
taslimmuhammed authored Oct 1, 2024
1 parent 5edf6cf commit f65d95c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 43 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

77 changes: 54 additions & 23 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tap_core::{
signed_message::EIP712SignedMessage,
};
use thegraph_core::Address;
use tracing::{error, warn};
use tracing::{debug, error, warn};

use crate::{agent::sender_account::ReceiptFees, lazy_static};

Expand Down Expand Up @@ -685,6 +685,15 @@ impl SenderAllocationState {
&mut self,
receipts: &[ReceiptWithState<Failed>],
) -> Result<()> {
let reciepts_len = receipts.len();
let mut reciepts_signers = Vec::with_capacity(reciepts_len);
let mut encoded_signatures = Vec::with_capacity(reciepts_len);
let mut allocation_ids = Vec::with_capacity(reciepts_len);
let mut timestamps = Vec::with_capacity(reciepts_len);
let mut nounces = Vec::with_capacity(reciepts_len);
let mut values = Vec::with_capacity(reciepts_len);
let mut error_logs = Vec::with_capacity(reciepts_len);

for received_receipt in receipts.iter() {
let receipt = received_receipt.signed_receipt();
let allocation_id = receipt.message.allocation_id;
Expand All @@ -696,31 +705,53 @@ impl SenderAllocationState {
error!("Failed to recover receipt signer: {}", e);
anyhow!(e)
})?;
sqlx::query!(
r#"
INSERT INTO scalar_tap_receipts_invalid (
signer_address,
signature,
allocation_id,
timestamp_ns,
nonce,
value,
error_log
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
receipt_signer.encode_hex(),
encoded_signature,
debug!(
"Receipt for allocation {} and signer {} failed reason: {}",
allocation_id.encode_hex(),
BigDecimal::from(receipt.message.timestamp_ns),
BigDecimal::from(receipt.message.nonce),
BigDecimal::from(BigInt::from(receipt.message.value)),
receipt_signer.encode_hex(),
receipt_error
)
.execute(&self.pgpool)
.await
.map_err(|e| anyhow!("Failed to store invalid receipt: {:?}", e))?;
);
reciepts_signers.push(receipt_signer.encode_hex());
encoded_signatures.push(encoded_signature);
allocation_ids.push(allocation_id.encode_hex());
timestamps.push(BigDecimal::from(receipt.message.timestamp_ns));
nounces.push(BigDecimal::from(receipt.message.nonce));
values.push(BigDecimal::from(BigInt::from(receipt.message.value)));
error_logs.push(receipt_error);
}
sqlx::query!(
r#"INSERT INTO scalar_tap_receipts_invalid (
signer_address,
signature,
allocation_id,
timestamp_ns,
nonce,
value,
error_log
) SELECT * FROM UNNEST(
$1::CHAR(40)[],
$2::BYTEA[],
$3::CHAR(40)[],
$4::NUMERIC(20)[],
$5::NUMERIC(20)[],
$6::NUMERIC(40)[],
$7::TEXT[]
)"#,
&reciepts_signers,
&encoded_signatures,
&allocation_ids,
&timestamps,
&nounces,
&values,
&error_logs
)
.execute(&self.pgpool)
.await
.map_err(|e| {
error!("Failed to store invalid receipt: {}", e);
anyhow!(e)
})?;

let fees = receipts
.iter()
.map(|receipt| receipt.signed_receipt().message.value)
Expand Down

0 comments on commit f65d95c

Please sign in to comment.