Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API - PUBSUB CHANNELS #425

Open
rnz opened this issue May 27, 2024 · 9 comments
Open

API - PUBSUB CHANNELS #425

rnz opened this issue May 27, 2024 · 9 comments
Labels
API enhancement New feature or request help wanted Extra attention is needed

Comments

@rnz
Copy link

rnz commented May 27, 2024

Feature request type

sample request

Is your feature request related to a problem? Please describe

Often used

Describe the solution you'd like

Often used

Describe alternatives you've considered

No response

Additional context

No response

@badrishc
Copy link
Contributor

Garnet supports pub/sub already. If you search online for examples of pub/sub in Redis, there should be a lot of hits. Let us know if you run into any bugs or issues.

@rnz
Copy link
Author

rnz commented May 28, 2024

@badrishc
This request about command "PUBSUB CHANNELS"

$ keydb-cli PUBSUB CHANNELS | wc -l
1453015

and

$ keydb-cli INFO | grep -i channel
pubsub_channels:1454151

@TalZaccai TalZaccai added enhancement New feature or request API labels May 29, 2024
@badrishc badrishc added the help wanted Extra attention is needed label Jun 5, 2024
@mardukbp
Copy link

Running GarnetServer from the main branch (with the new --lua option set), the first example in the Celery documentation fails.

To reproduce the error you will need Python and Celery. Install it with:

pip install celery[redis]

Create the file tasks.py:

from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

In the directory where the file is saved open a terminal and execute:

celery -A tasks worker --pool=solo

In the same directory start a new terminal, execute python.exe and enter the following:

> from tasks import add
> add.delay(4,4)

In the first terminal you will get an exception from the Python Redis client:

redis.exceptions.ResponseError: Command # 2 (PUBLISH celery-task-meta-11b19d75-2bbc-496b-8a2f-fe520bcbf8ad {"status": "SUCCESS", "result": 8, "traceback": null, "children": [], "date_done": "2024-08-18T10:39:27.316382+00:00", "task_id": "11b19d75-2bbc-496b-8a2f-fe520bcbf8ad"}) of pipeline caused error: unknown command

@mardukbp
Copy link

Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:

(ns my-app (:require [taoensso.carmine :as car]))

(defonce my-conn-pool (car/connection-pool {}))
(def     my-conn-spec {:uri "redis://localhost/"})
(def     my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})

(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))

(def my-listener
  (car/with-new-pubsub-listener (:spec my-conn-spec)
    {"channel*" (fn f2 [msg] (println "f2:" msg))}
    (car/psubscribe "channel*")))

(car/with-open-listener my-listener
  (car/unsubscribe))

This is the command line output of GarnetServer:

Process terminated. Assertion Failed
   at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
   at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
   at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
   at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
   at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
   at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
   at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
   at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

@badrishc
Copy link
Contributor

celery -A tasks worker --pool=solo

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? #604

@badrishc
Copy link
Contributor

Using Carmine, a Redis client for Clojure that auto-generates Clojure functions from the official Redis command spec, the following example causes GarnetServer to terminate:

(ns my-app (:require [taoensso.carmine :as car]))

(defonce my-conn-pool (car/connection-pool {}))
(def     my-conn-spec {:uri "redis://localhost/"})
(def     my-conn-opts {:pool my-conn-pool, :spec my-conn-spec})

(defmacro wcar* [& body] `(car/wcar my-conn-opts ~@body))

(def my-listener
  (car/with-new-pubsub-listener (:spec my-conn-spec)
    {"channel*" (fn f2 [msg] (println "f2:" msg))}
    (car/psubscribe "channel*")))

(car/with-open-listener my-listener
  (car/unsubscribe))

This is the command line output of GarnetServer:

Process terminated. Assertion Failed
   at Garnet.server.RespServerSession.NetworkUNSUBSCRIBE(Int32 count) in C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262
   at Garnet.server.RespServerSession.ProcessArrayCommands[TGarnetApi](RespCommand cmd, TGarnetApi& storageApi) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 570
   at Garnet.server.RespServerSession.ProcessMessages() in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 419
   at Garnet.server.RespServerSession.TryConsumeMessages(Byte* reqBuffer, Int32 bytesReceived) in C:\Users\mardu\git\garnet\libs\server\Resp\RespServerSession.cs:line 298
   at Garnet.networking.NetworkHandler`2.TryProcessRequest() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 481
   at Garnet.networking.NetworkHandler`2.Process() in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 337
   at Garnet.networking.NetworkHandler`2.OnNetworkReceive(Int32 bytesTransferred) in C:\Users\mardu\git\garnet\libs\common\Networking\NetworkHandler.cs:line 290
   at Garnet.common.TcpNetworkHandlerBase`2.RecvEventArg_Completed(Object sender, SocketAsyncEventArgs e) in C:\Users\mardu\git\garnet\libs\common\Networking\TcpNetworkHandlerBase.cs:line 120
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Net.Sockets.SocketAsyncEventArgs.<>c.<.cctor>b__173_0(UInt32 errorCode, UInt32 numBytes, NativeOverlapped* nativeOverlapped)
   at System.Threading.PortableThreadPool.IOCompletionPoller.Callback.Invoke(Event e)
   at System.Threading.ThreadPoolTypedWorkItemQueue`2.System.Threading.IThreadPoolWorkItem.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

@mardukbp
Copy link

celery -A tasks worker --pool=solo

Hi @mardukbp, can you try this PR to see if Celery works after this small fix? #604

It works!

@mardukbp
Copy link

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

That works!

Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents

{:deps
 {com.taoensso/carmine {:mvn/version "3.4.1"}}
}

And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL.

@badrishc
Copy link
Contributor

I cannot get carmine to work on WSL, but try commenting out the debug assert on that line (C:\Users\mardu\git\garnet\libs\server\Resp\PubSubCommands.cs:line 262)

That works!

Actually, I tried out Carmine on Windows. I installed Clojure using the Windows installer. Then I created the file deps.edn with the contents

{:deps
 {com.taoensso/carmine {:mvn/version "3.4.1"}}
}

And then I started the Clojure REPL (clj.exe) on the same directory. I also used the VS Code Extension Calva to execute code directly in the REPL

Got it, will try this later. Thanks!

badrishc added a commit that referenced this issue Sep 9, 2024
* Fix AOF fast-forward during main memory replication

* noFlush should not wait on FlushEvent

* fix the fix

* update version for release

* nit

* NetworkClusterAppendLog shouldnt write a response

* Fix the flush behavior for safe tail refresh.

* clean up exception handling

* Unrelated fix to pub-sub - see issue #425.

* fix non-MMR skipping logic for space at the end of a page

* fix comparison

---------

Co-authored-by: Vasileios Zois <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
API enhancement New feature or request help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

4 participants