From 9315908e68d4baaf05c981f683e33c0b1ec3621b Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 11:04:32 +0530 Subject: [PATCH 1/6] Bulk insert of failed receipts --- tap-agent/src/agent/sender_allocation.rs | 62 +++++++++++++++--------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index a25b6df7..575af231 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,7 +685,32 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - for received_receipt in receipts.iter() { + let mut query_str = String::from( + "INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) + VALUES "); + for i in 0..receipts.len() { + query_str = query_str +"(" + +"$"+&(i*6+1).to_string()+", " + +"$"+&(i*6+2).to_string()+", " + +"$"+&(i*6+3).to_string()+", " + +"$"+&(i*6+4).to_string()+", " + +"$"+&(i*6+5).to_string()+", " + +"$"+&(i*6+6).to_string() + +")"; + if i!=receipts.len()-1 { + query_str = query_str +" , " + } + } + query_str = query_str+";"; + let mut query = sqlx::query(&query_str); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); @@ -696,31 +721,20 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; - sqlx::query!( - r#" - INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value, - error_log - ) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - receipt_signer.encode_hex(), - encoded_signature, + debug!( + "Receipt for allocation {} and signer {} failed reason: {}", allocation_id.encode_hex(), - BigDecimal::from(receipt.message.timestamp_ns), - BigDecimal::from(receipt.message.nonce), - BigDecimal::from(BigInt::from(receipt.message.value)), + receipt_signer.encode_hex(), receipt_error - ) - .execute(&self.pgpool) - .await - .map_err(|e| anyhow!("Failed to store invalid receipt: {:?}", e))?; - } + ); + query = query.bind(receipt_signer.encode_hex()); + query = query.bind(encoded_signature); + query = query.bind(allocation_id.encode_hex()); + query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); + query = query.bind(BigDecimal::from(receipt.message.nonce)); + query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); + } + query.execute(&self.pgpool).await?; let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value) From 5bcbcd86783a014ec6e4db1c3159a7162dd72cb1 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 22:22:42 +0530 Subject: [PATCH 2/6] query macro --- tap-agent/src/agent/sender_allocation.rs | 77 ++++++++++++++---------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 575af231..85a66bcf 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,31 +685,14 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - let mut query_str = String::from( - "INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value - ) - VALUES "); - for i in 0..receipts.len() { - query_str = query_str +"(" - +"$"+&(i*6+1).to_string()+", " - +"$"+&(i*6+2).to_string()+", " - +"$"+&(i*6+3).to_string()+", " - +"$"+&(i*6+4).to_string()+", " - +"$"+&(i*6+5).to_string()+", " - +"$"+&(i*6+6).to_string() - +")"; - if i!=receipts.len()-1 { - query_str = query_str +" , " - } - } - query_str = query_str+";"; - let mut query = sqlx::query(&query_str); + let reciepts_len = receipts.len(); + let mut reciepts_signers = Vec::with_capacity(reciepts_len); + let mut encoded_signatures = Vec::with_capacity(reciepts_len); + let mut allocation_ids = Vec::with_capacity(reciepts_len); + let mut timestamps = Vec::with_capacity(reciepts_len); + let mut nounces = Vec::with_capacity(reciepts_len); + let mut values = Vec::with_capacity(reciepts_len); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; @@ -727,14 +710,42 @@ impl SenderAllocationState { receipt_signer.encode_hex(), receipt_error ); - query = query.bind(receipt_signer.encode_hex()); - query = query.bind(encoded_signature); - query = query.bind(allocation_id.encode_hex()); - query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); - query = query.bind(BigDecimal::from(receipt.message.nonce)); - query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); - } - query.execute(&self.pgpool).await?; + reciepts_signers.push(receipt_signer.encode_hex()); + encoded_signatures.push(encoded_signature); + allocation_ids.push(allocation_id.encode_hex()); + timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); + nounces.push(BigDecimal::from(receipt.message.nonce)); + values.push(BigDecimal::from(BigInt::from(receipt.message.value))); + } + sqlx::query!( + r#"INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::NUMERIC(20)[], + $5::NUMERIC(20)[], + $6::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &allocation_ids, + ×tamps, + &nonces, + &values, + ).execute(&self.pgpool) + .await + .map_err(|e| { + error!("Failed to store receipt: {}", e); + anyhow!(e) + })?; + let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value) From f98aaf370474db15fda6c838b744c27467c0ca90 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 27 Sep 2024 23:39:44 +0530 Subject: [PATCH 3/6] sqlx bulk insert after running compiling codes --- .env | 1 + ...18688afa91dc0b8eeb8e551b271fc73c1157e.json | 19 ++++++++++++++++++ ...d07007990cc244f598b763ec5470515efe019.json | 6 ------ ...fe7861ff17c6b8de4365a39f28099f5711613.json | 20 ------------------- migrations/20230915230734_tap_ravs.down.sql | 4 ++-- tap-agent/src/agent/sender_allocation.rs | 17 ++++++---------- 6 files changed, 28 insertions(+), 39 deletions(-) create mode 100644 .env create mode 100644 .sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json delete mode 100644 .sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json diff --git a/.env b/.env new file mode 100644 index 00000000..4b5dbc3c --- /dev/null +++ b/.env @@ -0,0 +1 @@ +DATABASE_URL = postgres://taslim:6318@localhost:5432/test \ No newline at end of file diff --git a/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json b/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json new file mode 100644 index 00000000..4ebee386 --- /dev/null +++ b/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value\n ) SELECT * FROM UNNEST(\n $1::CHAR(40)[],\n $2::BYTEA[],\n $3::CHAR(40)[],\n $4::NUMERIC(20)[],\n $5::NUMERIC(20)[],\n $6::NUMERIC(40)[]\n )", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "BpcharArray", + "ByteaArray", + "BpcharArray", + "NumericArray", + "NumericArray", + "NumericArray" + ] + }, + "nullable": [] + }, + "hash": "0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e" +} diff --git a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json index 69853368..ff058439 100644 --- a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json +++ b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json @@ -37,11 +37,6 @@ "ordinal": 6, "name": "value", "type_info": "Numeric" - }, - { - "ordinal": 7, - "name": "error_log", - "type_info": "Text" } ], "parameters": { @@ -54,7 +49,6 @@ false, false, false, - false, false ] }, diff --git a/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json b/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json deleted file mode 100644 index 1e45b7ef..00000000 --- a/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value,\n error_log\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bpchar", - "Bytea", - "Bpchar", - "Numeric", - "Numeric", - "Numeric", - "Text" - ] - }, - "nullable": [] - }, - "hash": "6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613" -} diff --git a/migrations/20230915230734_tap_ravs.down.sql b/migrations/20230915230734_tap_ravs.down.sql index 0ad115b3..a411faed 100644 --- a/migrations/20230915230734_tap_ravs.down.sql +++ b/migrations/20230915230734_tap_ravs.down.sql @@ -1,2 +1,2 @@ -DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; -DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; +-- DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; +-- DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 85a66bcf..f7b069fa 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -693,7 +693,7 @@ impl SenderAllocationState { let mut nounces = Vec::with_capacity(reciepts_len); let mut values = Vec::with_capacity(reciepts_len); - for received_receipt in receipts.iter(){ + for received_receipt in receipts.iter() { let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); @@ -704,12 +704,6 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; - debug!( - "Receipt for allocation {} and signer {} failed reason: {}", - allocation_id.encode_hex(), - receipt_signer.encode_hex(), - receipt_error - ); reciepts_signers.push(receipt_signer.encode_hex()); encoded_signatures.push(encoded_signature); allocation_ids.push(allocation_id.encode_hex()); @@ -733,13 +727,14 @@ impl SenderAllocationState { $5::NUMERIC(20)[], $6::NUMERIC(40)[] )"#, - &signers, - &signatures, + &reciepts_signers, + &encoded_signatures, &allocation_ids, ×tamps, - &nonces, + &nounces, &values, - ).execute(&self.pgpool) + ) + .execute(&self.pgpool) .await .map_err(|e| { error!("Failed to store receipt: {}", e); From bcaae0d86faf82f1d3246dddbd37779f435fd4ae Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 27 Sep 2024 23:41:29 +0530 Subject: [PATCH 4/6] sqlx bulk insert after running compiling codes --- .env | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .env diff --git a/.env b/.env deleted file mode 100644 index 4b5dbc3c..00000000 --- a/.env +++ /dev/null @@ -1 +0,0 @@ -DATABASE_URL = postgres://taslim:6318@localhost:5432/test \ No newline at end of file From fc29b3a01efc83c9ea7574a8348948aa2f92cd8c Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 28 Sep 2024 23:01:13 +0530 Subject: [PATCH 5/6] added error logs --- ...18688afa91dc0b8eeb8e551b271fc73c1157e.json | 19 ------------------ ...44861fecdad20b4a52d0cec851712f8cba862.json | 20 +++++++++++++++++++ ...d07007990cc244f598b763ec5470515efe019.json | 6 ++++++ tap-agent/src/agent/sender_allocation.rs | 17 +++++++++++++--- 4 files changed, 40 insertions(+), 22 deletions(-) delete mode 100644 .sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json create mode 100644 .sqlx/query-1e672d98779cf3082906a5aaee744861fecdad20b4a52d0cec851712f8cba862.json diff --git a/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json b/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json deleted file mode 100644 index 4ebee386..00000000 --- a/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value\n ) SELECT * FROM UNNEST(\n $1::CHAR(40)[],\n $2::BYTEA[],\n $3::CHAR(40)[],\n $4::NUMERIC(20)[],\n $5::NUMERIC(20)[],\n $6::NUMERIC(40)[]\n )", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "BpcharArray", - "ByteaArray", - "BpcharArray", - "NumericArray", - "NumericArray", - "NumericArray" - ] - }, - "nullable": [] - }, - "hash": "0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e" -} diff --git a/.sqlx/query-1e672d98779cf3082906a5aaee744861fecdad20b4a52d0cec851712f8cba862.json b/.sqlx/query-1e672d98779cf3082906a5aaee744861fecdad20b4a52d0cec851712f8cba862.json new file mode 100644 index 00000000..823df8c4 --- /dev/null +++ b/.sqlx/query-1e672d98779cf3082906a5aaee744861fecdad20b4a52d0cec851712f8cba862.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value,\n error_log\n ) SELECT * FROM UNNEST(\n $1::CHAR(40)[],\n $2::BYTEA[],\n $3::CHAR(40)[],\n $4::NUMERIC(20)[],\n $5::NUMERIC(20)[],\n $6::NUMERIC(40)[],\n $7::TEXT[]\n )", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "BpcharArray", + "ByteaArray", + "BpcharArray", + "NumericArray", + "NumericArray", + "NumericArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "1e672d98779cf3082906a5aaee744861fecdad20b4a52d0cec851712f8cba862" +} diff --git a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json index ff058439..69853368 100644 --- a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json +++ b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json @@ -37,6 +37,11 @@ "ordinal": 6, "name": "value", "type_info": "Numeric" + }, + { + "ordinal": 7, + "name": "error_log", + "type_info": "Text" } ], "parameters": { @@ -49,6 +54,7 @@ false, false, false, + false, false ] }, diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index f7b069fa..60536467 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -27,7 +27,7 @@ use tap_core::{ signed_message::EIP712SignedMessage, }; use thegraph_core::Address; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; use crate::{agent::sender_account::ReceiptFees, lazy_static}; @@ -692,6 +692,7 @@ impl SenderAllocationState { let mut timestamps = Vec::with_capacity(reciepts_len); let mut nounces = Vec::with_capacity(reciepts_len); let mut values = Vec::with_capacity(reciepts_len); + let mut error_logs = Vec::with_capacity(reciepts_len); for received_receipt in receipts.iter() { let receipt = received_receipt.signed_receipt(); @@ -704,12 +705,19 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; + debug!( + "Receipt for allocation {} and signer {} failed reason: {}", + allocation_id.encode_hex(), + receipt_signer.encode_hex(), + receipt_error + ); reciepts_signers.push(receipt_signer.encode_hex()); encoded_signatures.push(encoded_signature); allocation_ids.push(allocation_id.encode_hex()); timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); nounces.push(BigDecimal::from(receipt.message.nonce)); values.push(BigDecimal::from(BigInt::from(receipt.message.value))); + error_logs.push(receipt_error); } sqlx::query!( r#"INSERT INTO scalar_tap_receipts_invalid ( @@ -718,14 +726,16 @@ impl SenderAllocationState { allocation_id, timestamp_ns, nonce, - value + value, + error_log ) SELECT * FROM UNNEST( $1::CHAR(40)[], $2::BYTEA[], $3::CHAR(40)[], $4::NUMERIC(20)[], $5::NUMERIC(20)[], - $6::NUMERIC(40)[] + $6::NUMERIC(40)[], + $7::TEXT[] )"#, &reciepts_signers, &encoded_signatures, @@ -733,6 +743,7 @@ impl SenderAllocationState { ×tamps, &nounces, &values, + &error_logs ) .execute(&self.pgpool) .await From b714213c7798b073067facf6828afa41b37b60c2 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 1 Oct 2024 10:40:11 +0530 Subject: [PATCH 6/6] minor changes --- migrations/20230915230734_tap_ravs.down.sql | 4 ++-- tap-agent/src/agent/sender_allocation.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/migrations/20230915230734_tap_ravs.down.sql b/migrations/20230915230734_tap_ravs.down.sql index a411faed..0ad115b3 100644 --- a/migrations/20230915230734_tap_ravs.down.sql +++ b/migrations/20230915230734_tap_ravs.down.sql @@ -1,2 +1,2 @@ --- DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; --- DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; +DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; +DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 60536467..ea64b692 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -748,7 +748,7 @@ impl SenderAllocationState { .execute(&self.pgpool) .await .map_err(|e| { - error!("Failed to store receipt: {}", e); + error!("Failed to store invalid receipt: {}", e); anyhow!(e) })?;