Skip to content

Commit

Permalink
Merge pull request #1568 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
flodnv authored Sep 20, 2023
2 parents 673fef3 + f8916a0 commit 9e9ddce
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 70 deletions.
80 changes: 36 additions & 44 deletions BedrockCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,8 @@ void BedrockCore::prePeekCommand(unique_ptr<BedrockCommand>& command) {
try {
try {
SDEBUG("prePeeking at '" << request.methodLine << "' with priority: " << command->priority);

uint64_t timeout = _getRemainingTime(command, false);
command->prePeekCount++;

_db.startTiming(timeout);
_db.setTimeout(_getRemainingTime(command, false));

if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("501 Failed to begin shared prePeek transaction");
Expand Down Expand Up @@ -121,7 +118,7 @@ void BedrockCore::prePeekCommand(unique_ptr<BedrockCommand>& command) {

// Back out of the current transaction, it doesn't need to do anything.
_db.rollback();
_db.resetTiming();
_db.clearTimeout();

// Reset, we can write now.
_db.setQueryOnly(false);
Expand All @@ -141,10 +138,8 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command
RESULT returnValue = RESULT::COMPLETE;
try {
SDEBUG("Peeking at '" << request.methodLine << "' with priority: " << command->priority);
uint64_t timeout = _getRemainingTime(command, false);
command->peekCount++;

_db.startTiming(timeout);
_db.setTimeout(_getRemainingTime(command, false));

try {
if (!_db.beginTransaction(exclusive ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) {
Expand All @@ -161,7 +156,7 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command

if (!completed) {
SDEBUG("Command '" << request.methodLine << "' not finished in peek, re-queuing.");
_db.resetTiming();
_db.clearTimeout();
_db.setQueryOnly(false);
return RESULT::SHOULD_PROCESS;
}
Expand Down Expand Up @@ -212,7 +207,7 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command

// Back out of the current transaction, it doesn't need to do anything.
_db.rollback();
_db.resetTiming();
_db.clearTimeout();

// Reset, we can write now.
_db.setQueryOnly(false);
Expand Down Expand Up @@ -243,11 +238,8 @@ BedrockCore::RESULT BedrockCore::processCommand(unique_ptr<BedrockCommand>& comm
bool needsCommit = false;
try {
SDEBUG("Processing '" << request.methodLine << "'");
uint64_t timeout = _getRemainingTime(command, true);
command->processCount++;

// Time in US.
_db.startTiming(timeout);
_db.setTimeout(_getRemainingTime(command, true));
if (!_db.insideTransaction()) {
// If a transaction was already begun in `peek`, then this won't run. We call it here to support the case where
// peek created a httpsRequest and closed it's first transaction until the httpsRequest was complete, in which
Expand Down Expand Up @@ -323,7 +315,7 @@ BedrockCore::RESULT BedrockCore::processCommand(unique_ptr<BedrockCommand>& comm
_db.setUpdateNoopMode(false);

// We can reset the timing info for the next command.
_db.resetTiming();
_db.clearTimeout();

// Done, return whether or not we need the parent to commit our transaction.
command->complete = !needsCommit;
Expand All @@ -341,41 +333,41 @@ void BedrockCore::postProcessCommand(unique_ptr<BedrockCommand>& command) {

// We catch any exception and handle in `_handleCommandException`.
try {
SDEBUG("postProcessing at '" << request.methodLine << "' with priority: " << command->priority);
uint64_t timeout = _getRemainingTime(command, false);
command->postProcessCount++;

_db.startTiming(timeout);
try {
SDEBUG("postProcessing at '" << request.methodLine << "' with priority: " << command->priority);
command->postProcessCount++;
_db.setTimeout(_getRemainingTime(command, false));

if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("501 Failed to begin shared postProcess transaction");
}
if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("501 Failed to begin shared postProcess transaction");
}

// Make sure no writes happen while in postProcess command
_db.setQueryOnly(true);
// Make sure no writes happen while in postProcess command
_db.setQueryOnly(true);

// postProcess.
command->postProcess(_db);
SDEBUG("Plugin '" << command->getName() << "' postProcess command '" << request.methodLine << "'");
// postProcess.
command->postProcess(_db);
SDEBUG("Plugin '" << command->getName() << "' postProcess command '" << request.methodLine << "'");

// Success. If a command has set "content", encode it in the response.
SINFO("Responding '" << response.methodLine << "' to read-only '" << request.methodLine << "'.");
if (!content.empty()) {
// Make sure we're not overwriting anything different.
string newContent = SComposeJSONObject(content);
if (response.content != newContent) {
if (!response.content.empty()) {
SWARN("Replacing existing response content in " << request.methodLine);
// Success. If a command has set "content", encode it in the response.
SINFO("Responding '" << response.methodLine << "' to read-only '" << request.methodLine << "'.");
if (!content.empty()) {
// Make sure we're not overwriting anything different.
string newContent = SComposeJSONObject(content);
if (response.content != newContent) {
if (!response.content.empty()) {
SWARN("Replacing existing response content in " << request.methodLine);
}
response.content = newContent;
}
response.content = newContent;
}
} catch (const SQLite::timeout_error& e) {
// Some plugins want to alert timeout errors themselves, and make them silent on bedrock.
if (!command->shouldSuppressTimeoutWarnings()) {
SALERT("Command " << command->request.methodLine << " timed out after " << e.time()/1000 << "ms.");
}
STHROW("555 Timeout postProcessing command");
}
} catch (const SQLite::timeout_error& e) {
// Some plugins want to alert timeout errors themselves, and make them silent on bedrock.
if (!command->shouldSuppressTimeoutWarnings()) {
SALERT("Command " << command->request.methodLine << " timed out after " << e.time()/1000 << "ms.");
}
STHROW("555 Timeout postProcessing command");
} catch (const SException& e) {
_handleCommandException(command, e);
} catch (...) {
Expand All @@ -388,7 +380,7 @@ void BedrockCore::postProcessCommand(unique_ptr<BedrockCommand>& command) {

// Back out of the current transaction, it doesn't need to do anything.
_db.rollback();
_db.resetTiming();
_db.clearTimeout();

// Reset, we can write now.
_db.setQueryOnly(false);
Expand Down
11 changes: 5 additions & 6 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ bool SQLite::addColumn(const string& tableName, const string& column, const stri
return false;
}

string SQLite::read(const string& query) {
string SQLite::read(const string& query) const {
// Execute the read-only query
SQResult result;
if (!read(query, result)) {
Expand All @@ -457,7 +457,7 @@ string SQLite::read(const string& query) {
return result[0][0];
}

bool SQLite::read(const string& query, SQResult& result) {
bool SQLite::read(const string& query, SQResult& result) const {
uint64_t before = STimeNow();
bool queryResult = false;
_queryCount++;
Expand All @@ -478,7 +478,7 @@ bool SQLite::read(const string& query, SQResult& result) {
return queryResult;
}

void SQLite::_checkInterruptErrors(const string& error) {
void SQLite::_checkInterruptErrors(const string& error) const {

// Local error code.
int errorCode = 0;
Expand All @@ -492,7 +492,6 @@ void SQLite::_checkInterruptErrors(const string& error) {
}
if (_timeoutError) {
time = _timeoutError;
resetTiming();
errorCode = 1;
}
}
Expand Down Expand Up @@ -1005,13 +1004,13 @@ int SQLite::_authorize(int actionCode, const char* detail1, const char* detail2,
return SQLITE_DENY;
}

void SQLite::startTiming(uint64_t timeLimitUS) {
void SQLite::setTimeout(uint64_t timeLimitUS) {
_timeoutStart = STimeNow();
_timeoutLimit = _timeoutStart + timeLimitUS;
_timeoutError = 0;
}

void SQLite::resetTiming() {
void SQLite::clearTimeout() {
_timeoutLimit = 0;
_timeoutStart = 0;
_timeoutError = 0;
Expand Down
34 changes: 17 additions & 17 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ class SQLite {

// Performs a read-only query (eg, SELECT). This can be done inside or outside a transaction. Returns true on
// success, and fills the 'result' with the result of the query.
bool read(const string& query, SQResult& result);
bool read(const string& query, SQResult& result) const;

// Performs a read-only query (eg, SELECT) that returns a single value.
string read(const string& query);
string read(const string& query) const;

// Types of transactions that we can begin.
enum class TRANSACTION_TYPE {
Expand Down Expand Up @@ -223,11 +223,11 @@ class SQLite {
// Looks up a range of commits.
bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result);

// Start a timing operation, that will time out after the given number of microseconds.
void startTiming(uint64_t timeLimitUS);
// Set a time limit for this transaction, in US from the current time.
void setTimeout(uint64_t timeLimitUS);

// Reset timing after finishing a timed operation.
void resetTiming();
// Reset all timeout information to 0, to be ready for the next operation.
void clearTimeout();

// This atomically removes and returns committed transactions from our internal list. SQLiteNode can call this, and
// it will return a map of transaction IDs to tuples of (query, hash, dbCountAtTransactionStart), so that those
Expand Down Expand Up @@ -385,8 +385,8 @@ class SQLite {
uint64_t _dbCountAtStart = 0;

// Timing information.
uint64_t _beginElapsed = 0;
uint64_t _readElapsed = 0;
mutable uint64_t _beginElapsed = 0;
mutable uint64_t _readElapsed = 0;
uint64_t _writeElapsed = 0;
uint64_t _prepareElapsed = 0;
uint64_t _commitElapsed = 0;
Expand Down Expand Up @@ -459,13 +459,13 @@ class SQLite {
// Registering this has the important side effect of preventing the DB from auto-checkpointing.
static int _walHookCallback(void* sqliteObject, sqlite3* db, const char* name, int walFileSize);

uint64_t _timeoutLimit = 0;
uint64_t _timeoutStart;
uint64_t _timeoutError;
mutable uint64_t _timeoutLimit = 0;
mutable uint64_t _timeoutStart;
mutable uint64_t _timeoutError;

// Check out various error cases that can interrupt a query.
// We check them all together because we need to make sure we atomically pick a single one to handle.
void _checkInterruptErrors(const string& error);
void _checkInterruptErrors(const string& error) const;

// Called internally by _sqliteAuthorizerCallback to authorize columns for a query.
int _authorize(int actionCode, const char* detail1, const char* detail2, const char* detail3, const char* detail4);
Expand All @@ -476,28 +476,28 @@ class SQLite {
// `rollback` is called, we don't double-rollback, generating an error. This allows the externally visible SQLite
// API to be consistent and not have to handle this special case. Consumers can just always call `rollback` after a
// failed query, regardless of whether or not it was already rolled back internally.
bool _autoRolledBack = false;
mutable bool _autoRolledBack = false;

bool _noopUpdateMode = false;

// A map of queries to their cached results. This is populated only with deterministic queries, and is reset on any
// write, rollback, or commit.
map<string, SQResult> _queryCache;
mutable map<string, SQResult> _queryCache;

// List of table names used during this transaction.
set<string> _tablesUsed;

// Number of queries that have been attempted in this transaction (for metrics only).
int64_t _queryCount = 0;
mutable int64_t _queryCount = 0;

// Number of queries found in cache in this transaction (for metrics only).
int64_t _cacheHits = 0;
mutable int64_t _cacheHits = 0;

// A string indicating the name of the transaction (typically a command name) for metric purposes.
string _transactionName;

// Will be set to false while running a non-deterministic query to prevent it's result being cached.
bool _isDeterministicQuery = false;
mutable bool _isDeterministicQuery = false;

// Copies of parameters used to initialize the DB that we store if we make child objects based on this one.
int _cacheSize;
Expand Down
17 changes: 14 additions & 3 deletions test/clustertest/testplugin/TestPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ unique_ptr<BedrockCommand> BedrockPlugin_TestPlugin::getCommand(SQLiteCommand&&
"postprocesscommand",
"prepeekpostprocesscommand",
"preparehandler",
"testquery"
"testquery",
"testPostProcessTimeout"
};
for (auto& cmdName : supportedCommands) {
if (SStartsWith(baseCommand.request.methodLine, cmdName)) {
Expand Down Expand Up @@ -158,8 +159,12 @@ bool TestPluginCommand::shouldPrePeek() {
}

bool TestPluginCommand::shouldPostProcess() {
return request.methodLine == "postprocesscommand" || request.methodLine == "prepeekpostprocesscommand" ||
request.methodLine == "preparehandler";
return set<string>{
"postprocesscommand",
"prepeekpostprocesscommand",
"testPostProcessTimeout",
"preparehandler",
}.count(request.methodLine);
}

bool BedrockPlugin_TestPlugin::preventAttach() {
Expand Down Expand Up @@ -520,6 +525,12 @@ void TestPluginCommand::postProcess(SQLite& db) {
SQResult result;
db.read("SELECT id FROM test WHERE value = 'this is written in onPrepareHandler'", result);
jsonContent["tableID"] = result[0][0];
} else if (request.methodLine == "testPostProcessTimeout") {
// It'll timeout eventually.
while (true) {
SQResult result;
db.read("SELECT COUNT(*) FROM test WHERE id = 999999999", result);
}
} else {
STHROW("500 no prePeek defined, shouldPrePeek should be false");
}
Expand Down
8 changes: 8 additions & 0 deletions test/clustertest/tests/TimeoutTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct TimeoutTest : tpunit::TestFixture {
TEST(TimeoutTest::test),
TEST(TimeoutTest::longerThanDefaultProcess),
TEST(TimeoutTest::testprocess),
TEST(TimeoutTest::testPostProcess),
TEST(TimeoutTest::totalTimeout),
TEST(TimeoutTest::quorumHTTPS),
TEST(TimeoutTest::futureCommitTimeout)) { }
Expand Down Expand Up @@ -80,6 +81,13 @@ struct TimeoutTest : tpunit::TestFixture {
brtester.executeWaitVerifyContent(slow, "555 Timeout processing command");
}

void testPostProcess() {
BedrockTester& brtester = tester->getTester(0);
SData slow("testPostProcessTimeout");
slow["timeout"] = "500"; // 0.5s
brtester.executeWaitVerifyContent(slow, "555 Timeout postProcessing command");
}

void totalTimeout() {
// Test total timeout, not process timeout.
BedrockTester& brtester = tester->getTester(0);
Expand Down

0 comments on commit 9e9ddce

Please sign in to comment.