From d1e41039fb85ead3b06b8bc4b60ed6c85c0e5fb9 Mon Sep 17 00:00:00 2001 From: dskvr Date: Sat, 14 Sep 2024 16:19:38 +0200 Subject: [PATCH] fix cancellation, concurrency, race conditions and remve limits --- demo/main.js | 75 ++++++++++++++++++++++++++++++++------- demo/worker.js | 6 ++-- src/lib.rs | 95 +++++++++++++++++++++++--------------------------- 3 files changed, 109 insertions(+), 67 deletions(-) diff --git a/demo/main.js b/demo/main.js index 0ceb4a7..abfd068 100644 --- a/demo/main.js +++ b/demo/main.js @@ -34,7 +34,7 @@ const neventOutput = document.getElementById('neventOutput'); numberOfWorkers.value = totalWorkers; -numberOfWorkers.max = navigator.hardwareConcurrency || 3; +// numberOfWorkers.max = navigator.hardwareConcurrency || 3; minersBestPowOutput.style.display = 'none'; overallBestPowOutput.style.display = 'none'; @@ -45,7 +45,7 @@ let workerHashRates = {}; let minersBestPow let overallBestPow -let found = false +let halt = false function resetBestPow() { minersBestPow = {}; @@ -57,18 +57,41 @@ function resetBestPow() { }; } -for (let i = 0; i < totalWorkers; i++) { - const worker = new Worker('./worker.js', { type: 'module' }); - worker.onmessage = handleWorkerMessage; - worker.postMessage({ type: 'init', id: i }); - workers.push(worker); +function spawnWorkers(amt=null){ + amt = amt? amt : totalWorkers + console.log('Spawning workers...', totalWorkers); + for (let i = 0; i < amt; i++) { + const worker = new Worker('./worker.js', { type: 'module' }); + worker.onmessage = handleWorkerMessage; + worker.postMessage({ type: 'init', id: i }); + workers.push(worker); + } +} + +spawnWorkers() + +function disableInputs(){ + mineButton.disabled = true; + cancelButton.disabled = true; + numberOfWorkers.disabled = true; + difficultyInput.disabled = true; + eventInput.disabled = true; +} + +function enableInputs(){ + mineButton.disabled = false; + cancelButton.disabled = true; + numberOfWorkers.disabled = false; + difficultyInput.disabled = false; + eventInput.disabled = false; } + async function handleWorkerMessage(e) { const { type, data, error, hashRate, workerId, bestPowData:bestPowDataMap } = e.data; if (type === 'progress') { - if(found && hashRate > 0) { + if(halt && hashRate > 0) { return workers[workerId].postMessage({ type: 'cancel' }); } @@ -106,8 +129,8 @@ async function handleWorkerMessage(e) { if (data.error) { resultOutput.textContent = `Error: ${data.error}\n${JSON.stringify(data, null, 2)}`; } else { - if(found === false) { - found = true + if(halt === false) { + halt = true try { cancelOtherWorkers(workerId); await publishEvent(data.event); @@ -120,10 +143,18 @@ async function handleWorkerMessage(e) { } } hashrateOutput.textContent = '0 H/s'; - mineButton.disabled = false; - cancelButton.disabled = true; // Disable the cancel button + isMining = false; - workerHashRates = {}; // Reset hash rates after mining + workerHashRates = {}; + mineButton.disabled = false; + cancelButton.disabled = true; + eventInput.value = ''; + workers[workerId]?.terminate(); + workers = [] + spawnWorkers() + + } else if (type === 'stopped') { + } else if (type === 'error') { resultOutput.textContent = `Error: ${error}`; hashrateOutput.textContent = '0 H/s'; @@ -138,6 +169,7 @@ function cancelOtherWorkers(excludeWorkerId) { workers.forEach((worker, index) => { if (index !== excludeWorkerId) { worker.postMessage({ type: 'cancel' }); + setTimeout( () => worker.terminate(), 1000); } }); } @@ -156,6 +188,20 @@ function updateBestPowDisplay() { } } +numberOfWorkers.addEventListener('change', () => { + disableInputs() + const c = parseInt(totalWorkers, 10) + const n = parseInt(numberOfWorkers.value, 10); + const delta = n - c; + if (delta > 0) { + spawnWorkers(delta) + } else { + const workersToTerminate = workers.splice(n, Math.abs(delta)) + workersToTerminate.forEach(worker => worker.terminate()) + } + enableInputs() +}) + mineButton.addEventListener('click', () => { if (isMining) return; @@ -213,13 +259,16 @@ cancelButton.addEventListener('click', () => { if (isMining) { workers.forEach(worker => { worker.postMessage({ type: 'cancel' }); + worker.terminate(); }); + workers = [] resultOutput.textContent = 'Mining cancellation requested.'; hashrateOutput.textContent = '0 H/s'; mineButton.disabled = false; cancelButton.disabled = true; // Disable the cancel button isMining = false; workerHashRates = {}; // Reset hash rates after cancellation + spawnWorkers() } }); diff --git a/demo/worker.js b/demo/worker.js index 1729125..5638ba8 100644 --- a/demo/worker.js +++ b/demo/worker.js @@ -34,7 +34,7 @@ self.onmessage = async function (e) { postMessage(message); } - + if (type === 'cancel' && mining) { console.log('Mining cancellation requested.'); miningCancelled = true; @@ -42,9 +42,9 @@ self.onmessage = async function (e) { else if (type === 'init') { initWasm(); } else if (type === 'mine' && !mining) { - miningCancelled = false; // Reset cancellation flag - mining = true; try { + miningCancelled = false; // Reset cancellation flag + mining = true; const startNonce = BigInt(workerId); const nonceStep = BigInt(totalWorkers); diff --git a/src/lib.rs b/src/lib.rs index 57e9c7e..d3c378e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,6 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + use serde::{Deserialize, Serialize}; use serde_json::to_string; @@ -83,14 +86,14 @@ pub fn main_js() { } #[wasm_bindgen] -pub fn mine_event( +pub fn mine_event_wrapper( event_json: &str, difficulty: u32, start_nonce_str: &str, nonce_step_str: &str, - report_progress: JsValue, - should_cancel: JsValue, -) -> JsValue { + report_progress: &Function, + should_cancel: &Function, +) -> Result { console::log_1(&format!("event_json: {}", event_json).into()); let mut event: NostrEvent = match serde_json::from_str(event_json) { Ok(e) => e, @@ -108,32 +111,27 @@ pub fn mine_event( event.created_at = Some(current_timestamp); } - let mut nonce_index = None; - for (i, tag) in event.tags.iter().enumerate() { - if tag.len() > 0 && tag[0] == "nonce" { - nonce_index = Some(i); - break; - } - } - if nonce_index.is_none() { - event.tags.push(vec![ - "nonce".to_string(), - "0".to_string(), - difficulty.to_string(), - ]); - nonce_index = Some(event.tags.len() - 1); - } + // Initialize atomic cancellation flag + let cancellation_flag = Arc::new(AtomicBool::new(false)); - let report_progress = match report_progress.dyn_into::() { - Ok(func) => func, - Err(_) => { - console::log_1(&"Failed to convert report_progress to Function".into()); - return serde_wasm_bindgen::to_value(&serde_json::json!({ - "error": "Invalid progress callback." - })) - .unwrap_or(JsValue::NULL); - } - }; + // Clone the flag for the cancellation callback + let cancellation_flag_clone = Arc::clone(&cancellation_flag); + let should_cancel_callback = Closure::wrap(Box::new(move || -> bool { + cancellation_flag_clone.load(Ordering::Relaxed) + }) as Box bool>); + + // Update the cancellation callback + should_cancel.dyn_into::() + .unwrap() + .call0(&JsValue::NULL) + .unwrap_or(JsValue::FALSE); + + // Wrap the cancellation callback + let should_cancel_func = should_cancel_callback.as_ref().unchecked_ref(); + should_cancel_callback.forget(); + + // Initialize report_progress callback + let report_progress_func = report_progress.dyn_into::().unwrap(); let start_time = js_sys::Date::now(); let start_nonce: u64 = start_nonce_str.parse().unwrap_or(0); @@ -144,13 +142,22 @@ pub fn mine_event( let report_interval = 200_000; let mut last_report_time = start_time; - let should_cancel = should_cancel.dyn_into::().ok(); let mut best_pow: u32 = 0; let mut best_nonce: u64 = 0; let mut best_hash_bytes: Vec = Vec::new(); loop { + // Check cancellation flag + if cancellation_flag.load(Ordering::Relaxed) { + console::log_1(&"Mining cancelled.".into()); + return serde_wasm_bindgen::to_value(&serde_json::json!({ + "error": "Mining cancelled." + })) + .unwrap_or(JsValue::NULL); + } + + // Update nonce in the event if let Some(index) = nonce_index { if let Some(tag) = event.tags.get_mut(index) { if tag.len() >= 3 { @@ -160,6 +167,7 @@ pub fn mine_event( } } + // Hash the event let hash_bytes = get_event_hash(&event); if hash_bytes.is_empty() { console::log_1(&"Failed to compute event hash.".into()); @@ -169,8 +177,10 @@ pub fn mine_event( .unwrap_or(JsValue::NULL); } + // Calculate the PoW let pow = get_pow(&hash_bytes); + // Update the best PoW found so far if pow > best_pow { best_pow = pow; best_nonce = nonce; @@ -182,7 +192,7 @@ pub fn mine_event( "hash": hex::encode(&best_hash_bytes), }); - report_progress + report_progress_func .call2( &JsValue::NULL, &JsValue::from_f64(0.0), @@ -196,6 +206,7 @@ pub fn mine_event( }); } + // If the difficulty target is met, return the result if pow >= difficulty { let event_hash = hex::encode(&hash_bytes); event.id = Some(event_hash.clone()); @@ -216,25 +227,13 @@ pub fn mine_event( nonce = nonce.wrapping_add(nonce_step); total_hashes += 1; - if let Some(ref should_cancel) = should_cancel { - if total_hashes % 10_000 == 0 { - let cancel = should_cancel.call0(&JsValue::NULL).unwrap_or(JsValue::FALSE); - if cancel.is_truthy() { - console::log_1(&"Mining cancelled.".into()); - return serde_wasm_bindgen::to_value(&serde_json::json!({ - "error": "Mining cancelled." - })) - .unwrap_or(JsValue::NULL); - } - } - } - + // Report progress at intervals if total_hashes % report_interval == 0 { let current_time = js_sys::Date::now(); let elapsed_time = (current_time - last_report_time) / 1000.0; if elapsed_time > 0.0 { let hash_rate = (report_interval as f64) / elapsed_time; - report_progress + report_progress_func .call2(&JsValue::NULL, &hash_rate.into(), &JsValue::NULL) .unwrap_or_else(|err| { console::log_1( @@ -245,15 +244,9 @@ pub fn mine_event( last_report_time = current_time; } } - - // Uncomment if you wish to log nonce progress - // if nonce % report_interval == 0 { - // console::log_1(&format!("Checked nonce up to: {}", nonce).into()); - // } } } - #[cfg(test)] mod tests { use super::*;