Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk insert of failed receipts using query macro #329

Merged
merged 6 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

77 changes: 54 additions & 23 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tap_core::{
signed_message::EIP712SignedMessage,
};
use thegraph_core::Address;
use tracing::{error, warn};
use tracing::{debug, error, warn};

use crate::{agent::sender_account::ReceiptFees, lazy_static};

Expand Down Expand Up @@ -685,6 +685,15 @@ impl SenderAllocationState {
&mut self,
receipts: &[ReceiptWithState<Failed>],
) -> Result<()> {
let reciepts_len = receipts.len();
let mut reciepts_signers = Vec::with_capacity(reciepts_len);
let mut encoded_signatures = Vec::with_capacity(reciepts_len);
let mut allocation_ids = Vec::with_capacity(reciepts_len);
let mut timestamps = Vec::with_capacity(reciepts_len);
let mut nounces = Vec::with_capacity(reciepts_len);
let mut values = Vec::with_capacity(reciepts_len);
let mut error_logs = Vec::with_capacity(reciepts_len);

for received_receipt in receipts.iter() {
let receipt = received_receipt.signed_receipt();
let allocation_id = receipt.message.allocation_id;
Expand All @@ -696,31 +705,53 @@ impl SenderAllocationState {
error!("Failed to recover receipt signer: {}", e);
anyhow!(e)
})?;
sqlx::query!(
r#"
INSERT INTO scalar_tap_receipts_invalid (
signer_address,
signature,
allocation_id,
timestamp_ns,
nonce,
value,
error_log
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
receipt_signer.encode_hex(),
encoded_signature,
debug!(
"Receipt for allocation {} and signer {} failed reason: {}",
allocation_id.encode_hex(),
BigDecimal::from(receipt.message.timestamp_ns),
BigDecimal::from(receipt.message.nonce),
BigDecimal::from(BigInt::from(receipt.message.value)),
receipt_signer.encode_hex(),
receipt_error
)
.execute(&self.pgpool)
.await
.map_err(|e| anyhow!("Failed to store invalid receipt: {:?}", e))?;
);
reciepts_signers.push(receipt_signer.encode_hex());
encoded_signatures.push(encoded_signature);
allocation_ids.push(allocation_id.encode_hex());
timestamps.push(BigDecimal::from(receipt.message.timestamp_ns));
nounces.push(BigDecimal::from(receipt.message.nonce));
values.push(BigDecimal::from(BigInt::from(receipt.message.value)));
error_logs.push(receipt_error);
}
sqlx::query!(
r#"INSERT INTO scalar_tap_receipts_invalid (
signer_address,
signature,
allocation_id,
timestamp_ns,
nonce,
value,
error_log
) SELECT * FROM UNNEST(
$1::CHAR(40)[],
$2::BYTEA[],
$3::CHAR(40)[],
$4::NUMERIC(20)[],
$5::NUMERIC(20)[],
$6::NUMERIC(40)[],
$7::TEXT[]
)"#,
&reciepts_signers,
&encoded_signatures,
&allocation_ids,
&timestamps,
&nounces,
&values,
&error_logs
)
.execute(&self.pgpool)
.await
.map_err(|e| {
error!("Failed to store invalid receipt: {}", e);
anyhow!(e)
})?;

let fees = receipts
.iter()
.map(|receipt| receipt.signed_receipt().message.value)
Expand Down
Loading