Skip to content

Commit

Permalink
[fix] Memory leak caused by ListenerArrays and PipelineLB while reloa…
Browse files Browse the repository at this point in the history
…ding
  • Loading branch information
pajama-coder committed May 10, 2024
1 parent 44c7696 commit 7c190be
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 7 deletions.
9 changes: 7 additions & 2 deletions src/listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ bool Port::increase_num_connections() {
}

bool Port::decrease_num_connections() {
auto n = m_num_connections.fetch_sub(1);
auto max = m_max_connections.load();
if (max >= 0) {
auto n = m_num_connections.fetch_sub(1);
if (n == max) wake_up_listeners();
return n <= max;
} else {
Expand Down Expand Up @@ -665,13 +665,18 @@ void ListenerArray::apply(Worker *worker, PipelineLayout *layout) {
if (m_worker) {
throw std::runtime_error("ListenerArray is being listened already");
}
m_worker = worker;
m_pipeline_layout = layout;
m_worker = worker;
m_worker->add_listener_array(this);
for (const auto &p : m_listeners) {
worker->add_listener(p.first, layout, p.second);
}
}

void ListenerArray::close() {
m_pipeline_layout = nullptr;
}

void ListenerArray::get_ip_port(const std::string &ip_port, std::string &ip, int &port) {
if (!utils::get_host_port(ip_port, ip, port)) {
std::string msg("invalid 'ip:port' form: ");
Expand Down
1 change: 1 addition & 0 deletions src/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ class ListenerArray : public pjs::ObjectTemplate<ListenerArray> {
auto remove_listener(pjs::Str *port, pjs::Object *options = nullptr) -> Listener*;
void set_listeners(pjs::Array *array);
void apply(Worker *worker, PipelineLayout *layout);
void close();

private:
ListenerArray(pjs::Object *options = nullptr)
Expand Down
14 changes: 9 additions & 5 deletions src/worker-thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ bool WorkerManager::start(int concurrency, bool force) {
}

auto WorkerManager::status() -> Status& {
if (!m_querying_status && !m_reloading) {
if (!m_querying_status && !m_reloading && !m_stopping) {
m_querying_status = true;

if (auto n = m_worker_threads.size()) {
Expand Down Expand Up @@ -608,7 +608,7 @@ auto WorkerManager::status() -> Status& {
}

bool WorkerManager::status(const std::function<void(Status&)> &cb) {
if (m_querying_status || m_reloading) return false;
if (m_querying_status || m_reloading || m_stopping) return false;
if (m_worker_threads.empty()) return false;

m_querying_status = true;
Expand Down Expand Up @@ -642,7 +642,7 @@ bool WorkerManager::status(const std::function<void(Status&)> &cb) {
}

auto WorkerManager::stats() -> stats::MetricDataSum& {
if (!m_querying_stats && !m_reloading) {
if (!m_querying_stats && !m_reloading && !m_stopping) {
m_querying_stats = true;

if (auto n = m_worker_threads.size()) {
Expand Down Expand Up @@ -678,7 +678,7 @@ auto WorkerManager::stats() -> stats::MetricDataSum& {
}

bool WorkerManager::stats(const std::function<void(stats::MetricDataSum&)> &cb) {
if (m_querying_stats || m_reloading) return false;
if (m_querying_stats || m_reloading || m_stopping) return false;
if (m_worker_threads.empty()) return false;

m_querying_stats = true;
Expand Down Expand Up @@ -722,6 +722,7 @@ void WorkerManager::recycle() {
}

void WorkerManager::reload() {
if (m_stopping) return;
if (m_reloading || m_querying_status || m_querying_stats || !m_admin_requests.empty()) {
m_reloading_requested = true;
} else {
Expand All @@ -730,7 +731,7 @@ void WorkerManager::reload() {
}

bool WorkerManager::admin(pjs::Str *path, const Data &request, const std::function<void(const Data *)> &respond) {
if (m_reloading) return false;
if (m_reloading || m_stopping) return false;
if (m_worker_threads.empty()) return false;
new AdminRequest(this, path, request, respond);
next_admin_request();
Expand Down Expand Up @@ -782,6 +783,9 @@ void WorkerManager::start_reloading() {

bool WorkerManager::stop(bool force) {
if (m_stopped) return true;
m_stopping = true;
m_loading_pipeline_lb = nullptr;
m_running_pipeline_lb = nullptr;
bool pending = false;
for (auto *wt : m_worker_threads) {
if (wt) {
Expand Down
1 change: 1 addition & 0 deletions src/worker-thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class WorkerManager {
bool m_reloading = false;
bool m_querying_status = false;
bool m_querying_stats = false;
bool m_stopping = false;
bool m_stopped = false;
List<AdminRequest> m_admin_requests;
AdminRequest* m_current_admin_request = nullptr;
Expand Down
10 changes: 10 additions & 0 deletions src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ auto Worker::load_module(pjs::Module *referer, const std::string &path) -> pjs::
return mod;
}

void Worker::add_listener_array(ListenerArray *la) {
m_listener_arrays.push_back(la);
}

void Worker::add_listener(Listener *listener, PipelineLayout *layout, const Listener::Options &options) {
auto &p = m_listeners[listener];
p.pipeline_layout = layout;
Expand Down Expand Up @@ -631,11 +635,17 @@ void Worker::on_exit(Exit *exit) {
void Worker::end_all() {
m_period->end();
if (s_current == this) s_current = nullptr;

for (auto *pt : m_pipeline_templates) pt->shutdown();
for (auto *task : m_tasks) task->end();
for (auto *watch : m_watches) watch->end();
for (auto *exit : m_exits) exit->end();
for (auto *admin : m_admins) admin->end();

for (const auto &la : m_listener_arrays) la->close();
m_listener_arrays.clear();
m_pipeline_lb = nullptr;

if (m_pipeline_templates.empty()) {
for (auto *mod : m_legacy_modules) if (mod) mod->unload();
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Worker : public pjs::RefCount<Worker>, public pjs::Instance {
auto load_js_module(const std::string &path, pjs::Value &result) -> JSModule*;
auto load_native_module(const std::string &path) -> nmi::NativeModule*;
auto load_module(pjs::Module *referer, const std::string &path) -> pjs::Module*;
void add_listener_array(ListenerArray *la);
void add_listener(Listener *listener, PipelineLayout *layout, const Listener::Options &options);
void remove_listener(Listener *listener);
bool update_listeners(bool force);
Expand Down Expand Up @@ -170,6 +171,7 @@ class Worker : public pjs::RefCount<Worker>, public pjs::Instance {
std::set<Watch*> m_watches;
std::list<Exit*> m_exits;
std::list<Admin*> m_admins;
std::list<pjs::Ref<ListenerArray>> m_listener_arrays;
std::map<pjs::Ref<pjs::Str>, Namespace> m_namespaces;
std::map<pjs::Ref<pjs::Str>, SolvedFile> m_solved_files;
std::unique_ptr<Signal> m_exit_signal;
Expand Down

0 comments on commit 7c190be

Please sign in to comment.