[Request] FormatMessage(ez.Unit) #57

Open
cboulay opened this issue Nov 21, 2023 · 5 comments

@cboulay
Collaborator

cboulay commented Nov 21, 2023

Inspired by @pperanich's comment, I had a go at creating a reusable FormatMessage Unit. The code snippet is below.

I'd appreciate any feedback, and a nudge if you would like this in a PR. I'm sure you have better ideas for how to implement something like this, so I don't mind at all if you make this obsolete with your own implementation.

from typing import Optional, Callable, AsyncGenerator, Any
from dataclasses import asdict
import json

import ezmsg.core as ez
from ezmsg.util.messages.axisarray import AxisArray
from ezmsg.zmq.units import ZMQMessage


def aa2dict(aa_msg: AxisArray, data_as_list=True):
    out = asdict(aa_msg)
    if data_as_list:
        out["data"] = out["data"].tolist()
    return out


class FormatMessageSettings(ez.Settings):
    fun: Optional[Callable] = None


class FormatMessage(ez.Unit):
    SETTINGS: FormatMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(Any)

    def initialize(self) -> None:
        if self.SETTINGS.fun is None:
            # Default fun will convert AxisArray to dict without ndarray,
            # then json-encoded-string, then encode to bytes.
            # Finally, it packages it in a ZMQMessage
            self.SETTINGS = FormatMessageSettings(lambda m: ZMQMessage(json.dumps(aa2dict(m)).encode("utf-8")))

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

@pperanich
Collaborator

I like what I see here. This would be a nice drop-in unit to serialize messages, which could then be used by a number of potential transports, such as ZMQ. I'd propose the following changes:

from typing import Optional, Callable, AsyncGenerator, Any
from dataclasses import asdict
import json
import numpy as np

import ezmsg.core as ez

class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()  # Convert numpy array to list
        return json.JSONEncoder.default(self, obj)


class FormatMessageSettings(ez.Settings):
    fun: Callable = lambda msg: json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')


class FormatMessage(ez.Unit):
    SETTINGS: FormatMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(bytes)

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

Namely:

  • The Settings field fun should not be optional; we can set our default serialization in the class definition.
  • The output type should be bytes, regardless of serialization approach.
    • With this, we should change the ZMQ input/output types to bytes.
    • If additional interfacing extensions are created, e.g. for nanomsg or similar, we would have a common format for sending/receiving, rather than each having its own message type (ZMQMessage, NanomsgMessage, etc.).
    • This allows a user to either use the proposed FormatMessage unit for serialization, or handle serialization directly in their custom units and send the bytes to ZMQ units to be sent/received.
  • I propose generalizing the default serialization protocol to serialize all numpy array objects in the message.
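A minimal stdlib sketch of the "output is always bytes" contract. Sample, json_serializer, pickle_serializer and send_over_transport are hypothetical names, not part of ezmsg; the point is only that a transport written against bytes works unchanged with whichever serializer produced them.

```python
import json
import pickle
from dataclasses import asdict, dataclass
from typing import Any, Callable, List

# Hypothetical alias for the proposed contract: a serializer maps a
# message to bytes, so transports never need a per-transport message type.
Serializer = Callable[[Any], bytes]


@dataclass
class Sample:
    # Hypothetical stand-in for a message type such as AxisArray.
    name: str
    values: list


def json_serializer(msg: Any) -> bytes:
    # Human-readable and language-agnostic.
    return json.dumps(asdict(msg)).encode("utf-8")


def pickle_serializer(msg: Any) -> bytes:
    # Python-only, but round-trips arbitrary picklable objects.
    return pickle.dumps(msg)


def send_over_transport(payload: bytes) -> int:
    # Hypothetical transport: it only needs to understand bytes.
    if not isinstance(payload, bytes):
        raise TypeError("transport expects bytes")
    return len(payload)


msg = Sample(name="ch0", values=[1.0, 2.0])
serializers: List[Serializer] = [json_serializer, pickle_serializer]
for serialize in serializers:
    print(serialize.__name__, send_over_transport(serialize(msg)), "bytes")
```

Swapping serializers only changes the Settings value; nothing downstream of the bytes boundary has to change.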

I'll add that it would be nice to have the inverse of this unit for deserialization. @griffinmilsap any thoughts?
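A minimal sketch of what the inverse could look like, assuming the receiver knows the target message type in advance. Sample, serialize_msg and deserialize_msg here are hypothetical stdlib-only illustrations, not an ezmsg API. Note the caveats: numpy arrays serialized with tolist() would come back as plain lists (a real deserializer would need something like np.asarray on known fields), and nested dataclasses would come back as dicts.

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import Any, Type, TypeVar

T = TypeVar("T")


@dataclass
class Sample:
    # Hypothetical flat message type, to keep the sketch short.
    name: str
    values: list


def serialize_msg(msg: Any) -> bytes:
    # Same shape as the serializer discussed above, minus numpy handling.
    return json.dumps(asdict(msg)).encode("utf-8")


def deserialize_msg(payload: bytes, cls: Type[T]) -> T:
    # Inverse direction: bytes -> JSON dict -> dataclass instance.
    data = json.loads(payload.decode("utf-8"))
    # Ignore keys that are not fields of the target class.
    names = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in names})


original = Sample(name="ch0", values=[1.0, 2.0])
restored = deserialize_msg(serialize_msg(original), Sample)
print(restored == original)  # True
```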

@cboulay
Collaborator Author

cboulay commented Nov 22, 2023

I originally tried putting a lambda in the Settings but then I get

_pickle.PicklingError: Can't pickle <function FormatMessageSettings.<lambda> at 0x16bef11c0>: attribute lookup FormatMessageSettings.<lambda> on ezmsg.scratch.formatmsg failed

It does work, however, if it's a regular function. Any concern with making it a function instead of a lambda?
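The traceback is consistent with how pickle treats functions: it stores them by reference (module plus qualified name) and checks that the name resolves, so a lambda defined in a class body (qualified name FormatMessageSettings.<lambda>) cannot be found, while a module-level function can. That ezmsg pickles Settings when handing them to units is an assumption inferred from the traceback. The class below is a plain stand-in for the ez.Settings subclass, so the sketch runs with the standard library only.

```python
import pickle


class FormatMessageSettings:
    # Plain stand-in for the ez.Settings subclass above (hypothetical).
    fun = lambda msg: msg  # noqa: E731


def serialize_msg(msg):
    # Module-level function: pickle can find it again by its qualified name.
    return msg


def can_pickle(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


print(can_pickle(serialize_msg))              # True
print(can_pickle(FormatMessageSettings.fun))  # False
```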

isinstance(obj, np.ndarray) is a bit slow. It's usually faster (and more "pythonic") to try/except.

If the intention is to use this as a serializer, and a different unit would be a deserializer, then the name should be changed. How about serializemsg.SerializeMessage?

from typing import Callable, AsyncGenerator, Any
from dataclasses import asdict
import json

import ezmsg.core as ez


class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return obj.tolist()  # Likely numpy array to list
        except AttributeError:
            return json.JSONEncoder.default(self, obj)


def serialize_msg(msg):
    return json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')


class SerializeMessageSettings(ez.Settings):
    fun: Callable = serialize_msg


class SerializeMessage(ez.Unit):
    SETTINGS: SerializeMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(Any)

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

If you're happy with this in principle then I'll create a PR. Where would it go? ezmsg.util?

@pperanich
Collaborator

My bad on the lambda causing a pickling error: I didn't run that code beforehand to check it was valid.

I'm fine with the default serialization being a function.

I am generally not in favor of using try/except blocks as a method of generic control flow. Do you have reason to believe isinstance will be too slow for your particular use case?

I'm happy with the name change and agree this belongs in the utils folder as well. Feel free to create a PR.
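One way to settle the isinstance question is to measure it. This is a rough micro-benchmark sketch comparing the three strategies on a "hit" (object with tolist()) and a "miss" (object without it); array.array stands in for np.ndarray so it runs without numpy, and the function names are hypothetical. No results are claimed here, since timings vary by Python version and machine.

```python
import array
import functools
import timeit

# array.array stands in for np.ndarray: both expose .tolist().
arr = array.array("d", [1.0, 2.0, 3.0])
other = object()  # no .tolist(), exercises the miss path


def via_isinstance(obj):
    return obj.tolist() if isinstance(obj, array.array) else None


def via_hasattr(obj):
    return obj.tolist() if hasattr(obj, "tolist") else None


def via_try(obj):
    try:
        return obj.tolist()
    except AttributeError:
        return None


def bench(n: int = 100_000) -> dict:
    # Total seconds for n calls of each strategy on a hit and a miss.
    results = {}
    for fn in (via_isinstance, via_hasattr, via_try):
        for label, obj in (("hit", arr), ("miss", other)):
            call = functools.partial(fn, obj)
            results[f"{fn.__name__}/{label}"] = timeit.timeit(call, number=n)
    return results


for name, seconds in bench().items():
    print(f"{name:22s} {seconds:.4f} s")
```

One design point worth keeping in mind: json.JSONEncoder.default is only called for objects the encoder cannot serialize natively, so this check runs once per array (or other unsupported object), not once per list element, float or string in the message.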

@cboulay
Collaborator Author

cboulay commented Nov 29, 2023

isinstance can be fast on some objects, but for most objects it is quite slow because it goes through many lines of pure Python code before reaching its decision. This line could conceivably be hit 100k times / sec so I think it's worth a bit of premature optimization.

In this case we don't really care about the class; we only care that the object has the method we're calling. An alternative to the try/except that's faster than isinstance is if hasattr(obj, "tolist"). It also has the benefit of working with other objects that happen to have tolist. Will that do?
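A sketch of the hasattr variant of the encoder, exercised with the standard library's array.array (which also has tolist()) so it runs without numpy. The trade-off of duck typing is that any object with a tolist() method is accepted, whether or not it returns something JSON-serializable.

```python
import array
import json


class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        # Duck typing: anything with a tolist() method (numpy arrays,
        # numpy scalars, array.array, ...) is converted via tolist().
        if hasattr(obj, "tolist"):
            return obj.tolist()
        # Fall back to the base class, which raises TypeError.
        return super().default(obj)


payload = {"data": array.array("i", [1, 2, 3]), "fs": 500.0}
print(json.dumps(payload, cls=NumpyArrayEncoder))
# {"data": [1, 2, 3], "fs": 500.0}
```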

@griffinmilsap
Collaborator

griffinmilsap commented Dec 1, 2023

I have some code in ezmsg.util.messagecodec that could be useful here. It serializes messages to and from a JSON dict, so we don't need to replicate that. We could even make that the default FormatMessage behavior and turn MessageLogger into a collection that combines FormatMessage with a file logger that just writes incoming strings to a file.

Thoughts?

Edit: anticipating performance concerns with MessageEncoder; optimizations made there would benefit more than one code path ;)
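To illustrate the general idea of a round-trippable JSON dict codec, here is a stdlib sketch: the encoder tags each dataclass with its type name, and a json object_hook rebuilds it on decode. The "_type" key, REGISTRY, TaggedEncoder and tagged_hook are hypothetical; this is not the format ezmsg.util.messagecodec actually uses.

```python
import json
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Dict, Type

# Hypothetical registry mapping a type name to a dataclass that can be rebuilt.
REGISTRY: Dict[str, Type] = {}


def register(cls: Type) -> Type:
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class Sample:
    name: str
    values: list


@register
@dataclass
class Envelope:
    topic: str
    payload: Sample


class TaggedEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if is_dataclass(obj) and not isinstance(obj, type):
            # Shallow conversion: nested dataclasses are passed back to
            # default() by the encoder, so each gets its own "_type" tag.
            out = {"_type": type(obj).__name__}
            out.update({f.name: getattr(obj, f.name) for f in fields(obj)})
            return out
        return super().default(obj)


def tagged_hook(d: Dict[str, Any]) -> Any:
    # json calls object_hook innermost-first, so nested objects are
    # already rebuilt by the time the outer dict arrives here.
    type_name = d.get("_type")
    if type_name in REGISTRY:
        kwargs = {k: v for k, v in d.items() if k != "_type"}
        return REGISTRY[type_name](**kwargs)
    return d


msg = Envelope(topic="eeg", payload=Sample(name="ch0", values=[1.0, 2.0]))
text = json.dumps(msg, cls=TaggedEncoder)
print(text)
print(json.loads(text, object_hook=tagged_hook) == msg)  # True
```

Because the output is a string, a file logger on the other side of such a unit only needs to write lines, which matches the MessageLogger-as-collection idea.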

cboulay changed the title from "REQUEST: FormatMessage(ez.Unit)" to "[Request] FormatMessage(ez.Unit)" on May 11, 2024
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants