Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added LPOS command #673

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public GarnetStatus ListLeftPush(ArgSlice key, ArgSlice element, out int count,
public GarnetStatus ListLeftPush(byte[] key, ref ObjectInput input, out ObjectOutputHeader output)
=> storageSession.ListPush(key, ref input, out output, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPosition(key, ref input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public GarnetStatus ListLeftPop(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPop(key, ref input, ref outputFooter, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi

#region ListPush Methods

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="outputFooter"></param>
/// <returns></returns>
GarnetStatus ListPosition(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// ListLeftPush ArgSlice version with ObjectOutputHeader output
/// </summary>
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Objects/List/ListObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum ListOperation : byte
LSET,
BRPOP,
BLPOP,
LPOS,
}

/// <summary>
Expand Down Expand Up @@ -179,6 +180,9 @@ public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory
case ListOperation.LSET:
ListSet(ref input, ref output);
break;
case ListOperation.LPOS:
ListPosition(ref input, ref output);
break;

default:
throw new GarnetException($"Unsupported operation {input.header.ListOp} in ListObject.Operate");
Expand Down
201 changes: 201 additions & 0 deletions libs/server/Objects/List/ListObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -418,5 +418,206 @@ private void ListSet(ref ObjectInput input, ref SpanByteAndMemory output)
output.Length = (int)(output_currptr - output_startptr);
}
}

private void ListPosition(ref ObjectInput input, ref SpanByteAndMemory output)
{
var element = input.parseState.GetArgSliceByRef(input.parseStateStartIdx).ReadOnlySpan;
input.parseStateStartIdx++;

var isMemory = false;
MemoryHandle ptrHandle = default;
var output_startptr = output.SpanByte.ToPointer();
var output_currptr = output_startptr;
var output_end = output_currptr + output.Length;
var count = 0;
ObjectOutputHeader outputHeader = default;

try
{
if (!ReadListPositionInput(ref input, out var rank, out count, out var maxlen, out var error))
{
while (!RespWriteUtils.WriteError(error, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (count < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (maxlen < 0)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

int[] foundItemsArray = null;
count = count == 0 ? list.Count : count;
// TODO: (Nirmal) When count is greater then 32, directly write it in output and use buffer copy to move the data forward and add the lenght of the array at the start
Span<int> foundItems = count <= 32 ? stackalloc int[32] : foundItemsArray = ArrayPool<int>.Shared.Rent(count);
try
{
var noOfFoundItem = 0;
if (rank > 0)
{
var currentNode = list.First;
var currentIndex = 0;
var maxlenIndex = maxlen == 0 ? list.Count : maxlen;
do
{
var nextNode = currentNode.Next;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == 1)
{
foundItems[noOfFoundItem] = currentIndex;
noOfFoundItem++;

if (noOfFoundItem == count)
{
break;
}
}
else
{
rank--;
}
}
currentNode = nextNode;
currentIndex++;
}
while (currentNode != null && currentIndex < maxlenIndex);
}
else if (rank < 0)
{
var currentNode = list.Last;
var currentIndex = list.Count -1;
var maxlenIndex = maxlen == 0 ? 0 : list.Count - maxlen;
do
{
var nextNode = currentNode.Previous;
if (currentNode.Value.AsSpan().SequenceEqual(element))
{
if (rank == -1)
{
foundItems[noOfFoundItem] = currentIndex;
noOfFoundItem++;

if (noOfFoundItem == count)
{
break;
}
}
else
{
rank++;
}
}
currentNode = nextNode;
currentIndex--;
}
while (currentNode != null && currentIndex >= maxlenIndex);
}
else
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
return;
}

if (noOfFoundItem == 0)
{
while (!RespWriteUtils.WriteNull(ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else if (noOfFoundItem == 1)
{
while (!RespWriteUtils.WriteInteger(foundItems[0], ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
else
{
while (!RespWriteUtils.WriteArrayLength(noOfFoundItem, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

foreach (var item in foundItems.Slice(0, noOfFoundItem))
{
while (!RespWriteUtils.WriteInteger(item, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);
}
}

outputHeader.result1 = noOfFoundItem;
}
finally
{
if (foundItemsArray is not null)
{
ArrayPool<int>.Shared.Return(foundItemsArray);
}
}
}
finally
{
while (!RespWriteUtils.WriteDirect(ref outputHeader, ref output_currptr, output_end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref output_startptr, ref ptrHandle, ref output_currptr, ref output_end);

if (isMemory)
ptrHandle.Dispose();
output.Length = (int)(output_currptr - output_startptr);
}
}

private static unsafe bool ReadListPositionInput(ref ObjectInput input, out int rank, out int count, out int maxlen, out ReadOnlySpan<byte> error)
{
var currTokenIdx = input.parseStateStartIdx;

rank = 1; // By default, LPOS takes first match element
count = 1; // By default, LPOS return 1 element
maxlen = 0; // By default, iterate to all the item

error = default;

while (currTokenIdx < input.parseState.Count)
{
var sbParam = input.parseState.GetArgSliceByRef(currTokenIdx++).ReadOnlySpan;

if (sbParam.SequenceEqual(CmdStrings.RANK) || sbParam.SequenceEqual(CmdStrings.rank))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out rank))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else if (sbParam.SequenceEqual(CmdStrings.COUNT) || sbParam.SequenceEqual(CmdStrings.count))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out count))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else if (sbParam.SequenceEqual(CmdStrings.MAXLEN) || sbParam.SequenceEqual(CmdStrings.maxlen))
{
if (!input.parseState.TryGetInt(currTokenIdx++, out maxlen))
{
error = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
return false;
}
}
else
{
error = CmdStrings.RESP_SYNTAX_ERROR;
return false;
}
}

return true;
}
}
}
4 changes: 4 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> XX => "XX"u8;
public static ReadOnlySpan<byte> UNSAFETRUNCATELOG => "UNSAFETRUNCATELOG"u8;
public static ReadOnlySpan<byte> SAMPLES => "SAMPLES"u8;
public static ReadOnlySpan<byte> RANK => "RANK"u8;
public static ReadOnlySpan<byte> rank => "rank"u8;
public static ReadOnlySpan<byte> MAXLEN => "MAXLEN"u8;
public static ReadOnlySpan<byte> maxlen => "maxlen"u8;

/// <summary>
/// Response strings
Expand Down
59 changes: 59 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,65 @@ private unsafe bool ListPop<TGarnetApi>(RespCommand command, ref TGarnetApi stor
return true;
}

/// <summary>
/// The command returns the index of matching elements inside a Redis list.
/// By default, when no options are given, it will scan the list from head to tail, looking for the first match of "element".
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool ListPosition<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 2)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.LPOS));
}

// Get the key for List
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;
var element = parseState.GetArgSliceByRef(1).SpanByte;
var keyBytes = sbKey.ToByteArray();

if (NetworkSingleKeySlotVerify(keyBytes, false))
{
return true;
}

// Prepare input
var input = new ObjectInput
{
header = new RespInputHeader
{
type = GarnetObjectType.List,
ListOp = ListOperation.LPOS,
},
parseState = parseState,
parseStateStartIdx = 1,
};

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };

var statusOp = storageApi.ListPosition(keyBytes, ref input, ref outputFooter);

switch (statusOp)
{
case GarnetStatus.OK:
ProcessOutputWithHeader(outputFooter.spanByteAndMemory);
break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// LMPOP numkeys key [key ...] LEFT | RIGHT [COUNT count]
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum RespCommand : byte
KEYS,
LINDEX,
LLEN,
LPOS,
LRANGE,
MEMORY_USAGE,
MGET,
Expand Down Expand Up @@ -767,6 +768,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.LSET;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("\r\nLPOS\r\n"u8))
{
return RespCommand.LPOS;
}
break;

case 'M':
Expand Down
29 changes: 29 additions & 0 deletions libs/server/Resp/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,35 @@
],
"SubCommands": null
},
{
"Command": "LPOS",
"Name": "LPOS",
"IsInternal": false,
"Arity": -3,
"Flags": "ReadOnly",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "List, Read, Slow",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "LPUSH",
"Name": "LPUSH",
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.LPUSH => ListPush(cmd, ref storageApi),
RespCommand.LPUSHX => ListPush(cmd, ref storageApi),
RespCommand.LPOP => ListPop(cmd, ref storageApi),
RespCommand.LPOS => ListPosition(ref storageApi),
RespCommand.RPUSH => ListPush(cmd, ref storageApi),
RespCommand.RPUSHX => ListPush(cmd, ref storageApi),
RespCommand.RPOP => ListPop(cmd, ref storageApi),
Expand Down
Loading
Loading