Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/intercom #86

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish_alpha.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ on:
- 'LICENSE'
- 'CHANGELOG.md'
- 'MANIFEST.in'
- 'readme.md'
- 'README.md'
- 'scripts/**'
workflow_dispatch:

Expand Down
87 changes: 85 additions & 2 deletions hivemind_core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import dataclass, field
from enum import Enum, IntEnum
from typing import List, Dict, Optional
import pgpy

from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import Message
Expand All @@ -13,6 +14,7 @@

from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.serialization import decode_bitstring, get_bitstring
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.util import (
decrypt_bin,
encrypt_bin,
Expand Down Expand Up @@ -236,7 +238,7 @@ class HiveMindListenerProtocol:

require_crypto: bool = True # throw error if crypto key not available
handshake_enabled: bool = True # generate a key per session if not pre-shared

identity: Optional[NodeIdentity] = None
# below are optional callbacks to handle payloads
# receives the payload + HiveMindClient that sent it
escalate_callback = None # slave asked to escalate payload
Expand All @@ -246,7 +248,8 @@ class HiveMindListenerProtocol:
mycroft_bus_callback = None # slave asked to inject payload into mycroft bus
shared_bus_callback = None # passive sharing of slave device bus (info)

def bind(self, websocket, bus):
def bind(self, websocket, bus, identity):
self.identity = identity
websocket.protocol = self
self.internal_protocol = HiveMindListenerInternalProtocol(bus)
self.internal_protocol.register_bus_handlers()
Expand Down Expand Up @@ -367,6 +370,8 @@ def handle_message(self, message: HiveMessage, client: HiveMindClientConnection)
self.handle_broadcast_message(message, client)
elif message.msg_type == HiveMessageType.ESCALATE:
self.handle_escalate_message(message, client)
elif message.msg_type == HiveMessageType.INTERCOM:
self.handle_intercom_message(message, client)
elif message.msg_type == HiveMessageType.BINARY:
self.handle_binary_message(message, client)
else:
Expand Down Expand Up @@ -476,6 +481,16 @@ def handle_broadcast_message(
if self.broadcast_callback:
self.broadcast_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# broadcast message to other peers
payload = self._unpack_message(message, client)
for peer in self.clients:
Expand Down Expand Up @@ -514,6 +529,16 @@ def handle_propagate_message(
if self.propagate_callback:
self.propagate_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# propagate message to other peers
for peer in self.clients:
if peer == client.peer:
Expand Down Expand Up @@ -557,6 +582,16 @@ def handle_escalate_message(
if self.escalate_callback:
self.escalate_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# send to other masters
message = Message(
"hive.send.upstream",
Expand All @@ -570,6 +605,54 @@ def handle_escalate_message(
bus = self.get_bus(client)
bus.emit(message)

def handle_intercom_message(
self, message: HiveMessage, client: HiveMindClientConnection
) -> bool:

# if the message targets us, send it to internal bus
k = message.target_public_key
if k and k != self.identity.public_key:
# not for us
return False

pload = message.payload
if isinstance(pload, dict) and "ciphertext" in pload:
try:
message_from_blob = pgpy.PGPMessage.from_blob(pload["ciphertext"])

with open(self.identity.private_key, "r") as f:
private_key = pgpy.PGPKey.from_blob(f.read())

decrypted: str = private_key.decrypt(message_from_blob)
message._payload = HiveMessage.deserialize(decrypted)
except:
if k:
LOG.error("failed to decrypt message!")
else:
LOG.debug("failed to decrypt message, not for us")
return False

if message.msg_type == HiveMessageType.BUS:
self.handle_bus_message(message, client)
return True
elif message.msg_type == HiveMessageType.PROPAGATE:
self.handle_propagate_message(message, client)
return True
elif message.msg_type == HiveMessageType.BROADCAST:
self.handle_broadcast_message(message, client)
return True
elif message.msg_type == HiveMessageType.ESCALATE:
self.handle_escalate_message(message, client)
return True
elif message.msg_type == HiveMessageType.BINARY:
self.handle_binary_message(message, client)
return True
elif message.msg_type == HiveMessageType.SHARED_BUS:
self.handle_client_bus(message.payload, client)
return True

return False

# HiveMind mycroft bus messages - from slave -> master
def handle_inject_mycroft_msg(
self, message: Message, client: HiveMindClientConnection
Expand Down
2 changes: 1 addition & 1 deletion hivemind_core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def run(self):
loop = ioloop.IOLoop.current()

self.protocol = self._proto(loop=loop)
self.protocol.bind(self._ws_handler, self.bus)
self.protocol.bind(self._ws_handler, self.bus, self.identity)
self.status.bind(self.bus)
self.status.set_started()

Expand Down
13 changes: 7 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
tornado
ovos_utils>=0.0.33
pycryptodomex
HiveMind_presence>=0.0.2a3
ovos-bus-client>=0.0.6a5
poorman_handshake>=0.1.0
click
click_default_group
rich
pyOpenSSL
hivemind-ggwave
pycryptodomex
poorman_handshake>=0.1.0
hivemind-ggwave
hivemind_bus_client>=0.0.4a25
HiveMind_presence>=0.0.2a3
ovos_utils>=0.0.33
ovos-bus-client>=0.0.6a5
Loading