Skip to content

Commit

Permalink
Initial support of transactions
Browse files Browse the repository at this point in the history
Handles MULTI, EXEC and DISCARD messages in the pipeline API
using `redisClusterAppendCommand()`

Only supports transactions to a single slot
  • Loading branch information
bjosv committed Dec 1, 2020
1 parent f9b977e commit 9fe13f4
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 13 deletions.
18 changes: 18 additions & 0 deletions command.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ static int redis_argz(struct cmd *r) {
switch (r->type) {
case CMD_REQ_REDIS_PING:
case CMD_REQ_REDIS_QUIT:
case CMD_REQ_REDIS_MULTI:
case CMD_REQ_REDIS_EXEC:
case CMD_REQ_REDIS_DISCARD:
return 1;

default:
Expand Down Expand Up @@ -595,6 +598,11 @@ void redis_parse_cmd(struct cmd *r) {
break;
}

if (str4icmp(m, 'e', 'x', 'e', 'c')) {
r->type = CMD_REQ_REDIS_EXEC;
break;
}

break;

case 5:
Expand Down Expand Up @@ -688,6 +696,11 @@ void redis_parse_cmd(struct cmd *r) {
break;
}

if (str5icmp(m, 'm', 'u', 'l', 't', 'i')) {
r->type = CMD_REQ_REDIS_MULTI;
break;
}

break;

case 6:
Expand Down Expand Up @@ -849,6 +862,11 @@ void redis_parse_cmd(struct cmd *r) {
break;
}

if (str7icmp(m, 'd', 'i', 's', 'c', 'a', 'r', 'd')) {
r->type = CMD_REQ_REDIS_DISCARD;
break;
}

break;

case 8:
Expand Down
3 changes: 3 additions & 0 deletions command.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ typedef enum cmd_parse_result {
ACTION(REQ_REDIS_PING) /* redis requests - ping/quit */ \
ACTION(REQ_REDIS_QUIT) \
ACTION(REQ_REDIS_AUTH) \
ACTION(REQ_REDIS_MULTI) \
ACTION(REQ_REDIS_EXEC) \
ACTION(REQ_REDIS_DISCARD) \
ACTION(RSP_REDIS_STATUS) /* redis response */ \
ACTION(RSP_REDIS_ERROR) \
ACTION(RSP_REDIS_INTEGER) \
Expand Down
79 changes: 68 additions & 11 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@

#define CLUSTER_DEFAULT_MAX_REDIRECT_COUNT 5

#define TRANSACTION_NONE -2
#define TRANSACTION_STARTED -1

typedef struct cluster_async_data {
redisClusterAsyncContext *acc;
struct cmd *command;
Expand Down Expand Up @@ -1625,7 +1628,7 @@ redisClusterContext *redisClusterContextInit(void) {
cc->ssl = NULL;
#endif
cc->password[0] = '\0';

cc->transaction_slot = TRANSACTION_NONE;
return cc;
}

Expand Down Expand Up @@ -3013,9 +3016,6 @@ static int command_format_by_slot(redisClusterContext *cc, struct cmd *command,
key_count = hiarray_n(command->keys);

if (key_count <= 0) {
__redisClusterSetError(
cc, REDIS_ERR_OTHER,
"No keys in command(must have keys for redis cluster mode)");
goto done;
} else if (key_count == 1) {
kp = hiarray_get(command->keys, 0);
Expand Down Expand Up @@ -3241,15 +3241,75 @@ int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd,
slot_num = command_format_by_slot(cc, command, commands);

if (slot_num < 0) {
goto error;
if (command->type == CMD_REQ_REDIS_MULTI) {
// This cmd will not be sent until we get a slot from next cmd
if (cc->transaction_slot != TRANSACTION_NONE) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Transaction already started");
goto error;
}
cc->transaction_slot = TRANSACTION_STARTED;
command->cmd = strdup(cmd); // Keep copy of the command to send
goto done;
} else if (command->type == CMD_REQ_REDIS_EXEC ||
command->type == CMD_REQ_REDIS_DISCARD) {
if (cc->transaction_slot == TRANSACTION_NONE) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Transaction not started");
goto error;
}
command->slot_num = cc->transaction_slot;
cc->transaction_slot = TRANSACTION_NONE;
} else {
__redisClusterSetError(
cc, REDIS_ERR_OTHER,
"No keys in command(must have keys for redis cluster mode)");
goto error;
}
} else if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "slot_num is out of range");
goto error;
}

if (cc->transaction_slot >= 0 && cc->transaction_slot != slot_num) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Only same slot transactions supported");
goto error;
}

if (cc->transaction_slot == TRANSACTION_STARTED) {
// Previous command was MULTI, use current slot as transaction slot
listNode *prev_command = listLast(cc->requests);
if (prev_command == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Transaction failure while handling MULTI");
goto error;
}
// Update slot for command MULTI before sending it
struct cmd *multi_command = NULL;
multi_command = prev_command->value;
if (multi_command->type != CMD_REQ_REDIS_MULTI) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Transaction failure while handling MULTI");
goto error;
}
multi_command->slot_num = slot_num;
cc->transaction_slot = slot_num;

// Send MULTI command to correct slot first
if (__redisClusterAppendCommand(cc, multi_command) != REDIS_OK) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Transaction failure while sending MULTI");
goto error;
}
free(multi_command->cmd);
multi_command->cmd = NULL;
}

// all keys belong to one slot
if (listLength(commands) == 0) {
if (__redisClusterAppendCommand(cc, command) == REDIS_OK) {
command->cmd = NULL;
goto done;
} else {
goto error;
Expand All @@ -3268,15 +3328,10 @@ int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd,
goto error;
}
}
command->cmd = NULL;

done:

if (command->cmd != NULL) {
command->cmd = NULL;
} else {
goto error;
}

if (commands != NULL) {
if (listLength(commands) > 0) {
command->sub_commands = commands;
Expand Down Expand Up @@ -3492,6 +3547,8 @@ int redisClusterGetReply(redisClusterContext *cc, void **reply) {
return __redisClusterGetReply(cc, slot_num, reply);
}

// The command was sent to many slots

commands = command->sub_commands;
if (commands == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
Expand Down
1 change: 1 addition & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct redisClusterContext {
#ifdef SSL_SUPPORT
redisSSLContext *ssl;
#endif
int transaction_slot;

} redisClusterContext;

Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ if(ENABLE_IPV6_TESTS)
set_tests_properties(ct_connection_ipv6 PROPERTIES LABELS "CT")
endif()

add_executable(ct_transaction ct_transaction.c)
target_link_libraries(ct_transaction hiredis_cluster hiredis ${SSL_LIBRARY})
add_test(NAME ct_transaction COMMAND "$<TARGET_FILE:ct_transaction>")
set_tests_properties(ct_transaction PROPERTIES LABELS "CT")

if(ENABLE_SSL)
# Executable: tls
add_executable(example_tls main_tls.c)
Expand Down
134 changes: 134 additions & 0 deletions tests/ct_transaction.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include "hircluster.h"
#include "test_utils.h"

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CLUSTER_NODE "127.0.0.1:7000"

// Test of transactions in using the pipelined sync api
void test_pipelined_transaction() {
redisClusterContext *cc = redisClusterContextInit();
assert(cc);

int status;
status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
ASSERT_MSG(status == REDIS_OK, cc->errstr);

status = redisClusterConnect2(cc);
ASSERT_MSG(status == REDIS_OK, cc->errstr);

status = redisClusterAppendCommand(cc, "MULTI");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "SET bar five");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "SET foo five"); // New slot..
ASSERT_MSG(status == REDIS_ERR, cc->errstr); // ..gives REDIS_ERR
status = redisClusterAppendCommand(cc, "GET bar");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "EXEC");
ASSERT_MSG(status == REDIS_OK, cc->errstr);

redisReply *reply;
redisClusterGetReply(cc, (void *)&reply); // reply for: MULTI
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: SET
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: GET
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: EXEC
CHECK_REPLY_ARRAY(cc, reply, 2);
CHECK_REPLY_OK(cc, reply->element[0]);
CHECK_REPLY_STR(cc, reply->element[1], "five");
freeReplyObject(reply);

redisClusterFree(cc);
}

// Test of discarding a transaction
void test_discarded_pipelined_transaction() {
redisClusterContext *cc = redisClusterContextInit();
assert(cc);

int status;
status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
ASSERT_MSG(status == REDIS_OK, cc->errstr);

status = redisClusterConnect2(cc);
ASSERT_MSG(status == REDIS_OK, cc->errstr);

status = redisClusterAppendCommand(cc, "SET foo 55");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "MULTI");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "INCR foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "DISCARD");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "GET foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "MULTI");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "INCR foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "EXEC");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "GET foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);

redisReply *reply;
redisClusterGetReply(cc, (void *)&reply); // reply for: SET
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: MULTI
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: INCR
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: DISCARD
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: GET
CHECK_REPLY_STR(cc, reply, "55");
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: MULTI
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: INCR
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: EXEC
CHECK_REPLY_ARRAY(cc, reply, 1);
CHECK_REPLY_INT(cc, reply->element[0], 56); // INCR
freeReplyObject(reply);

redisClusterGetReply(cc, (void *)&reply); // reply for: GET
CHECK_REPLY_STR(cc, reply, "56");
freeReplyObject(reply);

redisClusterFree(cc);
}

int main() {

test_pipelined_transaction();
test_discarded_pipelined_transaction();

return 0;
}
16 changes: 14 additions & 2 deletions tests/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
}

#define CHECK_REPLY(_ctx, _reply) \
if (!(_reply)) { \
ASSERT_MSG(_reply, _ctx->errstr); \
if ((_reply) == NULL) { \
fprintf(stderr, "ERROR: reply=NULL => "); \
if ((_ctx) != NULL) { \
ASSERT_MSG(_reply, _ctx->errstr); \
} else { \
ASSERT_MSG(_reply, "context is NULL"); \
} \
}

#define CHECK_REPLY_TYPE(_reply, _type) \
Expand All @@ -22,6 +27,13 @@
ASSERT_MSG((strcmp(_reply->str, "OK") == 0), _ctx->errstr); \
}

#define CHECK_REPLY_QUEUED(_ctx, _reply) \
{ \
CHECK_REPLY(_ctx, _reply); \
CHECK_REPLY_TYPE(_reply, REDIS_REPLY_STATUS); \
ASSERT_MSG((strcmp(_reply->str, "QUEUED") == 0), _ctx->errstr); \
}

#define CHECK_REPLY_INT(_ctx, _reply, _value) \
{ \
CHECK_REPLY(_ctx, _reply); \
Expand Down

0 comments on commit 9fe13f4

Please sign in to comment.