Skip to content

Commit

Permalink
Allow for failed subscribe, switch back to synchronizing
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jun 21, 2024
1 parent 4236694 commit 8663412
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
45 changes: 28 additions & 17 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing
bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing
if (!syncSucess) {
SWARN("Failed generating sync response to SUBSCRIBE");
response.methodLine = "SUBSCRIPTION_PENDING";
}
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand All @@ -1683,7 +1687,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
transaction.content = _db.getUncommittedQuery();
_sendToPeer(peer, transaction);
}
} else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) {
} else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) {
// SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or
// more COMMITS that should be immediately applied to the database.
if (_state != SQLiteNodeState::SUBSCRIBING) {
Expand All @@ -1692,19 +1696,24 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
if (_leadPeer != peer) {
STHROW("not subscribing to you");
}
SINFO("Received SUBSCRIPTION_APPROVED, final synchronization.");
try {
// Done synchronizing
_recvSynchronize(peer, message);
SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash()
<< "), FOLLOWING");
_changeState(SQLiteNodeState::FOLLOWING);
} catch (const SException& e) {
// Transaction failed
SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING.");
_reconnectPeer(_leadPeer);
_changeState(SQLiteNodeState::SEARCHING);
throw e;
if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) {
// This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again.
_changeState(SQLiteNodeState::SYNCHRONIZING);
} else {
SINFO("Received SUBSCRIPTION_APPROVED, final synchronization.");
try {
// Done synchronizing
_recvSynchronize(peer, message);
SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash()
<< "), FOLLOWING");
_changeState(SQLiteNodeState::FOLLOWING);
} catch (const SException& e) {
// Transaction failed
SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING.");
_reconnectPeer(_leadPeer);
_changeState(SQLiteNodeState::SEARCHING);
throw e;
}
}
} else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) {
if (_replicationThreadsShouldExit) {
Expand Down Expand Up @@ -2085,7 +2094,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance

static int __ATTEMPTS = 0;

void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand All @@ -2111,7 +2120,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
response["hashMismatchValue"] = myHash;
response["hashMismatchNumber"] = to_string(peerCommitCount);

return;
return false;
}
PINFO("Latest commit hash matches our records, beginning synchronization.");
} else {
Expand Down Expand Up @@ -2175,6 +2184,8 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
response.content += commit.serialize();
}
}

return true;
}

void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) {
Expand Down
3 changes: 2 additions & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
// Returns true on success, false on failure.
static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down

0 comments on commit 8663412

Please sign in to comment.