-
Notifications
You must be signed in to change notification settings - Fork 114
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #51 from teinturman/master
Modified proc_msg function to resolve malformed json packets.
- Loading branch information
Showing
1 changed file
with
151 additions
and
166 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,166 +1,151 @@ | ||
""" | ||
SDClient | ||
A base class for interacting with the sdsim simulator as server. | ||
The server will create on vehicle per client connection. The client | ||
will then interact by createing json message to send to the server. | ||
The server will reply with telemetry and other status messages in an | ||
asynchronous manner. | ||
Author: Tawn Kramer | ||
""" | ||
import os | ||
import time | ||
import socket | ||
import select | ||
from threading import Thread | ||
import json | ||
import logging | ||
|
||
from .util import replace_float_notation | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class SDClient: | ||
def __init__(self, host, port, poll_socket_sleep_time=0.05): | ||
self.msg = None | ||
self.host = host | ||
self.port = port | ||
self.poll_socket_sleep_sec = poll_socket_sleep_time | ||
self.th = None | ||
|
||
# the aborted flag will be set when we have detected a problem with the socket | ||
# that we can't recover from. | ||
self.aborted = False | ||
self.connect() | ||
|
||
|
||
def connect(self): | ||
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
|
||
# connecting to the server | ||
logger.info("connecting to %s:%d " % (self.host, self.port)) | ||
try: | ||
self.s.connect((self.host, self.port)) | ||
except ConnectionRefusedError as e: | ||
raise( Exception("Could not connect to server. Is it running? If you specified 'remote', then you must start it manually.")) | ||
|
||
# time.sleep(pause_on_create) | ||
self.do_process_msgs = True | ||
self.th = Thread(target=self.proc_msg, args=(self.s,)) | ||
self.th.start() | ||
|
||
|
||
def send(self, m): | ||
self.msg = m | ||
|
||
def send_now(self, msg): | ||
logger.debug("send_now:" + msg) | ||
self.s.sendall(msg.encode("utf-8")) | ||
|
||
def on_msg_recv(self, j): | ||
logger.debug("got:" + j['msg_type']) | ||
|
||
|
||
def stop(self): | ||
# signal proc_msg loop to stop, then wait for thread to finish | ||
# close socket | ||
self.do_process_msgs = False | ||
if self.th is not None: | ||
self.th.join() | ||
if self.s is not None: | ||
self.s.close() | ||
|
||
|
||
def proc_msg(self, sock): | ||
''' | ||
This is the thread message loop to process messages. | ||
We will send any message that is queued via the self.msg variable | ||
when our socket is in a writable state. | ||
And we will read any messages when it's in a readable state and then | ||
call self.on_msg_recv with the json object message. | ||
''' | ||
sock.setblocking(0) | ||
inputs = [ sock ] | ||
outputs = [ sock ] | ||
partial = [] | ||
|
||
while self.do_process_msgs: | ||
# without this sleep, I was getting very consistent socket errors | ||
# on Windows. Perhaps we don't need this sleep on other platforms. | ||
time.sleep(self.poll_socket_sleep_sec) | ||
|
||
try: | ||
# test our socket for readable, writable states. | ||
readable, writable, exceptional = select.select(inputs, outputs, inputs) | ||
|
||
for s in readable: | ||
# print("waiting to recv") | ||
try: | ||
data = s.recv(1024 * 256) | ||
except ConnectionAbortedError: | ||
logger.warn("socket connection aborted") | ||
self.do_process_msgs = False | ||
break | ||
|
||
# we don't technically need to convert from bytes to string | ||
# for json.loads, but we do need a string in order to do | ||
# the split by \n newline char. This seperates each json msg. | ||
data = data.decode("utf-8") | ||
msgs = data.split("\n") | ||
|
||
for m in msgs: | ||
if len(m) < 2: | ||
continue | ||
last_char = m[-1] | ||
first_char = m[0] | ||
# check first and last char for a valid json terminator | ||
# if not, then add to our partial packets list and see | ||
# if we get the rest of the packet on our next go around. | ||
if first_char == "{" and last_char == '}': | ||
# Replace comma with dots for floats | ||
# useful when using unity in a language different from English | ||
m = replace_float_notation(m) | ||
try: | ||
j = json.loads(m) | ||
self.on_msg_recv(j) | ||
except Exception as e: | ||
logger.error("Exception:" + str(e)) | ||
logger.error("json: " + m) | ||
else: | ||
partial.append(m) | ||
# logger.info("partial packet:" + m) | ||
if last_char == '}': | ||
if partial[0][0] == "{": | ||
assembled_packet = "".join(partial) | ||
assembled_packet = replace_float_notation(assembled_packet) | ||
second_open = assembled_packet.find('{"msg', 1) | ||
if second_open != -1: | ||
# hmm what to do? We have a partial packet. Trimming just | ||
# the good part and discarding the rest. | ||
logger.warn("got partial packet:" + assembled_packet[:20]) | ||
assembled_packet = assembled_packet[second_open:] | ||
|
||
try: | ||
j = json.loads(assembled_packet) | ||
self.on_msg_recv(j) | ||
except Exception as e: | ||
logger.error("Exception:" + str(e)) | ||
logger.error("partial json: " + assembled_packet) | ||
else: | ||
logger.error("failed packet.") | ||
partial.clear() | ||
|
||
for s in writable: | ||
if self.msg != None: | ||
logger.debug("sending " + self.msg) | ||
s.sendall(self.msg.encode("utf-8")) | ||
self.msg = None | ||
if len(exceptional) > 0: | ||
logger.error("problems w sockets!") | ||
|
||
except Exception as e: | ||
print("Exception:", e) | ||
self.aborted = True | ||
self.on_msg_recv({"msg_type" : "aborted"}) | ||
break | ||
""" | ||
SDClient | ||
A base class for interacting with the sdsim simulator as server. | ||
The server will create on vehicle per client connection. The client | ||
will then interact by createing json message to send to the server. | ||
The server will reply with telemetry and other status messages in an | ||
asynchronous manner. | ||
Author: Tawn Kramer | ||
""" | ||
import os | ||
import time | ||
import socket | ||
import select | ||
from threading import Thread | ||
import json | ||
import logging | ||
|
||
from .util import replace_float_notation | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class SDClient: | ||
def __init__(self, host, port, poll_socket_sleep_time=0.05): | ||
self.msg = None | ||
self.host = host | ||
self.port = port | ||
self.poll_socket_sleep_sec = poll_socket_sleep_time | ||
self.th = None | ||
|
||
# the aborted flag will be set when we have detected a problem with the socket | ||
# that we can't recover from. | ||
self.aborted = False | ||
self.connect() | ||
|
||
|
||
def connect(self): | ||
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
|
||
# connecting to the server | ||
logger.info("connecting to %s:%d " % (self.host, self.port)) | ||
try: | ||
self.s.connect((self.host, self.port)) | ||
except ConnectionRefusedError as e: | ||
raise( Exception("Could not connect to server. Is it running? If you specified 'remote', then you must start it manually.")) | ||
|
||
# time.sleep(pause_on_create) | ||
self.do_process_msgs = True | ||
self.th = Thread(target=self.proc_msg, args=(self.s,)) | ||
self.th.start() | ||
|
||
|
||
def send(self, m): | ||
self.msg = m | ||
|
||
def send_now(self, msg): | ||
logger.debug("send_now:" + msg) | ||
self.s.sendall(msg.encode("utf-8")) | ||
|
||
def on_msg_recv(self, j): | ||
logger.debug("got:" + j['msg_type']) | ||
|
||
|
||
def stop(self): | ||
# signal proc_msg loop to stop, then wait for thread to finish | ||
# close socket | ||
self.do_process_msgs = False | ||
if self.th is not None: | ||
self.th.join() | ||
if self.s is not None: | ||
self.s.close() | ||
|
||
|
||
def proc_msg(self, sock): | ||
''' | ||
This is the thread message loop to process messages. | ||
We will send any message that is queued via the self.msg variable | ||
when our socket is in a writable state. | ||
And we will read any messages when it's in a readable state and then | ||
call self.on_msg_recv with the json object message. | ||
''' | ||
sock.setblocking(0) | ||
inputs = [ sock ] | ||
outputs = [ sock ] | ||
localbuffer="" | ||
|
||
while self.do_process_msgs: | ||
# without this sleep, I was getting very consistent socket errors | ||
# on Windows. Perhaps we don't need this sleep on other platforms. | ||
time.sleep(self.poll_socket_sleep_sec) | ||
try: | ||
# test our socket for readable, writable states. | ||
readable, writable, exceptional = select.select(inputs, outputs, inputs) | ||
|
||
for s in readable: | ||
try: | ||
data = s.recv(1024 * 256) | ||
except ConnectionAbortedError: | ||
logger.warn("socket connection aborted") | ||
print("socket connection aborted") | ||
self.do_process_msgs = False | ||
break | ||
|
||
# we don't technically need to convert from bytes to string | ||
# for json.loads, but we do need a string in order to do | ||
# the split by \n newline char. This seperates each json msg. | ||
data = data.decode("utf-8") | ||
|
||
localbuffer += data | ||
|
||
n0=localbuffer.find("{") | ||
n1=localbuffer.rfind("}") | ||
if n1>=0 and n0>=0 and n0<n1 : # there is at least one message : | ||
msgs=localbuffer[n0:n1+1].split("\n") | ||
localbuffer=localbuffer[n1:] | ||
|
||
for m in msgs: | ||
if len(m) <= 2: | ||
continue | ||
# Replace comma with dots for floats | ||
# useful when using unity in a language different from English | ||
m = replace_float_notation(m) | ||
try: | ||
j = json.loads(m) | ||
except Exception as e: | ||
logger.error("Exception:" + str(e)) | ||
logger.error("json: " + m) | ||
continue | ||
|
||
if 'msg_type' not in j: | ||
logger.error('Warning expected msg_type field') | ||
logger.error("json: " + m) | ||
continue | ||
else : | ||
self.on_msg_recv(j) | ||
|
||
for s in writable: | ||
if self.msg != None: | ||
logger.debug("sending " + self.msg) | ||
s.sendall(self.msg.encode("utf-8")) | ||
self.msg = None | ||
|
||
if len(exceptional) > 0: | ||
logger.error("problems w sockets!") | ||
|
||
except Exception as e: | ||
print("Exception:", e) | ||
self.aborted = True | ||
self.on_msg_recv({"msg_type" : "aborted"}) | ||
break |