-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Request] FormatMessage(ez.Unit) #57
Comments
I like what I see here. This would be a nice drop-in unit to serialize messages which could then be used by a number of potential transports, such as ZMQ. I'd propose the following changes: from typing import Optional, Callable, AsyncGenerator, Any
from dataclasses import asdict
import json
import numpy as np
import ezmsg.core as ez
class NumpyArrayEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist() # Convert numpy array to list
return json.JSONEncoder.default(self, obj)
class FormatMessageSettings(ez.Settings):
fun: Callable = lambda msg: json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')
class FormatMessage(ez.Unit):
SETTINGS: FormatMessageSettings
INPUT = ez.InputStream(Any)
OUTPUT = ez.OutputStream(bytes)
@ez.subscriber(INPUT)
@ez.publisher(OUTPUT)
async def on_message(self, message: Any) -> AsyncGenerator:
yield self.OUTPUT, self.SETTINGS.fun(message) Namely:
I'll add that it would be nice to have the inverse of this unit for deserialization. @griffinmilsap any thoughts? |
I originally tried putting a lambda in the Settings but then I get
It does work, however, if it's a regular function. Any concern with making it a function instead of a lambda?
If the intention is to use this as a serializer, and a different unit would be a deserializer, then the name should be changed. How about from typing import Callable, AsyncGenerator, Any
from dataclasses import asdict
import json
import ezmsg.core as ez
class NumpyArrayEncoder(json.JSONEncoder):
def default(self, obj):
try:
return obj.tolist() # Likely numpy array to list
except AttributeError:
return json.JSONEncoder.default(self, obj)
def serialize_msg(msg):
return json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')
class SerializeMessageSettings(ez.Settings):
fun: Callable = serialize_msg
class SerializeMessage(ez.Unit):
SETTINGS: SerializeMessageSettings
INPUT = ez.InputStream(Any)
OUTPUT = ez.OutputStream(Any)
@ez.subscriber(INPUT)
@ez.publisher(OUTPUT)
async def on_message(self, message: Any) -> AsyncGenerator:
yield self.OUTPUT, self.SETTINGS.fun(message) If you're happy with this in principle then I'll create a PR. Where would it go? |
My bad on lambda causing a pickling error: I didn't run that code beforehand to check is was valid. I'm fine with it the default serialization being a function. I am generally not in favor of using try/except blocks as a method of generic control flow. Do you have reason to believe I'm happy with the name change and agree this belongs in the utils folder as well. Feel free to create a PR. |
In this case, as we don't really care about the class, we only care that it has the method we're calling, an alternative to the try/except that's faster than |
I have some code in ezmsg.util.messagecodec that could be useful here. It serializes a message into and out of a json dict so we don't need to replicate that. We could even just have that be the default FormatMessage behavior and turn MessageLogger into a collection that has FormatMessage with a file logger that just writes incoming strings to a file. Thoughts? Edit: anticipating performance concerns with MessageEncoder; optimizations made there would benefit more than one code path ;) |
Inspired by @pperanich 's comment, I took a go at creating a reusable FormatMessage Unit. The code snippet is below.
I'd appreciate any feedback and a nudge if you would like this in a PR. I'm sure you have better ideas of how to implement something like this so I don't mind at all if you make this obsolete with your own implementation.
The text was updated successfully, but these errors were encountered: