Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements in command handling #5 #631

Open
wants to merge 144 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
144 commits
Select commit Hold shift + click to select a range
5c570c9
partial checkin
badrishc Jun 13, 2024
b29c03b
nit
badrishc Jun 13, 2024
8d5c67b
nit
badrishc Jun 13, 2024
57d9534
idea for initial spike for making input a struct type
badrishc Jun 14, 2024
90c2489
support ZADD and ZREM using safe struct wrappers for input
badrishc Jun 14, 2024
6ac7518
nit
badrishc Jun 14, 2024
ce4e972
Merge branch 'main' into talzacc/header_impr
TalZaccai Jun 19, 2024
76aa03d
wip
TalZaccai Jun 19, 2024
2698b67
wip
TalZaccai Jun 19, 2024
b05050a
Merge branch 'main' into talzacc/header_impr
TalZaccai Jun 20, 2024
fe19e3e
Fixing non-determinism + refactoring HashGet
TalZaccai Jun 21, 2024
5459083
Merge branch 'talzacc/header_impr' of https://github.com/microsoft/ga…
TalZaccai Jun 21, 2024
34354be
merging from latest main
TalZaccai Jun 21, 2024
cea36f6
dotnet format
TalZaccai Jun 21, 2024
65a62c8
Fixed some API calls
TalZaccai Jun 21, 2024
9132c87
dotnet format
TalZaccai Jun 21, 2024
d4ea372
merging with latest main
TalZaccai Jun 21, 2024
9570d86
wip
TalZaccai Jun 21, 2024
ea10ce6
small fix
TalZaccai Jun 22, 2024
ba1f67c
Merge branch 'main' into talzacc/cmd_handling_impr
TalZaccai Jun 22, 2024
8d5a186
Removing unused method
TalZaccai Jun 22, 2024
ce208e3
Merge branch 'talzacc/cmd_handling_impr' of https://github.com/micros…
TalZaccai Jun 22, 2024
38fef93
wip - refactoring ProcessAdminCommands
TalZaccai Jun 22, 2024
abf8e28
Merging from main
TalZaccai Jul 5, 2024
925291d
Undoing changes to RandomUtils
TalZaccai Jul 5, 2024
7a410c2
Continued refactoring of AdminCommands
TalZaccai Jul 6, 2024
6555787
Added "TryGetInt" and "TryGetLong" to parse state api
TalZaccai Jul 6, 2024
6ee3652
dotnet format
TalZaccai Jul 6, 2024
cadfca2
wip
TalZaccai Jul 8, 2024
9df3e68
format
TalZaccai Jul 8, 2024
36fcc2f
wip
TalZaccai Jul 9, 2024
7515130
wip
TalZaccai Jul 9, 2024
5fb1a75
wip
TalZaccai Jul 9, 2024
c499c3c
cleanup
TalZaccai Jul 9, 2024
96a38d8
wip
TalZaccai Jul 9, 2024
7920d5b
wip
TalZaccai Jul 9, 2024
f944d25
format
TalZaccai Jul 9, 2024
ea53fda
wip
TalZaccai Jul 10, 2024
815a0a5
merge with latest main
TalZaccai Jul 10, 2024
04b27a6
Fixing build
TalZaccai Jul 10, 2024
beea5d8
cont
TalZaccai Jul 15, 2024
6eb1e03
Merge branch 'main' into talzacc/cmd_handling_impr
TalZaccai Jul 15, 2024
0bfb1d9
bugfix
TalZaccai Jul 16, 2024
64ea1c5
merging with cmd_handling_impr
TalZaccai Jul 16, 2024
dbacb58
Fixing build
TalZaccai Jul 16, 2024
22ea02c
wip
TalZaccai Jul 17, 2024
41aaaa4
wip
TalZaccai Jul 17, 2024
4d992f9
Merging from main
TalZaccai Jul 17, 2024
ec63ba4
Merged from main & updated new added command
TalZaccai Jul 19, 2024
6f8ec74
bugfix
TalZaccai Jul 19, 2024
5ae9fdd
Few small fixes
TalZaccai Jul 19, 2024
ecc8716
format
TalZaccai Jul 19, 2024
a0c5155
bugfix
TalZaccai Jul 19, 2024
79f7298
Added range validation to SessionParseState read methods
TalZaccai Jul 19, 2024
cec844a
wip
TalZaccai Jul 19, 2024
49f0ef7
Merge branch 'main' into talzacc/cmd_handling_impr
TalZaccai Jul 19, 2024
c60f844
Merge branch 'talzacc/cmd_handling_impr' into talzacc/cmd_handling_impr2
TalZaccai Jul 19, 2024
2d43807
merging from latest main
TalZaccai Jul 19, 2024
1c0df8e
format
TalZaccai Jul 20, 2024
fc97b7b
wip
TalZaccai Jul 20, 2024
b5680f2
wip
TalZaccai Jul 20, 2024
df5b50d
wip
TalZaccai Jul 22, 2024
3d833d7
wip
TalZaccai Jul 22, 2024
ac4a1dd
wip
TalZaccai Jul 23, 2024
dd3ac07
wip
TalZaccai Jul 23, 2024
3ec10aa
wip
TalZaccai Jul 23, 2024
9774e71
wip
TalZaccai Jul 23, 2024
31d2025
wip
TalZaccai Jul 23, 2024
ab1356f
clean up
TalZaccai Jul 25, 2024
d08e71e
Fixed object store PERSIST + added missing test
TalZaccai Jul 25, 2024
72615cf
wip - adding parseState to ObjectInput & implementing RMW AOF recover…
TalZaccai Jul 26, 2024
509d2a5
Cleaned up ObjectInput and added XML comments
TalZaccai Jul 29, 2024
3334a34
merge latest
TalZaccai Jul 29, 2024
8b1583d
wip
TalZaccai Jul 29, 2024
265a6c6
wip
TalZaccai Jul 30, 2024
6a1ec72
wip
TalZaccai Jul 30, 2024
26e8c37
remove unnecessary serialization during upsert
TalZaccai Jul 30, 2024
745557a
merging from main
TalZaccai Jul 30, 2024
51b10ed
Addressing comments
TalZaccai Jul 31, 2024
e086031
Merge branch 'talzacc/cmd_handling_impr2' into talzacc/cmd_handling_i…
TalZaccai Jul 31, 2024
126fdc2
merging from main
TalZaccai Jul 31, 2024
55fae9b
wip
TalZaccai Aug 1, 2024
b93bbc4
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 1, 2024
52e045c
wip
TalZaccai Aug 1, 2024
cf3bd30
wip
TalZaccai Aug 2, 2024
86272fe
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 2, 2024
54d4f43
merging from main
TalZaccai Aug 6, 2024
e228af6
wip
TalZaccai Aug 6, 2024
1cc29c7
wip
TalZaccai Aug 7, 2024
242a057
wip
TalZaccai Aug 7, 2024
3cd584d
bugfix
TalZaccai Aug 7, 2024
3cfe228
wip
TalZaccai Aug 7, 2024
eb66d5c
wip
TalZaccai Aug 8, 2024
4c5082a
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 8, 2024
dbd0c36
format
TalZaccai Aug 13, 2024
6d63f80
Merge branch 'talzacc/cmd_handling_impr3' of https://github.com/micro…
TalZaccai Aug 13, 2024
cad4a7f
format
TalZaccai Aug 13, 2024
6e79a8b
merging from latest main
TalZaccai Aug 13, 2024
b3daafe
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 13, 2024
40c16ce
Using ObjectInput in custom object commands
TalZaccai Aug 13, 2024
460d7ca
wip
TalZaccai Aug 14, 2024
53402bb
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 15, 2024
17c2451
some cleanup
TalZaccai Aug 15, 2024
20270c2
merging from main
TalZaccai Aug 15, 2024
44de156
safe parsing for cluster commands
TalZaccai Aug 16, 2024
ed0f03f
Merge branch 'main' into talzacc/cmd_handling_impr3
TalZaccai Aug 16, 2024
8dca3a2
format
TalZaccai Aug 16, 2024
0beb052
Merge branch 'talzacc/cmd_handling_impr3' of https://github.com/micro…
TalZaccai Aug 16, 2024
e5cbae0
Merging from latest main
TalZaccai Aug 20, 2024
05188fc
Fixing build errors
TalZaccai Aug 20, 2024
4e55852
wip -- broken
TalZaccai Aug 21, 2024
405236b
wip
TalZaccai Aug 21, 2024
ca22307
wip
TalZaccai Aug 22, 2024
089f4dd
wip
TalZaccai Aug 22, 2024
97e1372
wip
TalZaccai Aug 22, 2024
24a28a4
merging from main
TalZaccai Aug 23, 2024
d05d76f
merging from main
TalZaccai Aug 23, 2024
9c0dc38
wip
TalZaccai Aug 26, 2024
df1f773
wip
TalZaccai Aug 26, 2024
5cea7d7
wip
TalZaccai Aug 26, 2024
ecb970b
wip - RespTests passing
TalZaccai Aug 27, 2024
c93d9ab
wip
TalZaccai Aug 27, 2024
ffc438d
wip
TalZaccai Aug 28, 2024
64d9e06
wip - bitfield working
TalZaccai Aug 29, 2024
fd322f6
wip - bitmap commands working
TalZaccai Aug 29, 2024
5487d3f
wip - pfadd
TalZaccai Aug 29, 2024
a8d4c69
wip
TalZaccai Aug 30, 2024
e3ee243
wip
TalZaccai Aug 31, 2024
ad702ba
wip
TalZaccai Aug 31, 2024
0909431
wip
TalZaccai Sep 3, 2024
c47a90b
wip
TalZaccai Sep 3, 2024
d461a15
fixes
TalZaccai Sep 4, 2024
b848dd3
bugfixes
TalZaccai Sep 4, 2024
956c711
format
TalZaccai Sep 4, 2024
979c9f9
more fixes
TalZaccai Sep 4, 2024
e2b93a6
merging with latest main
TalZaccai Sep 4, 2024
a2e5e75
Merge branch 'main' into talzacc/cmd_handling_impr4
TalZaccai Sep 4, 2024
7b5d5e4
merging from main
TalZaccai Sep 4, 2024
eaa00ab
fix
TalZaccai Sep 4, 2024
b1309af
Merge branch 'main' into talzacc/cmd_handling_impr4
TalZaccai Sep 5, 2024
ea1e337
cleanup
TalZaccai Sep 5, 2024
9fb4989
cleanup
TalZaccai Sep 5, 2024
12d4ff6
cleanup
TalZaccai Sep 5, 2024
0ec23cc
cleanup
TalZaccai Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
Expand Down
15 changes: 3 additions & 12 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,11 @@ private bool MigrateKeysFromMainStore()
TryTransitionState(KeyMigrationStatus.MIGRATING);
WaitForConfigPropagation();

// 4 byte length of input
// 1 byte RespCommand
// 1 byte RespInputFlags
var inputSize = sizeof(int) + RespInputHeader.Size;
var pbCmdInput = stackalloc byte[inputSize];

////////////////
// Build Input//
////////////////
var pcurr = pbCmdInput;
*(int*)pcurr = inputSize - sizeof(int);
pcurr += sizeof(int);
// 1. Header
((RespInputHeader*)pcurr)->SetHeader(RespCommandAccessor.MIGRATE, 0);
var input = new RawStringInput();
input.header.SetHeader(RespCommandAccessor.MIGRATE, 0);

foreach (var pair in _keys.GetKeys())
{
Expand All @@ -57,7 +48,7 @@ private bool MigrateKeysFromMainStore()
var key = pair.Key.SpanByte;

// Read value for key
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref Unsafe.AsRef<SpanByte>(pbCmdInput), ref o);
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref o);

// Check if found in main store
if (status == GarnetStatus.NOTFOUND)
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/ClusterSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions,
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByte, SpanByte, SpanByteComparer, SpanByteRecordDisposer>>>,
BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
Expand Down
99 changes: 91 additions & 8 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,71 @@ private static bool TryReadUlong(ref byte* ptr, byte* end, out ulong value, out

/// <summary>
/// Tries to read a signed 64-bit integer from a given ASCII-encoded input stream.
/// This method will throw if an overflow occurred.
/// </summary>
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param>
/// <param name="end">The end of the string to parse.</param>
/// <param name="value">If parsing was successful, contains the parsed long value.</param>
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param>
/// <param name="allowLeadingZeros">True if leading zeros allowed</param>
/// <returns>
/// True if a long was successfully parsed, false if the input string did not start with
/// a valid integer or the end of the string was reached before finishing parsing.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead)
public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead, bool allowLeadingZeros = true)
{
var parseSuccessful = TryReadLongSafe(ref ptr, end, out value, out bytesRead, out var signRead,
out var overflow, allowLeadingZeros);

if (parseSuccessful) return true;

if (overflow)
{
var digitsRead = signRead ? bytesRead - 1 : bytesRead;
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
return false;
}

return false;
}

/// <summary>
/// Tries to read a signed 64-bit integer from a given ASCII-encoded input stream.
/// </summary>
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param>
/// <param name="end">The end of the string to parse.</param>
/// <param name="value">If parsing was successful, contains the parsed long value.</param>
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param>
/// <param name="signRead">True if +/- sign was read during parsing</param>
/// <param name="overflow">True if overflow occured during parsing</param>
/// <param name="allowLeadingZeros">True if leading zeros allowed</param>
/// <returns>
/// True if a long was successfully parsed, false if the input string did not start with
/// a valid integer or the end of the string was reached before finishing parsing.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadLongSafe(ref byte* ptr, byte* end, out long value, out ulong bytesRead, out bool signRead, out bool overflow, bool allowLeadingZeros = true)
{
bytesRead = 0;
value = 0;
overflow = false;

// Parse optional leading sign
if (TryReadSign(ptr, out var negative))
signRead = TryReadSign(ptr, out var negative);
if (signRead)
{
ptr++;
bytesRead = 1;
}

if (!allowLeadingZeros)
{
// Do not allow leading zeros
if (end - ptr > 1 && *ptr == '0')
return false;
}

// Parse digits as ulong
if (!TryReadUlong(ref ptr, end, out var number, out var digitsRead))
{
Expand All @@ -134,7 +177,8 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo
{
if (number > ((ulong)long.MaxValue) + 1)
{
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
overflow = true;
return false;
}

value = -1 - (long)(number - 1);
Expand All @@ -143,7 +187,8 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo
{
if (number > long.MaxValue)
{
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
overflow = true;
return false;
}
value = (long)number;
}
Expand All @@ -153,25 +198,61 @@ public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulo
return true;
}

/// <summary>
/// Tries to read a signed 32-bit integer from a given ASCII-encoded input stream.
/// This method will throw if an overflow occurred.
/// </summary>
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param>
/// <param name="end">The end of the string to parse.</param>
/// <param name="value">If parsing was successful, contains the parsed int value.</param>
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param>
/// <param name="allowLeadingZeros">True if leading zeros allowed</param>
/// <returns>
/// True if an int was successfully parsed, false if the input string did not start with
/// a valid integer or the end of the string was reached before finishing parsing.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong bytesRead, bool allowLeadingZeros = true)
{
var parseSuccessful = TryReadIntSafe(ref ptr, end, out value, out bytesRead, out var signRead,
out var overflow, allowLeadingZeros);

if (parseSuccessful) return true;

if (overflow)
{
var digitsRead = signRead ? bytesRead - 1 : bytesRead;
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
return false;
}

return false;
}

/// <summary>
/// Tries to read a signed 32-bit integer from a given ASCII-encoded input stream.
/// </summary>
/// <param name="ptr">Pointer to the beginning of the ASCII encoded input string.</param>
/// <param name="end">The end of the string to parse.</param>
/// <param name="value">If parsing was successful, contains the parsed int value.</param>
/// <param name="bytesRead">If parsing was successful, contains the number of bytes that were parsed.</param>
/// <param name="signRead">True if +/- sign was read during parsing</param>
/// <param name="overflow">True if overflow occured during parsing</param>
/// <param name="allowLeadingZeros">True if leading zeros allowed</param>
/// <returns>
/// True if an int was successfully parsed, false if the input string did not start with
/// a valid integer or the end of the string was reached before finishing parsing.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong bytesRead)
public static bool TryReadIntSafe(ref byte* ptr, byte* end, out int value, out ulong bytesRead, out bool signRead, out bool overflow, bool allowLeadingZeros = true)
{
bytesRead = 0;
value = 0;
overflow = false;

// Parse optional leading sign
if (TryReadSign(ptr, out var negative))
signRead = TryReadSign(ptr, out var negative);
if (signRead)
{
ptr++;
bytesRead = 1;
Expand All @@ -188,7 +269,8 @@ public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong
{
if (number > ((ulong)int.MaxValue) + 1)
{
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
overflow = true;
return false;
}

value = (int)(0 - (long)number);
Expand All @@ -197,7 +279,8 @@ public static bool TryReadInt(ref byte* ptr, byte* end, out int value, out ulong
{
if (number > int.MaxValue)
{
RespParsingException.ThrowIntegerOverflow(ptr - digitsRead, (int)digitsRead);
overflow = true;
return false;
}
value = (int)number;
}
Expand Down
97 changes: 85 additions & 12 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public sealed unsafe partial class AofProcessor
/// <summary>
/// Session for main store
/// </summary>
readonly BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext;
readonly BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext;

/// <summary>
/// Session for object store
Expand Down Expand Up @@ -259,40 +259,113 @@ private unsafe bool ReplayOp(byte* entryPtr)
ObjectStoreDelete(objectStoreBasicContext, entryPtr);
break;
case AofEntryType.StoredProcedure:
ref var input = ref Unsafe.AsRef<SpanByte>(entryPtr + sizeof(AofHeader));
respServerSession.RunTransactionProc(header.type, new ArgSlice(ref input), ref output);
RunStoredProc(header.type, entryPtr);
break;
default:
throw new GarnetException($"Unknown AOF header operation type {header.opType}");
}
return true;
}

static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
unsafe void RunStoredProc(byte id, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + input.TotalSize);
var curr = ptr;
var parseStateCount = *(int*)curr;
curr += sizeof(int);

var parseState = new SessionParseState();
if (parseStateCount > 0)
{
ArgSlice[] parseStateBuffer = default;
parseState.Initialize(ref parseStateBuffer, parseStateCount);

for (var i = 0; i < parseStateCount; i++)
{
ref var sbArgument = ref Unsafe.AsRef<SpanByte>(curr);
parseStateBuffer[i] = new ArgSlice(ref sbArgument);
curr += sbArgument.TotalSize;
}
}

respServerSession.RunTransactionProc(id, ref parseState, ref output);
}

static unsafe void StoreUpsert(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
{
var curr = ptr + sizeof(AofHeader);
ref var key = ref Unsafe.AsRef<SpanByte>(curr);
curr += key.TotalSize;

// Reconstructing RawStringInput

// input
ref var sbInput = ref Unsafe.AsRef<SpanByte>(curr);
ref var input = ref Unsafe.AsRef<RawStringInput>(sbInput.ToPointer());
curr += sbInput.TotalSize;

// Reconstructing parse state
var parseStateCount = input.parseState.Count;

if (parseStateCount > 0)
{
ArgSlice[] parseStateBuffer = default;
input.parseState.Initialize(ref parseStateBuffer, parseStateCount);

for (var i = 0; i < parseStateCount; i++)
{
ref var sbArgument = ref Unsafe.AsRef<SpanByte>(curr);
parseStateBuffer[i] = new ArgSlice(ref sbArgument);
curr += sbArgument.TotalSize;
}
}

ref var value = ref Unsafe.AsRef<SpanByte>(curr);

SpanByteAndMemory output = default;
basicContext.Upsert(ref key, ref input, ref value, ref output);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
static unsafe void StoreRMW(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
{
byte* pbOutput = stackalloc byte[32];
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var curr = ptr + sizeof(AofHeader);
ref var key = ref Unsafe.AsRef<SpanByte>(curr);
curr += key.TotalSize;

// Reconstructing RawStringInput

// input
ref var sbInput = ref Unsafe.AsRef<SpanByte>(curr);
ref var input = ref Unsafe.AsRef<RawStringInput>(sbInput.ToPointer());
curr += sbInput.TotalSize;

// Reconstructing parse state
var parseStateCount = input.parseState.Count;

if (parseStateCount > 0)
{
ArgSlice[] parseStateBuffer = default;
input.parseState.Initialize(ref parseStateBuffer, parseStateCount);

for (var i = 0; i < parseStateCount; i++)
{
ref var sbArgument = ref Unsafe.AsRef<SpanByte>(curr);
parseStateBuffer[i] = new ArgSlice(ref sbArgument);
curr += sbArgument.TotalSize;
}
}

var pbOutput = stackalloc byte[32];
var output = new SpanByteAndMemory(pbOutput, 32);

if (basicContext.RMW(ref key, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.IsSpanByte)
output.Memory.Dispose();
}

static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
basicContext.Delete(ref key);
Expand Down
Loading