From b6bfa0dfac633e07ff53ddf5cf95a90ff9d5ecdf Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 8 Mar 2024 15:06:20 -0800 Subject: [PATCH 1/6] WIP --- BedrockServer.cpp | 18 ++++++++++++++-- BedrockServer.h | 4 ++++ libstuff/SSignal.cpp | 6 +++--- main.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 28 ++++++++++++++++++++++--- 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 8105dd9a7..4bbbd9484 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper() break; } } + + // Break out of `poll` in main.cpp. + _notifyDone.push(true); + SINFO("Exiting syncWrapper"); } void BedrockServer::sync() @@ -250,7 +254,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + STIME_US_PER_S; + nextActivity = STimeNow() + (STIME_US_PER_MS * 100); // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -644,10 +648,14 @@ void BedrockServer::sync() // Note: This is not an atomic operation but should not matter. Nothing should use this that can happen with no // sync thread. // If there are socket threads in existance, they can be looking at this through a syncThread copy. + SINFO("Deleting DB pool"); _dbPool = nullptr; + SINFO("Deleted DB pool"); // We're really done, store our flag so main() can be aware. + SINFO("Marking sync thread complete"); _syncThreadComplete.store(true); + SINFO("Marked sync thread complete"); } void BedrockServer::worker(int threadId) @@ -671,7 +679,7 @@ void BedrockServer::worker(int threadId) }); // Get the next one. - command = commandQueue.get(1000000); + command = commandQueue.get(100000); SAUTOPREFIX(command->request); SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " @@ -1346,7 +1354,10 @@ BedrockServer::~BedrockServer() { // Delete our plugins. for (auto& p : plugins) { + string name = p.second->getName(); + SINFO("Deleting " << name << "plugin."); delete p.second; + SINFO("Done deleting " << name << "plugin."); } } @@ -1362,6 +1373,9 @@ bool BedrockServer::shutdownComplete() { void BedrockServer::prePoll(fd_map& fdm) { lock_guard lock(_portMutex); + // This will interrupt poll when we shut down. + _notifyDone.prePoll(fdm); + // Add all our ports. There are no sockets directly managed here. if (_commandPortPublic) { SFDset(fdm, _commandPortPublic->s, SREADEVTS); diff --git a/BedrockServer.h b/BedrockServer.h index d08720a41..501327322 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer { // We call this method whenever a node changes state void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override; + + // This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down. + // to wait up to a full second for them. + SSynchronizedQueue _notifyDone; }; diff --git a/libstuff/SSignal.cpp b/libstuff/SSignal.cpp index e4f2b86af..28339a3c3 100644 --- a/libstuff/SSignal.cpp +++ b/libstuff/SSignal.cpp @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() { // Wait for a signal to appear. siginfo_t siginfo = {0}; struct timespec timeout; - timeout.tv_sec = 1; - timeout.tv_nsec = 0; + timeout.tv_sec = 0; + timeout.tv_nsec = 100'000; int result = -1; while (result == -1) { result = sigtimedwait(&signals, &siginfo, &timeout); @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() { void SStopSignalThread() { _SSignal_threadStopFlag = true; if (_SSignal_threadInitialized.test_and_set()) { - // Send ourselves a singnal to interrupt our thread. + // Send ourselves a signal to interrupt our thread. SINFO("Joining signal thread."); _SSignal_signalThread.join(); _SSignal_threadInitialized.clear(); diff --git a/main.cpp b/main.cpp index b8eacd917..1683b4454 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_S; // 1s max period + nextActivity = STimeNow() + STIME_US_PER_S; // 0.1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index 78a60684b..84380bd71 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -145,6 +145,8 @@ ClusterTester::ClusterTester(ClusterSize size, _cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount); } + auto start = STimeNow(); + // Now start them all. list threads; for (auto it = _cluster.begin(); it != _cluster.end(); it++) { @@ -176,16 +178,36 @@ ClusterTester::ClusterTester(ClusterSize size, usleep(100000); // 0.1 seconds. } } + auto end = STimeNow(); + + cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; } template ClusterTester::~ClusterTester() { - // Shut them down in reverse order so they don't try and stand up as leader in the middle of everything. - for (int i = _size - 1; i >= 0; i--) { - stopNode(i); + auto start = STimeNow(); + + // Shut down everything but the leader first. + list threads; + cout << "Starting shutdown at " << SCURRENT_TIMESTAMP() << endl; + for (int i = _size - 1; i > 0; i--) { + threads.emplace_back([&, i](){ + cout << "Stopping node " << i << " at " << SCURRENT_TIMESTAMP() << endl; + stopNode(i); + }); + } + for (auto& t: threads) { + t.join(); } + // Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down. + cout << "Stopping node " << 0 << " at " << SCURRENT_TIMESTAMP() << endl; + stopNode(0); + + auto end = STimeNow(); + + cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; _cluster.clear(); } From 2a6798d96489c76d147707adfb869c551d3d09d4 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 8 Mar 2024 15:10:03 -0800 Subject: [PATCH 2/6] Remove some test code --- BedrockServer.cpp | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 4bbbd9484..2159ddf67 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -254,7 +254,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + (STIME_US_PER_MS * 100); + nextActivity = STimeNow() + (STIME_US_PER_MS * 100); // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -648,14 +648,10 @@ void BedrockServer::sync() // Note: This is not an atomic operation but should not matter. Nothing should use this that can happen with no // sync thread. // If there are socket threads in existance, they can be looking at this through a syncThread copy. - SINFO("Deleting DB pool"); _dbPool = nullptr; - SINFO("Deleted DB pool"); // We're really done, store our flag so main() can be aware. - SINFO("Marking sync thread complete"); _syncThreadComplete.store(true); - SINFO("Marked sync thread complete"); } void BedrockServer::worker(int threadId) @@ -1354,10 +1350,7 @@ BedrockServer::~BedrockServer() { // Delete our plugins. for (auto& p : plugins) { - string name = p.second->getName(); - SINFO("Deleting " << name << "plugin."); delete p.second; - SINFO("Done deleting " << name << "plugin."); } } From f304516f4c10d9e10f61cd9bda41dd582721b3fa Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 8 Mar 2024 15:35:46 -0800 Subject: [PATCH 3/6] More speedups --- BedrockServer.cpp | 9 ++++++++- main.cpp | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 2159ddf67..07753d6ef 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -234,6 +234,9 @@ void BedrockServer::sync() // commands, and we'll shortly run through the existing queue. if (_shutdownState.load() == CLIENTS_RESPONDED) { _syncNode->beginShutdown(); + + // This will cause us to skip the next `poll` iteration which avoids a 1 second wait. + _notifyDone.push(true); } // The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for @@ -241,6 +244,7 @@ void BedrockServer::sync() fd_map fdm; // Pre-process any sockets the sync node is managing (i.e., communication with peer nodes). + _notifyDone.prePoll(fdm); _syncNode->prePoll(fdm); // Add our command queues to our fd_map. @@ -254,7 +258,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + (STIME_US_PER_MS * 100); + nextActivity = STimeNow() + STIME_US_PER_S; // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -266,6 +270,7 @@ void BedrockServer::sync() AutoTimerTime postPollTime(postPollTimer); _syncNode->postPoll(fdm, nextActivity); _syncNodeQueuedCommands.postPoll(fdm); + _notifyDone.postPoll(fdm); } // Ok, let the sync node to it's updating for as many iterations as it requires. We'll update the replication @@ -1385,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) { } void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) { + _notifyDone.postPoll(fdm); + // NOTE: There are no sockets managed here, just ports. // Open the port the first time we enter a command-processing state SQLiteNodeState state = _replicationState.load(); diff --git a/main.cpp b/main.cpp index 1683b4454..d61b4a1f3 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_S; // 0.1s max period + nextActivity = STimeNow() + STIME_US_PER_MS * 100; // 0.1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); From 267895ee89867d35dd1db7a4b08e48bbd337266c Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 8 Mar 2024 15:42:53 -0800 Subject: [PATCH 4/6] Cleanup some stuff --- main.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/main.cpp b/main.cpp index d61b4a1f3..b8eacd917 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_MS * 100; // 0.1s max period + nextActivity = STimeNow() + STIME_US_PER_S; // 1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index 84380bd71..dc1a40b38 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -180,7 +180,9 @@ ClusterTester::ClusterTester(ClusterSize size, } auto end = STimeNow(); - cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; + } } template @@ -190,10 +192,8 @@ ClusterTester::~ClusterTester() // Shut down everything but the leader first. list threads; - cout << "Starting shutdown at " << SCURRENT_TIMESTAMP() << endl; for (int i = _size - 1; i > 0; i--) { threads.emplace_back([&, i](){ - cout << "Stopping node " << i << " at " << SCURRENT_TIMESTAMP() << endl; stopNode(i); }); } @@ -202,12 +202,13 @@ ClusterTester::~ClusterTester() } // Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down. - cout << "Stopping node " << 0 << " at " << SCURRENT_TIMESTAMP() << endl; stopNode(0); auto end = STimeNow(); - cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; + } _cluster.clear(); } From fdfe60b36a2819f9f35139b8a603d86ab164c408 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 8 Mar 2024 16:14:38 -0800 Subject: [PATCH 5/6] Also shorten the retry for startup --- libstuff/SSignal.cpp | 2 +- sqlitecluster/SQLitePeer.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/libstuff/SSignal.cpp b/libstuff/SSignal.cpp index 28339a3c3..57e82759b 100644 --- a/libstuff/SSignal.cpp +++ b/libstuff/SSignal.cpp @@ -131,7 +131,7 @@ void _SSignal_signalHandlerThreadFunc() { siginfo_t siginfo = {0}; struct timespec timeout; timeout.tv_sec = 0; - timeout.tv_nsec = 100'000; + timeout.tv_nsec = 100'000'000; // 100ms in ns. int result = -1; while (result == -1) { result = sigtimedwait(&signals, &siginfo, &timeout); diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 4b8a12bc1..f2e4199e8 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } case STCPManager::Socket::CLOSED: { // Done; clean up and try to reconnect - uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5); + uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1); if (socket->connectFailure) { SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } else { diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index dc1a40b38..000752287 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -153,6 +153,7 @@ ClusterTester::ClusterTester(ClusterSize size, threads.emplace_back([it](){ it->startServer(); }); + usleep(100'000); } for (auto& i : threads) { i.join(); From da0d0439177cf70e4af78ec71fd29ce002cd5430 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 11 Mar 2024 08:50:54 -0700 Subject: [PATCH 6/6] Revert "Revert test timing changes to unblock deploy" --- test/lib/tpunit++.cpp | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/test/lib/tpunit++.cpp b/test/lib/tpunit++.cpp index 8443d91a6..ff2b49e07 100644 --- a/test/lib/tpunit++.cpp +++ b/test/lib/tpunit++.cpp @@ -2,6 +2,7 @@ #include #include #include +#include using namespace tpunit; bool tpunit::TestFixture::exitFlag = false; @@ -166,11 +167,15 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set& include, const } list afterTests; + mutex testTimeLock; + multimap testTimes; for (int threadID = 0; threadID < threads; threadID++) { // Capture everything by reference except threadID, because we don't want it to be incremented for the // next thread in the loop. thread t = thread([&, threadID]{ + auto start = chrono::steady_clock::now(); + threadInitFunction(); try { // Do test. @@ -258,6 +263,11 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set& include, const exitFlag = true; printf("Thread %d caught shutdown exception, exiting.\n", threadID); } + auto end = chrono::steady_clock::now(); + if (currentTestName.size()) { + lock_guard lock(testTimeLock); + testTimes.emplace(make_pair(chrono::duration_cast(end - start), currentTestName)); + } }); threadList.push_back(move(t)); } @@ -294,6 +304,17 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set& include, const } } + cout << endl; + cout << "Slowest Test Classes: " << endl; + auto it = testTimes.rbegin(); + for (size_t i = 0; i < 10; i++) { + if (it == testTimes.rend()) { + break; + } + cout << it->first << ": " << it->second << endl; + it++; + } + return tpunit_detail_stats()._failures; } return 1; @@ -419,23 +440,33 @@ void tpunit::TestFixture::tpunit_detail_do_tests(TestFixture* f) { f->_stats._assertions = 0; f->_stats._exceptions = 0; f->testOutputBuffer = ""; + auto start = chrono::steady_clock::now(); tpunit_detail_do_methods(f->_befores); tpunit_detail_do_method(t); tpunit_detail_do_methods(f->_afters); + auto end = chrono::steady_clock::now(); + stringstream timeStream; + timeStream << "(" << chrono::duration_cast(end - start); + if (chrono::duration_cast(end - start) > 5000ms) { + timeStream << " \xF0\x9F\x90\x8C"; + } + timeStream << ")"; + string timeStr = timeStream.str(); + const char* time = timeStr.c_str(); // No new assertions or exceptions. This not currently synchronized correctly. They can cause tests that // passed to appear failed when another test failed while this test was running. They cannot cause failed // tests to appear to have passed. if(!f->_stats._assertions && !f->_stats._exceptions) { lock_guard lock(m); - printf("\xE2\x9C\x85 %s\n", t->_name); + printf("\xE2\x9C\x85 %s %s\n", t->_name, time); tpunit_detail_stats()._passes++; } else { lock_guard lock(m); // Dump the test buffer if the test included any log lines. f->printTestBuffer(); - printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s\n", t->_name); + printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s %s\n", t->_name, time); tpunit_detail_stats()._failures++; tpunit_detail_stats()._failureNames.emplace(t->_name); }