Skip to content

Commit

Permalink
Merge pull request #1885 from Expensify/jpersaud_log_thread
Browse files Browse the repository at this point in the history
Add log about resource usages on slow threads
  • Loading branch information
tylerkaraszewski authored Oct 8, 2024
2 parents 8bf3a4b + 656aa30 commit 52ea542
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 8 deletions.
11 changes: 6 additions & 5 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <libstuff/libstuff.h>
#include <libstuff/SRandom.h>
#include <libstuff/AutoTimer.h>
#include <libstuff/ResourceMonitorThread.h>
#include <PageLockGuard.h>
#include <sqlitecluster/SQLitePeer.h>

Expand Down Expand Up @@ -116,9 +117,9 @@ void BedrockServer::sync()
// our worker threads now. We don't wait until the node is `LEADING` or `FOLLOWING`, as it's state can change while
// it's running, and our workers will have to maintain awareness of that state anyway.
SINFO("Starting " << workerThreads << " worker threads.");
list<thread> workerThreadList;
list<ResourceMonitorThread> workerThreadList;
for (int threadId = 0; threadId < workerThreads; threadId++) {
workerThreadList.emplace_back(&BedrockServer::worker, this, threadId);
workerThreadList.emplace_back([this, threadId](){this->worker(threadId);});
}

// Now we jump into our main command processing loop.
Expand Down Expand Up @@ -1319,7 +1320,7 @@ BedrockServer::BedrockServer(const SData& args_)

// Start the sync thread, which will start the worker threads.
SINFO("Launching sync thread '" << _syncThreadName << "'");
_syncThread = thread(&BedrockServer::syncWrapper, this);
_syncThread = ResourceMonitorThread(&BedrockServer::syncWrapper, this);
}

BedrockServer::~BedrockServer() {
Expand Down Expand Up @@ -1868,7 +1869,7 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
if (__quiesceThread) {
response.methodLine = "400 Already Blocked";
} else {
__quiesceThread = new thread([&]() {
__quiesceThread = new ResourceMonitorThread([&]() {
shared_ptr<SQLitePool> dbPoolCopy = _dbPool;
if (dbPoolCopy) {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex());
Expand Down Expand Up @@ -2098,7 +2099,7 @@ void BedrockServer::_acceptSockets() {
bool threadStarted = false;
while (!threadStarted) {
try {
t = thread(&BedrockServer::handleSocket, this, move(socket), port == _controlPort, port == _commandPortPublic, port == _commandPortPrivate);
t = ResourceMonitorThread(&BedrockServer::handleSocket, this, move(socket), port == _controlPort, port == _commandPortPublic, port == _commandPortPrivate);
threadStarted = true;
} catch (const system_error& e) {
// We don't care about this lock here from a performance perspective, it only happens when we
Expand Down
26 changes: 26 additions & 0 deletions libstuff/ResourceMonitorThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include "ResourceMonitorThread.h"
#include "libstuff/libstuff.h"
#include <format>
#include <cmath>

thread_local uint64_t ResourceMonitorThread::threadStartTime;
thread_local double ResourceMonitorThread::cpuStartTime;

void ResourceMonitorThread::beforeProcessStart() {
threadStartTime = STimeNow();
cpuStartTime = SGetCPUUserTime();
}

void ResourceMonitorThread::afterProcessFinished() {
const uint64_t threadUserTime = STimeNow() - threadStartTime;
const double cpuUserTime = SGetCPUUserTime() - cpuStartTime;

// This shouldn't happen since the time to start/finish a thread should take more than a microsecond, but to be
// sure we're not dividing by 0 and causing crashes, let's add an if here and return if threadEndTime is 0.
if (threadUserTime == 0) {
return;
}
const double cpuUserPercentage = round((cpuUserTime / static_cast<double>(threadUserTime)) * 100 * 1000) / 1000;
const pid_t tid = syscall(SYS_gettid);
SINFO(format("Thread finished. pID: '{}', CPUTime: '{}µs', CPUPercentage: '{}%'", tid, cpuUserTime, cpuUserPercentage));
}
31 changes: 31 additions & 0 deletions libstuff/ResourceMonitorThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "libstuff/libstuff.h"
#include <thread>

using namespace std;

// This class is a wrapper around the default thread. We use it to collect the thread CPU usage. That allows us
// to investigate if we have any threads using more resources than it should, which can cause CPU usage peaks in
// the cluster.
class ResourceMonitorThread : public thread
{
public:
// When calling this constructor, if you're passing a class member function as the `f` parameter and that
// function receives parameters, you will need to wrap your function call in a lambda, doing something like:
// ResourceMonitorThread([=, this]{ this->memberFunction(param1, param2);});
template<typename F, typename... Args>
ResourceMonitorThread(F&& f, Args&&... args):
thread(ResourceMonitorThread::wrapper<F&&, Args&&...>, forward<F&&>(f), forward<Args&&>(args)...){};
private:
thread_local static uint64_t threadStartTime;
thread_local static double cpuStartTime;

static void beforeProcessStart();
static void afterProcessFinished();

template<typename F, typename... Args>
static void wrapper(F&& f, Args&&... args) {
beforeProcessStart();
invoke(forward<F>(f), forward<Args>(args)...);
afterProcessFinished();
}
};
9 changes: 9 additions & 0 deletions libstuff/libstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
// C library
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <execinfo.h>
#include <sys/un.h>
#include <cxxabi.h>
Expand Down Expand Up @@ -3201,3 +3203,10 @@ SString& SString::operator=(const bool from) {
return *this;
}

double SGetCPUUserTime() {
struct rusage usage;
getrusage(RUSAGE_THREAD, &usage);

// Returns the current threads CPU user time in microseconds
return static_cast<double>(usage.ru_utime.tv_sec) * 1e6 + static_cast<double>(usage.ru_utime.tv_usec);
}
3 changes: 3 additions & 0 deletions libstuff/libstuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,7 @@ string SGUnzip(const string& content);
// Command-line helpers
STable SParseCommandLine(int argc, char* argv[]);

// Returns the CPU usage inside the current thread
double SGetCPUUserTime();

#endif // LIBSTUFF_H
3 changes: 2 additions & 1 deletion sqlitecluster/SQLiteClusterMessenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <sqlitecluster/SQLiteClusterMessenger.h>
#include <sqlitecluster/SQLiteNode.h>
#include <sqlitecluster/SQLitePeer.h>
#include <libstuff/ResourceMonitorThread.h>

#include <unistd.h>
#include <fcntl.h>
Expand Down Expand Up @@ -69,7 +70,7 @@ SQLiteClusterMessenger::WaitForReadyResult SQLiteClusterMessenger::waitForReady(
}

vector<SData> SQLiteClusterMessenger::runOnAll(const SData& cmd) {
list<thread> threads;
list<ResourceMonitorThread> threads;
const list<STable> peerInfo = _node->getPeerInfo();
vector<SData> results(peerInfo.size());
atomic<size_t> index = 0;
Expand Down
5 changes: 3 additions & 2 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <libstuff/AutoScopeOnPrepare.h>
#include <libstuff/libstuff.h>
#include <libstuff/ResourceMonitorThread.h>
#include <libstuff/SRandom.h>
#include <libstuff/SQResult.h>
#include <sqlitecluster/SQLiteCommand.h>
Expand Down Expand Up @@ -1485,7 +1486,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
} else {
_pendingSynchronizeResponses++;
static atomic<size_t> synchronizeCount(0);
thread([message, peer, currentSynchronizeCount = synchronizeCount++, this] () {
ResourceMonitorThread([message, peer, currentSynchronizeCount = synchronizeCount++, this] () {
SInitialize("synchronize" + to_string(currentSynchronizeCount));
SData response("SYNCHRONIZE_RESPONSE");
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex());
Expand Down Expand Up @@ -1648,7 +1649,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
SDEBUG("Spawning concurrent replicate thread (blocks until DB handle available): " << threadID);
try {
uint64_t threadAttemptStartTimestamp = STimeNow();
thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach();
ResourceMonitorThread([=, this](){this->_replicate(peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp);}).detach();
} catch (const system_error& e) {
// If the server is strugling and falling behind on replication, we might have too many threads
// causing a resource exhaustion. If that happens, all the transactions that are already threaded
Expand Down

0 comments on commit 52ea542

Please sign in to comment.