Skip to content

Commit

Permalink
Merge pull request #1672 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
iwiznia authored Mar 11, 2024
2 parents efae75c + 8251246 commit 996d920
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 10 deletions.
16 changes: 15 additions & 1 deletion BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper()
break;
}
}

// Break out of `poll` in main.cpp.
_notifyDone.push(true);
SINFO("Exiting syncWrapper");
}

void BedrockServer::sync()
Expand Down Expand Up @@ -230,13 +234,17 @@ void BedrockServer::sync()
// commands, and we'll shortly run through the existing queue.
if (_shutdownState.load() == CLIENTS_RESPONDED) {
_syncNode->beginShutdown();

// This will cause us to skip the next `poll` iteration which avoids a 1 second wait.
_notifyDone.push(true);
}

// The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for
// activity. Once any of them has activity (or the timeout ends), poll will return.
fd_map fdm;

// Pre-process the shutdown notification queue and any sockets the sync node is managing (i.e., communication with peer nodes).
_notifyDone.prePoll(fdm);
_syncNode->prePoll(fdm);

// Add our command queues to our fd_map.
Expand All @@ -262,6 +270,7 @@ void BedrockServer::sync()
AutoTimerTime postPollTime(postPollTimer);
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
_notifyDone.postPoll(fdm);
}

// Ok, let the sync node do its updating for as many iterations as it requires. We'll update the replication
Expand Down Expand Up @@ -671,7 +680,7 @@ void BedrockServer::worker(int threadId)
});

// Get the next one.
command = commandQueue.get(1000000);
command = commandQueue.get(100000);

SAUTOPREFIX(command->request);
SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, "
Expand Down Expand Up @@ -1362,6 +1371,9 @@ bool BedrockServer::shutdownComplete() {
void BedrockServer::prePoll(fd_map& fdm) {
lock_guard<mutex> lock(_portMutex);

// This will interrupt poll when we shut down.
_notifyDone.prePoll(fdm);

// Add all our ports. There are no sockets directly managed here.
if (_commandPortPublic) {
SFDset(fdm, _commandPortPublic->s, SREADEVTS);
Expand All @@ -1378,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) {
}

void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
_notifyDone.postPoll(fdm);

// NOTE: There are no sockets managed here, just ports.
// Open the port the first time we enter a command-processing state
SQLiteNodeState state = _replicationState.load();
Expand Down
4 changes: 4 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer {

// We call this method whenever a node changes state
void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override;

// This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down, so that shutdown
// does not have to wait up to a full second for `poll` to time out.
SSynchronizedQueue<bool> _notifyDone;
};
6 changes: 3 additions & 3 deletions libstuff/SSignal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() {
// Wait for a signal to appear.
siginfo_t siginfo = {0};
struct timespec timeout;
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
timeout.tv_sec = 0;
timeout.tv_nsec = 100'000'000; // 100ms in ns.
int result = -1;
while (result == -1) {
result = sigtimedwait(&signals, &siginfo, &timeout);
Expand Down Expand Up @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() {
void SStopSignalThread() {
_SSignal_threadStopFlag = true;
if (_SSignal_threadInitialized.test_and_set()) {
// Send ourselves a singnal to interrupt our thread.
// Send ourselves a signal to interrupt our thread.
SINFO("Joining signal thread.");
_SSignal_signalThread.join();
_SSignal_threadInitialized.clear();
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
}
case STCPManager::Socket::CLOSED: {
// Done; clean up and try to reconnect
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5);
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1);
if (socket->connectFailure) {
SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms");
} else {
Expand Down
30 changes: 27 additions & 3 deletions test/clustertest/BedrockClusterTester.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
_cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount);
}

auto start = STimeNow();

// Now start them all.
list<thread> threads;
for (auto it = _cluster.begin(); it != _cluster.end(); it++) {
threads.emplace_back([it](){
it->startServer();
});
usleep(100'000);
}
for (auto& i : threads) {
i.join();
Expand All @@ -176,16 +179,37 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
usleep(100000); // 0.1 seconds.
}
}
auto end = STimeNow();

if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl;
}
}

// Destructor: stops every node in the cluster, reports unusually slow shutdowns, then empties `_cluster`.
// NOTE(review): this text is a rendered diff without +/- markers. The first comment, `for` header and `stopNode(i)`
// after the opening brace look like the pre-change sequential shutdown that the commit replaces with the threaded
// version below — confirm against the repository before treating both as live code.
template <typename T>
ClusterTester<T>::~ClusterTester()
{
// Shut them down in reverse order so they don't try and stand up as leader in the middle of everything.
for (int i = _size - 1; i >= 0; i--) {
stopNode(i);
// Remember when shutdown began so the total duration can be checked at the end.
auto start = STimeNow();

// Shut down everything but the leader first.
// Each non-leader node (indexes `_size - 1` down to 1) gets its own thread so the `stopNode` calls can overlap.
list<thread> threads;
for (int i = _size - 1; i > 0; i--) {
// `i` is captured by value because the loop variable keeps changing; everything else is captured by reference.
threads.emplace_back([&, i](){
stopNode(i);
});
}
// Wait for every non-leader shutdown to finish before touching node 0.
for (auto& t: threads) {
t.join();
}

// Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down.
stopNode(0);

auto end = STimeNow();

// Only report shutdowns longer than 5,000,000 time units. The value is divided by 1000 to print milliseconds, so
// the units are presumably microseconds (i.e. a 5 second threshold) — TODO confirm against `STimeNow`.
if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl;
}
_cluster.clear();
}

Expand Down
35 changes: 33 additions & 2 deletions test/lib/tpunit++.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <string.h>
#include <iostream>
#include <regex>
#include <chrono>
using namespace tpunit;

bool tpunit::TestFixture::exitFlag = false;
Expand Down Expand Up @@ -166,11 +167,15 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
}

list<TestFixture*> afterTests;
mutex testTimeLock;
multimap<chrono::milliseconds, string> testTimes;

for (int threadID = 0; threadID < threads; threadID++) {
// Capture everything by reference except threadID, because we don't want it to be incremented for the
// next thread in the loop.
thread t = thread([&, threadID]{
auto start = chrono::steady_clock::now();

threadInitFunction();
try {
// Do test.
Expand Down Expand Up @@ -258,6 +263,11 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
exitFlag = true;
printf("Thread %d caught shutdown exception, exiting.\n", threadID);
}
auto end = chrono::steady_clock::now();
if (currentTestName.size()) {
lock_guard<mutex> lock(testTimeLock);
testTimes.emplace(make_pair(chrono::duration_cast<std::chrono::milliseconds>(end - start), currentTestName));
}
});
threadList.push_back(move(t));
}
Expand Down Expand Up @@ -294,6 +304,17 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
}
}

cout << endl;
cout << "Slowest Test Classes: " << endl;
auto it = testTimes.rbegin();
for (size_t i = 0; i < 10; i++) {
if (it == testTimes.rend()) {
break;
}
cout << it->first << ": " << it->second << endl;
it++;
}

return tpunit_detail_stats()._failures;
}
return 1;
Expand Down Expand Up @@ -419,23 +440,33 @@ void tpunit::TestFixture::tpunit_detail_do_tests(TestFixture* f) {
f->_stats._assertions = 0;
f->_stats._exceptions = 0;
f->testOutputBuffer = "";
auto start = chrono::steady_clock::now();
tpunit_detail_do_methods(f->_befores);
tpunit_detail_do_method(t);
tpunit_detail_do_methods(f->_afters);
auto end = chrono::steady_clock::now();
stringstream timeStream;
timeStream << "(" << chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (chrono::duration_cast<std::chrono::milliseconds>(end - start) > 5000ms) {
timeStream << " \xF0\x9F\x90\x8C";
}
timeStream << ")";
string timeStr = timeStream.str();
const char* time = timeStr.c_str();

// No new assertions or exceptions. This is not currently synchronized correctly. They can cause tests that
// passed to appear failed when another test failed while this test was running. They cannot cause failed
// tests to appear to have passed.
if(!f->_stats._assertions && !f->_stats._exceptions) {
lock_guard<recursive_mutex> lock(m);
printf("\xE2\x9C\x85 %s\n", t->_name);
printf("\xE2\x9C\x85 %s %s\n", t->_name, time);
tpunit_detail_stats()._passes++;
} else {
lock_guard<recursive_mutex> lock(m);

// Dump the test buffer if the test included any log lines.
f->printTestBuffer();
printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s\n", t->_name);
printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s %s\n", t->_name, time);
tpunit_detail_stats()._failures++;
tpunit_detail_stats()._failureNames.emplace(t->_name);
}
Expand Down

0 comments on commit 996d920

Please sign in to comment.