Skip to content

Commit

Permalink
testing...
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Sep 7, 2023
1 parent f909c57 commit 0c2d126
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 36 deletions.
2 changes: 1 addition & 1 deletion hivemind_core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

def main():
service = HiveMindService()
service.start()
service.run()


if __name__ == "__main__":
Expand Down
29 changes: 12 additions & 17 deletions hivemind_core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def peer(self) -> str:
# this is how ovos refers to connected nodes in message.context
return f"{self.name}:{self.ip}::{self.sess.session_id}"

def send(self, message: HiveMessage):
async def send(self, message: HiveMessage):
# TODO some cleaning around HiveMessage
if isinstance(message.payload, dict):
_msg_type = message.payload.get("type")
Expand All @@ -90,8 +90,7 @@ def send(self, message: HiveMessage):
else:
LOG.debug(f"sent unencrypted!")

self.loop.install()
self.socket.write_message(payload, is_bin)
await self.socket.write_message(payload, is_bin)

def decode(self, payload: str) -> HiveMessage:
if self.crypto_key:
Expand Down Expand Up @@ -178,7 +177,7 @@ def handle_send(self, message: Message):
"hive.client.send.error",
{"error": "That client is not connected", "peer": peer}))

def handle_internal_mycroft(self, message: str):
async def handle_internal_mycroft(self, message: str):
""" forward internal messages to clients if they are the target
here is where the client isolation happens,
clients only get responses to their own messages"""
Expand All @@ -205,7 +204,7 @@ def handle_internal_mycroft(self, message: str):
source_peer=peer,
target_peers=target_peers,
payload=message)
client.send(msg)
await client.send(msg)


@dataclass()
Expand All @@ -226,16 +225,12 @@ class HiveMindListenerProtocol:
mycroft_bus_callback = None # slave asked to inject payload into mycroft bus
shared_bus_callback = None # passive sharing of slave device bus (info)

def bind(self, websocket, bus=None):
def bind(self, websocket, bus):
websocket.protocol = self
if bus is None:
bus = MessageBusClient()
bus.run_in_thread()
bus.connected_event.wait()
self.internal_protocol = HiveMindListenerInternalProtocol(bus)
self.internal_protocol.register_bus_handlers()

def handle_new_client(self, client: HiveMindClientConnection):
async def handle_new_client(self, client: HiveMindClientConnection):
LOG.debug(f"new client: {client.peer}")
self.clients[client.peer] = client
message = Message("hive.client.connect",
Expand All @@ -253,7 +248,7 @@ def handle_new_client(self, client: HiveMindClientConnection):
"peer": client.peer, # this identifies the connected client in ovos message.context
"node_id": self.peer})
LOG.debug(f"saying HELLO to: {client.peer}")
client.send(msg)
await client.send(msg)

needs_handshake = not client.crypto_key and self.handshake_enabled

Expand All @@ -269,7 +264,7 @@ def handle_new_client(self, client: HiveMindClientConnection):
}
msg = HiveMessage(HiveMessageType.HANDSHAKE, payload)
LOG.debug(f"starting {client.peer} HANDSHAKE: {payload}")
client.send(msg)
await client.send(msg)
# if client is in protocol V1 -> self.handle_handshake_message
# clients can rotate their pubkey or session_key by sending a new handshake

Expand All @@ -296,7 +291,7 @@ def handle_invalid_protocol_version(self, client: HiveMindClientConnection):
{"source": client.peer})
self.internal_protocol.bus.emit(message)

def handle_message(self, message: HiveMessage, client: HiveMindClientConnection):
async def handle_message(self, message: HiveMessage, client: HiveMindClientConnection):
"""
message (HiveMessage): HiveMind message object
Expand All @@ -309,7 +304,7 @@ def handle_message(self, message: HiveMessage, client: HiveMindClientConnection)
message.update_hop_data()

if message.msg_type == HiveMessageType.HANDSHAKE:
self.handle_handshake_message(message, client)
await self.handle_handshake_message(message, client)

# mycroft Message handlers
elif message.msg_type == HiveMessageType.BUS:
Expand Down Expand Up @@ -340,7 +335,7 @@ def handle_unknown_message(self, message: HiveMessage, client: HiveMindClientCon
def handle_binary_message(self, message: HiveMessage, client: HiveMindClientConnection):
assert message.msg_type == HiveMessageType.BINARY

def handle_handshake_message(self, message: HiveMessage,
async def handle_handshake_message(self, message: HiveMessage,
client: HiveMindClientConnection):
LOG.debug("handshake received, generating session key")
payload = message.payload
Expand Down Expand Up @@ -386,7 +381,7 @@ def handle_handshake_message(self, message: HiveMessage,
return

msg = HiveMessage(HiveMessageType.HANDSHAKE, payload)
client.send(msg) # client can recreate crypto_key on his side now
await client.send(msg) # client can recreate crypto_key on his side now

def handle_bus_message(self, message: HiveMessage,
client: HiveMindClientConnection):
Expand Down
32 changes: 14 additions & 18 deletions hivemind_core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ovos_bus_client.session import Session
from ovos_utils.xdg_utils import xdg_data_home
from poorman_handshake import HandShake, PasswordHandShake
from pyee import EventEmitter
from pyee import EventEmitter, ExecutorEventEmitter, AsyncIOEventEmitter
from tornado import web, ioloop
from tornado.platform.asyncio import AnyThreadEventLoopPolicy
from tornado.websocket import WebSocketHandler
Expand Down Expand Up @@ -95,26 +95,19 @@ def on_stopping():
class MessageBusEventHandler(WebSocketHandler):
protocol: Optional[HiveMindListenerProtocol] = None

def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.emitter = EventEmitter()

@staticmethod
def decode_auth(auth) -> Tuple[str, str]:
userpass_encoded = bytes(auth, encoding="utf-8")
userpass_decoded = base64.b64decode(userpass_encoded).decode("utf-8")
name, key = userpass_decoded.split(":")
return name, key

def on(self, event_name, handler):
self.emitter.on(event_name, handler)

def on_message(self, message):
async def on_message(self, message):
message = self.client.decode(message)
LOG.info(f"received {self.client.peer} message: {message}")
self.protocol.handle_message(message, self.client)
await self.protocol.handle_message(message, self.client)

def open(self):
async def open(self):
auth = self.request.uri.split("/?authorization=")[-1]
name, key = self.decode_auth(auth)
LOG.info(f"authorizing client: {name}")
Expand Down Expand Up @@ -152,7 +145,7 @@ def open(self):
self.close()
return

self.protocol.handle_new_client(self.client)
await self.protocol.handle_new_client(self.client)
# self.write_message(Message("connected").serialize())

def on_close(self):
Expand All @@ -163,7 +156,7 @@ def check_origin(self, origin) -> bool:
return True


class HiveMindService(Thread):
class HiveMindService:
identity = NodeIdentity()

def __init__(self,
Expand All @@ -174,7 +167,6 @@ def __init__(self,
stopping_hook: Callable = on_stopping,
websocket_config: Optional[Dict[str, Any]] = None):

super().__init__()
websocket_config = websocket_config or \
Configuration().get('hivemind_websocket', {})
callbacks = StatusCallbackMap(on_started=started_hook,
Expand All @@ -183,10 +175,6 @@ def __init__(self,
on_error=error_hook,
on_stopping=stopping_hook)

self.bus = MessageBusClient(emitter=EventEmitter())
self.bus.run_in_thread()
self.bus.connected_event.wait()

self.status = ProcessStatus('HiveMind', callback_map=callbacks)
self.host = websocket_config.get('host') or "0.0.0.0"
self.port = websocket_config.get('port') or 5678
Expand All @@ -200,11 +188,19 @@ def __init__(self,
port=self.port,
zeroconf=websocket_config.get('zeroconf', False))

def connect_to_mycroft(self):
self.bus = MessageBusClient(emitter=AsyncIOEventEmitter())
self.bus.run_in_thread()
self.bus.connected_event.wait()

def run(self):
self.status.set_alive()
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())

loop = ioloop.IOLoop.current()

self.connect_to_mycroft()

self.protocol = HiveMindListenerProtocol(loop=loop)
self.protocol.bind(MessageBusEventHandler, self.bus)
self.status.bind(self.bus)
Expand Down

0 comments on commit 0c2d126

Please sign in to comment.