Skip to content

Commit

Permalink
fix cancellation, concurrency, race conditions and remve limits
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Sep 14, 2024
1 parent 2147dbe commit d1e4103
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 67 deletions.
75 changes: 62 additions & 13 deletions demo/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const neventOutput = document.getElementById('neventOutput');


numberOfWorkers.value = totalWorkers;
numberOfWorkers.max = navigator.hardwareConcurrency || 3;
// numberOfWorkers.max = navigator.hardwareConcurrency || 3;

minersBestPowOutput.style.display = 'none';
overallBestPowOutput.style.display = 'none';
Expand All @@ -45,7 +45,7 @@ let workerHashRates = {};
let minersBestPow
let overallBestPow

let found = false
let halt = false

function resetBestPow() {
minersBestPow = {};
Expand All @@ -57,18 +57,41 @@ function resetBestPow() {
};
}

for (let i = 0; i < totalWorkers; i++) {
const worker = new Worker('./worker.js', { type: 'module' });
worker.onmessage = handleWorkerMessage;
worker.postMessage({ type: 'init', id: i });
workers.push(worker);
function spawnWorkers(amt=null){
amt = amt? amt : totalWorkers
console.log('Spawning workers...', totalWorkers);
for (let i = 0; i < amt; i++) {
const worker = new Worker('./worker.js', { type: 'module' });
worker.onmessage = handleWorkerMessage;
worker.postMessage({ type: 'init', id: i });
workers.push(worker);
}
}

spawnWorkers()

function disableInputs(){
mineButton.disabled = true;
cancelButton.disabled = true;
numberOfWorkers.disabled = true;
difficultyInput.disabled = true;
eventInput.disabled = true;
}

function enableInputs(){
mineButton.disabled = false;
cancelButton.disabled = true;
numberOfWorkers.disabled = false;
difficultyInput.disabled = false;
eventInput.disabled = false;
}


async function handleWorkerMessage(e) {
const { type, data, error, hashRate, workerId, bestPowData:bestPowDataMap } = e.data;

if (type === 'progress') {
if(found && hashRate > 0) {
if(halt && hashRate > 0) {
return workers[workerId].postMessage({ type: 'cancel' });
}

Expand Down Expand Up @@ -106,8 +129,8 @@ async function handleWorkerMessage(e) {
if (data.error) {
resultOutput.textContent = `Error: ${data.error}\n${JSON.stringify(data, null, 2)}`;
} else {
if(found === false) {
found = true
if(halt === false) {
halt = true
try {
cancelOtherWorkers(workerId);
await publishEvent(data.event);
Expand All @@ -120,10 +143,18 @@ async function handleWorkerMessage(e) {
}
}
hashrateOutput.textContent = '0 H/s';
mineButton.disabled = false;
cancelButton.disabled = true; // Disable the cancel button

isMining = false;
workerHashRates = {}; // Reset hash rates after mining
workerHashRates = {};
mineButton.disabled = false;
cancelButton.disabled = true;
eventInput.value = '';
workers[workerId]?.terminate();
workers = []
spawnWorkers()

} else if (type === 'stopped') {

} else if (type === 'error') {
resultOutput.textContent = `Error: ${error}`;
hashrateOutput.textContent = '0 H/s';
Expand All @@ -138,6 +169,7 @@ function cancelOtherWorkers(excludeWorkerId) {
workers.forEach((worker, index) => {
if (index !== excludeWorkerId) {
worker.postMessage({ type: 'cancel' });
setTimeout( () => worker.terminate(), 1000);
}
});
}
Expand All @@ -156,6 +188,20 @@ function updateBestPowDisplay() {
}
}

numberOfWorkers.addEventListener('change', () => {
disableInputs()
const c = parseInt(totalWorkers, 10)
const n = parseInt(numberOfWorkers.value, 10);
const delta = n - c;
if (delta > 0) {
spawnWorkers(delta)
} else {
const workersToTerminate = workers.splice(n, Math.abs(delta))
workersToTerminate.forEach(worker => worker.terminate())
}
enableInputs()
})

mineButton.addEventListener('click', () => {
if (isMining) return;

Expand Down Expand Up @@ -213,13 +259,16 @@ cancelButton.addEventListener('click', () => {
if (isMining) {
workers.forEach(worker => {
worker.postMessage({ type: 'cancel' });
worker.terminate();
});
workers = []
resultOutput.textContent = 'Mining cancellation requested.';
hashrateOutput.textContent = '0 H/s';
mineButton.disabled = false;
cancelButton.disabled = true; // Disable the cancel button
isMining = false;
workerHashRates = {}; // Reset hash rates after cancellation
spawnWorkers()
}
});

Expand Down
6 changes: 3 additions & 3 deletions demo/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ self.onmessage = async function (e) {

postMessage(message);
}

if (type === 'cancel' && mining) {
console.log('Mining cancellation requested.');
miningCancelled = true;
}
else if (type === 'init') {
initWasm();
} else if (type === 'mine' && !mining) {
miningCancelled = false; // Reset cancellation flag
mining = true;
try {
miningCancelled = false; // Reset cancellation flag
mining = true;
const startNonce = BigInt(workerId);
const nonceStep = BigInt(totalWorkers);

Expand Down
95 changes: 44 additions & 51 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use serde_json::to_string;

Expand Down Expand Up @@ -83,14 +86,14 @@ pub fn main_js() {
}

#[wasm_bindgen]
pub fn mine_event(
pub fn mine_event_wrapper(
event_json: &str,
difficulty: u32,
start_nonce_str: &str,
nonce_step_str: &str,
report_progress: JsValue,
should_cancel: JsValue,
) -> JsValue {
report_progress: &Function,
should_cancel: &Function,
) -> Result<JsValue, JsValue> {
console::log_1(&format!("event_json: {}", event_json).into());
let mut event: NostrEvent = match serde_json::from_str(event_json) {
Ok(e) => e,
Expand All @@ -108,32 +111,27 @@ pub fn mine_event(
event.created_at = Some(current_timestamp);
}

let mut nonce_index = None;
for (i, tag) in event.tags.iter().enumerate() {
if tag.len() > 0 && tag[0] == "nonce" {
nonce_index = Some(i);
break;
}
}
if nonce_index.is_none() {
event.tags.push(vec![
"nonce".to_string(),
"0".to_string(),
difficulty.to_string(),
]);
nonce_index = Some(event.tags.len() - 1);
}
// Initialize atomic cancellation flag
let cancellation_flag = Arc::new(AtomicBool::new(false));

let report_progress = match report_progress.dyn_into::<Function>() {
Ok(func) => func,
Err(_) => {
console::log_1(&"Failed to convert report_progress to Function".into());
return serde_wasm_bindgen::to_value(&serde_json::json!({
"error": "Invalid progress callback."
}))
.unwrap_or(JsValue::NULL);
}
};
// Clone the flag for the cancellation callback
let cancellation_flag_clone = Arc::clone(&cancellation_flag);
let should_cancel_callback = Closure::wrap(Box::new(move || -> bool {
cancellation_flag_clone.load(Ordering::Relaxed)
}) as Box<dyn FnMut() -> bool>);

// Update the cancellation callback
should_cancel.dyn_into::<Function>()
.unwrap()
.call0(&JsValue::NULL)
.unwrap_or(JsValue::FALSE);

// Wrap the cancellation callback
let should_cancel_func = should_cancel_callback.as_ref().unchecked_ref();
should_cancel_callback.forget();

// Initialize report_progress callback
let report_progress_func = report_progress.dyn_into::<Function>().unwrap();

let start_time = js_sys::Date::now();
let start_nonce: u64 = start_nonce_str.parse().unwrap_or(0);
Expand All @@ -144,13 +142,22 @@ pub fn mine_event(

let report_interval = 200_000;
let mut last_report_time = start_time;
let should_cancel = should_cancel.dyn_into::<Function>().ok();

let mut best_pow: u32 = 0;
let mut best_nonce: u64 = 0;
let mut best_hash_bytes: Vec<u8> = Vec::new();

loop {
// Check cancellation flag
if cancellation_flag.load(Ordering::Relaxed) {
console::log_1(&"Mining cancelled.".into());
return serde_wasm_bindgen::to_value(&serde_json::json!({
"error": "Mining cancelled."
}))
.unwrap_or(JsValue::NULL);
}

// Update nonce in the event
if let Some(index) = nonce_index {

Check failure on line 161 in src/lib.rs

View workflow job for this annotation

GitHub Actions / deploy

cannot find value `nonce_index` in this scope
if let Some(tag) = event.tags.get_mut(index) {
if tag.len() >= 3 {

Check failure on line 163 in src/lib.rs

View workflow job for this annotation

GitHub Actions / deploy

type annotations needed
Expand All @@ -160,6 +167,7 @@ pub fn mine_event(
}
}

// Hash the event
let hash_bytes = get_event_hash(&event);
if hash_bytes.is_empty() {
console::log_1(&"Failed to compute event hash.".into());
Expand All @@ -169,8 +177,10 @@ pub fn mine_event(
.unwrap_or(JsValue::NULL);
}

// Calculate the PoW
let pow = get_pow(&hash_bytes);

// Update the best PoW found so far
if pow > best_pow {
best_pow = pow;
best_nonce = nonce;
Expand All @@ -182,7 +192,7 @@ pub fn mine_event(
"hash": hex::encode(&best_hash_bytes),
});

report_progress
report_progress_func
.call2(
&JsValue::NULL,
&JsValue::from_f64(0.0),
Expand All @@ -196,6 +206,7 @@ pub fn mine_event(
});
}

// If the difficulty target is met, return the result
if pow >= difficulty {
let event_hash = hex::encode(&hash_bytes);
event.id = Some(event_hash.clone());
Expand All @@ -216,25 +227,13 @@ pub fn mine_event(
nonce = nonce.wrapping_add(nonce_step);
total_hashes += 1;

if let Some(ref should_cancel) = should_cancel {
if total_hashes % 10_000 == 0 {
let cancel = should_cancel.call0(&JsValue::NULL).unwrap_or(JsValue::FALSE);
if cancel.is_truthy() {
console::log_1(&"Mining cancelled.".into());
return serde_wasm_bindgen::to_value(&serde_json::json!({
"error": "Mining cancelled."
}))
.unwrap_or(JsValue::NULL);
}
}
}

// Report progress at intervals
if total_hashes % report_interval == 0 {
let current_time = js_sys::Date::now();
let elapsed_time = (current_time - last_report_time) / 1000.0;
if elapsed_time > 0.0 {
let hash_rate = (report_interval as f64) / elapsed_time;
report_progress
report_progress_func
.call2(&JsValue::NULL, &hash_rate.into(), &JsValue::NULL)
.unwrap_or_else(|err| {
console::log_1(
Expand All @@ -245,15 +244,9 @@ pub fn mine_event(
last_report_time = current_time;
}
}

// Uncomment if you wish to log nonce progress
// if nonce % report_interval == 0 {
// console::log_1(&format!("Checked nonce up to: {}", nonce).into());
// }
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit d1e4103

Please sign in to comment.