Skip to content

Commit

Permalink
fix: if there is an issue on spawn, emit an error event
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Sep 17, 2024
1 parent 0da4370 commit cd41672
Showing 1 changed file with 59 additions and 33 deletions.
92 changes: 59 additions & 33 deletions src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ responds to events in realtime.
When it sees one it spawns a generator:
- store engine, closure, runs in its own thread, so no thread pool
- emits an <topic>.spawn.err event if bad meta data
- emits an <topic>.spawn.error event if bad meta data
- emits a topic.start event {generator_id: id}
- on stop emits a stop event: meta reason
- restarts until terminated or replaced
- generates topic.recv for each Value::String on pipeline: {generator_id: id}
- topic.err
- topic.error
If it sees an a spawn for an existing generator: it stops the current running generator, and starts
a new one: so all events generated are now linked to the new id.
Expand All @@ -44,6 +44,41 @@ struct GeneratorTask {
expression: String,
}

async fn handle_spawn_event(
topic: &str,
frame: Frame,
generators: &mut HashMap<String, GeneratorTask>,
engine: nu::Engine,
store: Store,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let meta = frame
.meta
.clone()
.and_then(|meta| serde_json::from_value::<GeneratorMeta>(meta).ok())
.unwrap_or_default();

if generators.contains_key(topic) {
return Err("Updating existing generator is not implemented".into());
}

let hash = frame.hash.clone().ok_or("Missing hash")?;
let reader = store.cas_reader(hash).await?;
let mut expression = String::new();
reader.compat().read_to_string(&mut expression).await?;

let task = GeneratorTask {
id: frame.id,
topic: topic.to_string(),
meta: meta.clone(),
expression: expression.clone(),
};

generators.insert(topic.to_string(), task.clone());

spawn(engine.clone(), store.clone(), task).await;
Ok(())
}

pub async fn serve(
store: Store,
engine: nu::Engine,
Expand All @@ -53,8 +88,9 @@ pub async fn serve(
.compaction_strategy(|frame| {
frame
.topic
.strip_suffix(".spawn")
.map(|prefix| prefix.to_string())
.strip_suffix(".spawn.error")
.or_else(|| frame.topic.strip_suffix(".spawn"))
.map(String::from)
})
.build();
let mut recver = store.read(options).await;
Expand All @@ -76,39 +112,29 @@ pub async fn serve(
continue;
}

if let Some(topic) = frame.topic.strip_suffix(".spawn") {
let meta = frame
.meta
.clone()
.and_then(|meta| serde_json::from_value::<GeneratorMeta>(meta).ok())
.unwrap_or_else(GeneratorMeta::default);
if let Some(topic) = frame.topic.clone().strip_suffix(".spawn") {
if let Err(e) = handle_spawn_event(
topic,
frame.clone(),
&mut generators,
engine.clone(),
store.clone(),
)
.await
{
let mut store = store.clone();
let meta = serde_json::json!({
"source_id": frame.id.to_string(),
"reason": e.to_string()
});

if generators.contains_key(topic) {
panic!("TODO: handle updating existing generator");
let _ = store
.append(&format!("{}.spawn.error", topic), None, Some(meta))
.await;
}

// TODO: emit a .err event on any of these unwraps
let hash = frame.hash.clone().unwrap();
let reader = store.cas_reader(hash).await.unwrap();
let mut expression = String::new();
reader
.compat()
.read_to_string(&mut expression)
.await
.unwrap();

let task = GeneratorTask {
id: frame.id,
topic: topic.to_string(),
meta: meta.clone(),
expression: expression.clone(),
};

generators.insert(topic.to_string(), task.clone());

spawn(engine.clone(), store.clone(), task).await;
}
}

Ok(())
}

Expand Down

0 comments on commit cd41672

Please sign in to comment.