Skip to content

Commit

Permalink
[core] Add Configuration.exit() for pipelines being run as process exits
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Jul 19, 2023
1 parent b6a80ef commit 4b9148c
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 16 deletions.
50 changes: 50 additions & 0 deletions src/api/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,14 @@ void Configuration::watch(const std::string &filename) {
FilterConfigurator::set_pipeline_config(&config);
}

void Configuration::exit() {
check_integrity();
m_exits.emplace_back();
auto &config = m_exits.back();
config.index = next_pipeline_index();
FilterConfigurator::set_pipeline_config(&config);
}

void Configuration::pipeline(const std::string &name) {
check_integrity();
if (name.empty()) throw std::runtime_error("pipeline name cannot be empty");
Expand Down Expand Up @@ -739,6 +747,19 @@ void Configuration::apply(JSModule *mod) {
auto w = Watch::make(i.filename, p);
worker->add_watch(w);
}

if (m_exits.size() == 1) {
auto &exit = m_exits.front();
auto p = make_pipeline(exit.index, "", "Exit", exit);
worker->add_exit(p);
} else {
int n = 1;
for (auto &i : m_exits) {
std::string name("Exit ");
auto p = make_pipeline(i.index, "", name + std::to_string(n++), i);
worker->add_exit(p);
}
}
}

void Configuration::draw(Graph &g) {
Expand Down Expand Up @@ -804,6 +825,25 @@ void Configuration::draw(Graph &g) {
add_filters(p, i.filters);
g.add_pipeline(std::move(p));
}

if (m_exits.size() == 1) {
auto &exit = m_exits.front();
Graph::Pipeline p;
p.index = exit.index;
p.label = "Exit";
add_filters(p, exit.filters);
g.add_pipeline(std::move(p));
} else {
int n = 1;
for (const auto &i : m_exits) {
Graph::Pipeline p;
p.index = i.index;
p.label = "Exit ";
p.label += std::to_string(n++);
add_filters(p, i.filters);
g.add_pipeline(std::move(p));
}
}
}

auto Configuration::new_indexed_pipeline(const std::string &name, int &index) -> FilterConfigurator* {
Expand Down Expand Up @@ -2256,6 +2296,16 @@ template<> void ClassDef<Configuration>::init() {
ctx.error(err);
}
});

// Configuration.exit
method("exit", [](Context &ctx, Object *thiz, Value &result) {
try {
thiz->as<Configuration>()->exit();
result.set(thiz);
} catch (std::runtime_error &err) {
ctx.error(err);
}
});
}

} // namespace pjs
4 changes: 4 additions & 0 deletions src/api/configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class Configuration : public pjs::ObjectTemplate<Configuration, FilterConfigurat
void listen(ListenerArray *listeners, pjs::Object *options);
void task(const std::string &when);
void watch(const std::string &filename);
void exit();
void pipeline(const std::string &name);
void pipeline();

Expand Down Expand Up @@ -223,6 +224,8 @@ class Configuration : public pjs::ObjectTemplate<Configuration, FilterConfigurat
std::string filename;
};

struct ExitConfig : public PipelineConfig {};

struct NamedPipelineConfig : public PipelineConfig {
std::string name;
};
Expand All @@ -234,6 +237,7 @@ class Configuration : public pjs::ObjectTemplate<Configuration, FilterConfigurat
std::list<ListenConfig> m_listens;
std::list<TaskConfig> m_tasks;
std::list<WatchConfig> m_watches;
std::list<ExitConfig> m_exits;
std::list<NamedPipelineConfig> m_named_pipelines;
std::map<int, NamedPipelineConfig> m_indexed_pipelines;
std::unique_ptr<PipelineConfig> m_entrance_pipeline;
Expand Down
15 changes: 6 additions & 9 deletions src/worker-thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void WorkerThread::reload_done(bool ok) {
if (m_new_worker) {
pjs::Ref<Worker> current_worker = Worker::current();
m_new_worker->start(true);
current_worker->stop();
current_worker->stop(true);
m_new_worker = nullptr;
m_version = m_new_version;
m_working = true;
Expand All @@ -169,7 +169,7 @@ void WorkerThread::reload_done(bool ok) {
m_net->post(
[this]() {
if (m_new_worker) {
m_new_worker->stop();
m_new_worker->stop(true);
m_new_worker = nullptr;
m_new_version.clear();
Log::error("[restart] Failed reloading codebase %d", m_index);
Expand All @@ -185,7 +185,7 @@ bool WorkerThread::stop(bool force) {
[this]() {
m_shutdown = true;
m_new_worker = nullptr;
shutdown_all();
shutdown_all(true);
Net::current().stop();
}
);
Expand All @@ -198,7 +198,7 @@ bool WorkerThread::stop(bool force) {
m_net->post(
[this]() {
m_new_worker = nullptr;
shutdown_all();
shutdown_all(false);
}
);
}
Expand Down Expand Up @@ -347,8 +347,8 @@ void WorkerThread::init_metrics() {
);
}

void WorkerThread::shutdown_all() {
if (auto worker = Worker::current()) worker->stop();
void WorkerThread::shutdown_all(bool force) {
if (auto worker = Worker::current()) worker->stop(force);
Listener::for_each([&](Listener *l) { l->pipeline_layout(nullptr); });
}

Expand Down Expand Up @@ -404,9 +404,6 @@ void WorkerThread::main() {
}

Log::info("[start] Thread %d ended", m_index);

} else {
Log::error("[start] Thread %d failed to start", m_index);
}

Log::shutdown();
Expand Down
3 changes: 2 additions & 1 deletion src/worker-thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class WorkerThread {
void recycle();
void reload(const std::function<void(bool)> &cb);
void reload_done(bool ok);
void exit(const std::function<void()> &cb);
bool stop(bool force = false);

private:
Expand All @@ -92,7 +93,7 @@ class WorkerThread {
bool m_failed = false;

static void init_metrics();
static void shutdown_all();
static void shutdown_all(bool force);

void main();

Expand Down
75 changes: 70 additions & 5 deletions src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ void Worker::add_watch(Watch *watch) {
m_watches.insert(watch);
}

void Worker::add_exit(PipelineLayout *layout) {
m_exits.emplace_back();
m_exits.back() = new Exit(this, layout);
}

void Worker::add_export(pjs::Str *ns, pjs::Str *name, Module *module) {
auto &names = m_namespaces[ns];
auto i = names.find(name);
Expand Down Expand Up @@ -434,11 +439,15 @@ bool Worker::start(bool force) {
return true;
}

void Worker::stop() {
for (auto *task : m_tasks) task->end();
for (auto *watch : m_watches) watch->end();
for (auto *mod : m_modules) if (mod) mod->unload();
if (s_current == this) s_current = nullptr;
void Worker::stop(bool force) {
if (force || m_exits.empty()) {
end_all();
} else {
keep_alive();
for (auto *exit : m_exits) {
exit->start();
}
}
}

auto Worker::new_module_index() -> int {
Expand All @@ -461,4 +470,60 @@ void Worker::remove_module(int i) {
m_module_map.erase(mod->filename()->str());
}

void Worker::keep_alive() {
m_keep_alive.schedule(
1, [this]() {
keep_alive();
}
);
}

void Worker::on_exit(Exit *exit) {
bool done = true;
for (auto *exit : m_exits) {
if (!exit->done()) {
done = false;
break;
}
}
if (done) {
m_keep_alive.cancel();
end_all();
}
}

void Worker::end_all() {
for (auto *task : m_tasks) task->end();
for (auto *watch : m_watches) watch->end();
for (auto *exit : m_exits) exit->end();
for (auto *mod : m_modules) if (mod) mod->unload();
if (s_current == this) s_current = nullptr;
}

//
// Worker::Exit
//

void Worker::Exit::start() {
InputContext ic;
m_stream_end = false;
m_pipeline = Pipeline::make(
m_pipeline_layout,
m_pipeline_layout->new_context()
);
m_pipeline->chain(EventTarget::input());
m_pipeline->start();
}

void Worker::Exit::end() {
delete this;
}

void Worker::Exit::on_event(Event *evt) {
if (evt->is<StreamEnd>()) {
m_stream_end = true;
m_worker->on_exit(this);
}
}

} // namespace pipy
25 changes: 24 additions & 1 deletion src/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "context.hpp"
#include "listener.hpp"
#include "timer.hpp"
#include "nmi.hpp"

#include <list>
Expand Down Expand Up @@ -67,14 +68,15 @@ class Worker : public pjs::RefCount<Worker> {
bool update_listeners(bool force);
void add_task(Task *task);
void add_watch(Watch *watch);
void add_exit(PipelineLayout *layout);
void add_export(pjs::Str *ns, pjs::Str *name, Module *module);
auto get_export(pjs::Str *ns, pjs::Str *name) -> int;
auto new_loading_context() -> Context*;
auto new_runtime_context(Context *base = nullptr) -> Context*;
bool solve(pjs::Context &ctx, pjs::Str *filename, pjs::Value &result);
bool bind();
bool start(bool force);
void stop();
void stop(bool force);

private:
Worker(bool is_graph_enabled);
Expand All @@ -88,6 +90,22 @@ class Worker : public pjs::RefCount<Worker> {
Listener::Options options;
};

class Exit : public EventTarget {
public:
Exit(Worker *worker, PipelineLayout *pipeline_layout)
: m_worker(worker)
, m_pipeline_layout(pipeline_layout) {}
bool done() const { return m_stream_end; }
void start();
void end();
virtual void on_event(Event *evt) override;
private:
Worker* m_worker;
pjs::Ref<PipelineLayout> m_pipeline_layout;
pjs::Ref<Pipeline> m_pipeline;
bool m_stream_end = false;
};

struct SolvedFile {
int index;
pjs::Ref<pjs::Str> filename;
Expand All @@ -98,6 +116,7 @@ class Worker : public pjs::RefCount<Worker> {
};

Module* m_root = nullptr;
Timer m_keep_alive;
pjs::Ref<Thread> m_thread;
pjs::Ref<pjs::Object> m_global_object;
std::vector<Module*> m_modules;
Expand All @@ -106,13 +125,17 @@ class Worker : public pjs::RefCount<Worker> {
std::map<Listener*, ListeningPipeline> m_listeners;
std::set<Task*> m_tasks;
std::set<Watch*> m_watches;
std::list<Exit*> m_exits;
std::map<pjs::Ref<pjs::Str>, Namespace> m_namespaces;
std::map<pjs::Ref<pjs::Str>, SolvedFile> m_solved_files;
bool m_graph_enabled = false;

auto new_module_index() -> int;
void add_module(Module *m);
void remove_module(int i);
void keep_alive();
void on_exit(Exit *exit);
void end_all();

thread_local static pjs::Ref<Worker> s_current;

Expand Down

0 comments on commit 4b9148c

Please sign in to comment.