From 4b9148c2b6338623bc198d947e951f9a86f7089e Mon Sep 17 00:00:00 2001 From: pajama-coder Date: Wed, 19 Jul 2023 22:54:01 +0800 Subject: [PATCH] [core] Add Configuration.exit() for pipelines being run as process exits --- src/api/configuration.cpp | 50 ++++++++++++++++++++++++++ src/api/configuration.hpp | 4 +++ src/worker-thread.cpp | 15 ++++---- src/worker-thread.hpp | 3 +- src/worker.cpp | 75 ++++++++++++++++++++++++++++++++++++--- src/worker.hpp | 25 ++++++++++++- 6 files changed, 156 insertions(+), 16 deletions(-) diff --git a/src/api/configuration.cpp b/src/api/configuration.cpp index f3f471672..0cbecbff9 100644 --- a/src/api/configuration.cpp +++ b/src/api/configuration.cpp @@ -620,6 +620,14 @@ void Configuration::watch(const std::string &filename) { FilterConfigurator::set_pipeline_config(&config); } +void Configuration::exit() { + check_integrity(); + m_exits.emplace_back(); + auto &config = m_exits.back(); + config.index = next_pipeline_index(); + FilterConfigurator::set_pipeline_config(&config); +} + void Configuration::pipeline(const std::string &name) { check_integrity(); if (name.empty()) throw std::runtime_error("pipeline name cannot be empty"); @@ -739,6 +747,19 @@ void Configuration::apply(JSModule *mod) { auto w = Watch::make(i.filename, p); worker->add_watch(w); } + + if (m_exits.size() == 1) { + auto &exit = m_exits.front(); + auto p = make_pipeline(exit.index, "", "Exit", exit); + worker->add_exit(p); + } else { + int n = 1; + for (auto &i : m_exits) { + std::string name("Exit "); + auto p = make_pipeline(i.index, "", name + std::to_string(n++), i); + worker->add_exit(p); + } + } } void Configuration::draw(Graph &g) { @@ -804,6 +825,25 @@ void Configuration::draw(Graph &g) { add_filters(p, i.filters); g.add_pipeline(std::move(p)); } + + if (m_exits.size() == 1) { + auto &exit = m_exits.front(); + Graph::Pipeline p; + p.index = exit.index; + p.label = "Exit"; + add_filters(p, exit.filters); + g.add_pipeline(std::move(p)); + } else { + int n = 1; + for (const auto &i : m_exits) { + Graph::Pipeline p; + p.index = i.index; + p.label = "Exit "; + p.label += std::to_string(n++); + add_filters(p, i.filters); + g.add_pipeline(std::move(p)); + } + } } auto Configuration::new_indexed_pipeline(const std::string &name, int &index) -> FilterConfigurator* { @@ -2256,6 +2296,16 @@ template<> void ClassDef::init() { ctx.error(err); } }); + + // Configuration.exit + method("exit", [](Context &ctx, Object *thiz, Value &result) { + try { + thiz->as()->exit(); + result.set(thiz); + } catch (std::runtime_error &err) { + ctx.error(err); + } + }); } } // namespace pjs diff --git a/src/api/configuration.hpp b/src/api/configuration.hpp index 1f9abf0d8..50a43e0d2 100644 --- a/src/api/configuration.hpp +++ b/src/api/configuration.hpp @@ -196,6 +196,7 @@ class Configuration : public pjs::ObjectTemplate m_listens; std::list m_tasks; std::list m_watches; + std::list m_exits; std::list m_named_pipelines; std::map m_indexed_pipelines; std::unique_ptr m_entrance_pipeline; diff --git a/src/worker-thread.cpp b/src/worker-thread.cpp index 4c663978e..39fbb88e7 100644 --- a/src/worker-thread.cpp +++ b/src/worker-thread.cpp @@ -157,7 +157,7 @@ void WorkerThread::reload_done(bool ok) { if (m_new_worker) { pjs::Ref current_worker = Worker::current(); m_new_worker->start(true); - current_worker->stop(); + current_worker->stop(true); m_new_worker = nullptr; m_version = m_new_version; m_working = true; @@ -169,7 +169,7 @@ void WorkerThread::reload_done(bool ok) { m_net->post( [this]() { if (m_new_worker) { - m_new_worker->stop(); + m_new_worker->stop(true); m_new_worker = nullptr; m_new_version.clear(); Log::error("[restart] Failed reloading codebase %d", m_index); @@ -185,7 +185,7 @@ bool WorkerThread::stop(bool force) { [this]() { m_shutdown = true; m_new_worker = nullptr; - shutdown_all(); + shutdown_all(true); Net::current().stop(); } ); @@ -198,7 +198,7 @@ bool WorkerThread::stop(bool force) { m_net->post( [this]() { m_new_worker = nullptr; - shutdown_all(); + shutdown_all(false); } ); } @@ -347,8 +347,8 @@ void WorkerThread::init_metrics() { ); } -void WorkerThread::shutdown_all() { - if (auto worker = Worker::current()) worker->stop(); +void WorkerThread::shutdown_all(bool force) { + if (auto worker = Worker::current()) worker->stop(force); Listener::for_each([&](Listener *l) { l->pipeline_layout(nullptr); }); } @@ -404,9 +404,6 @@ void WorkerThread::main() { } Log::info("[start] Thread %d ended", m_index); - - } else { - Log::error("[start] Thread %d failed to start", m_index); } Log::shutdown(); diff --git a/src/worker-thread.hpp b/src/worker-thread.hpp index a544d67ce..c8c545223 100644 --- a/src/worker-thread.hpp +++ b/src/worker-thread.hpp @@ -66,6 +66,7 @@ class WorkerThread { void recycle(); void reload(const std::function &cb); void reload_done(bool ok); + void exit(const std::function &cb); bool stop(bool force = false); private: @@ -92,7 +93,7 @@ class WorkerThread { bool m_failed = false; static void init_metrics(); - static void shutdown_all(); + static void shutdown_all(bool force); void main(); diff --git a/src/worker.cpp b/src/worker.cpp index 06918cc0b..55d7860e1 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -299,6 +299,11 @@ void Worker::add_watch(Watch *watch) { m_watches.insert(watch); } +void Worker::add_exit(PipelineLayout *layout) { + m_exits.emplace_back(); + m_exits.back() = new Exit(this, layout); +} + void Worker::add_export(pjs::Str *ns, pjs::Str *name, Module *module) { auto &names = m_namespaces[ns]; auto i = names.find(name); @@ -434,11 +439,15 @@ bool Worker::start(bool force) { return true; } -void Worker::stop() { - for (auto *task : m_tasks) task->end(); - for (auto *watch : m_watches) watch->end(); - for (auto *mod : m_modules) if (mod) mod->unload(); - if (s_current == this) s_current = nullptr; +void Worker::stop(bool force) { + if (force || m_exits.empty()) { + end_all(); + } else { + keep_alive(); + for (auto *exit : m_exits) { + exit->start(); + } + } } auto Worker::new_module_index() -> int { @@ -461,4 +470,60 @@ void Worker::remove_module(int i) { m_module_map.erase(mod->filename()->str()); } +void Worker::keep_alive() { + m_keep_alive.schedule( + 1, [this]() { + keep_alive(); + } + ); +} + +void Worker::on_exit(Exit *exit) { + bool done = true; + for (auto *exit : m_exits) { + if (!exit->done()) { + done = false; + break; + } + } + if (done) { + m_keep_alive.cancel(); + end_all(); + } +} + +void Worker::end_all() { + for (auto *task : m_tasks) task->end(); + for (auto *watch : m_watches) watch->end(); + for (auto *exit : m_exits) exit->end(); + for (auto *mod : m_modules) if (mod) mod->unload(); + if (s_current == this) s_current = nullptr; +} + +// +// Worker::Exit +// + +void Worker::Exit::start() { + InputContext ic; + m_stream_end = false; + m_pipeline = Pipeline::make( + m_pipeline_layout, + m_pipeline_layout->new_context() + ); + m_pipeline->chain(EventTarget::input()); + m_pipeline->start(); +} + +void Worker::Exit::end() { + delete this; +} + +void Worker::Exit::on_event(Event *evt) { + if (evt->is()) { + m_stream_end = true; + m_worker->on_exit(this); + } +} + } // namespace pipy diff --git a/src/worker.hpp b/src/worker.hpp index e0e514c59..6bf505eb6 100644 --- a/src/worker.hpp +++ b/src/worker.hpp @@ -28,6 +28,7 @@ #include "context.hpp" #include "listener.hpp" +#include "timer.hpp" #include "nmi.hpp" #include @@ -67,6 +68,7 @@ class Worker : public pjs::RefCount { bool update_listeners(bool force); void add_task(Task *task); void add_watch(Watch *watch); + void add_exit(PipelineLayout *layout); void add_export(pjs::Str *ns, pjs::Str *name, Module *module); auto get_export(pjs::Str *ns, pjs::Str *name) -> int; auto new_loading_context() -> Context*; @@ -74,7 +76,7 @@ class Worker : public pjs::RefCount { bool solve(pjs::Context &ctx, pjs::Str *filename, pjs::Value &result); bool bind(); bool start(bool force); - void stop(); + void stop(bool force); private: Worker(bool is_graph_enabled); @@ -88,6 +90,22 @@ class Worker : public pjs::RefCount { Listener::Options options; }; + class Exit : public EventTarget { + public: + Exit(Worker *worker, PipelineLayout *pipeline_layout) + : m_worker(worker) + , m_pipeline_layout(pipeline_layout) {} + bool done() const { return m_stream_end; } + void start(); + void end(); + virtual void on_event(Event *evt) override; + private: + Worker* m_worker; + pjs::Ref m_pipeline_layout; + pjs::Ref m_pipeline; + bool m_stream_end = false; + }; + struct SolvedFile { int index; pjs::Ref filename; @@ -98,6 +116,7 @@ class Worker : public pjs::RefCount { }; Module* m_root = nullptr; + Timer m_keep_alive; pjs::Ref m_thread; pjs::Ref m_global_object; std::vector m_modules; @@ -106,6 +125,7 @@ class Worker : public pjs::RefCount { std::map m_listeners; std::set m_tasks; std::set m_watches; + std::list m_exits; std::map, Namespace> m_namespaces; std::map, SolvedFile> m_solved_files; bool m_graph_enabled = false; @@ -113,6 +133,9 @@ class Worker : public pjs::RefCount { auto new_module_index() -> int; void add_module(Module *m); void remove_module(int i); + void keep_alive(); + void on_exit(Exit *exit); + void end_all(); thread_local static pjs::Ref s_current;