Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatibility with Siemens/Cinterion TC35i #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions lib/pygsm/gsmmodem.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class GsmModem(object):
retry_delay = 2
max_retries = 10
modem_lock = threading.RLock()

modem_type = ""

def __init__(self, *args, **kwargs):
"""Creates, connects to, and boots a GSM Modem. All of the arguments
Expand Down Expand Up @@ -214,16 +214,23 @@ def set_modem_config(self):
# set some sensible defaults, to make
# the various modems more consistant
self.command("ATE0", raise_errors=False) # echo off
#sniff the modem type
modem_type = self.query("AT+CGMM")
self._log("Modem type: " + modem_type)
self.command("AT+CMEE=1", raise_errors=False) # useful error messages
self.command("AT+WIND=0", raise_errors=False) # disable notifications
self.command("AT+CSMS=1", raise_errors=False) # set SMS mode to phase 2+
self.command(self.smshandler.get_mode_cmd() ) # make sure in PDU mode

# enable new message notification
self.command(
"AT+CNMI=2,2,0,0,0",
raise_errors=False)

if (modem_type == "TC35i"):
cnmi_command = "AT+CNMI=2,2,0,0,1" # according to the TC35i documentation the last parameter of
# AT+CNMI can only be one. It will error on it and you will
# not receive sms messages if it's not set like this.
else:
self.command("AT+WIND=0", raise_errors=False) # disable notifications, on the TC35i this command
# does not exsist
cnmi_command = "AT+CNMI=2,2,0,0,0"

self.command(self.smshandler.get_mode_cmd() ) # make sure in PDU mode
self.command(cnmi_command, raise_errors=False) # enable new message notification

def boot(self, reboot=False):
"""Initializes the modem. Must be called after init and connect,
Expand Down
2 changes: 1 addition & 1 deletion lib/pygsm/gsmpdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __get_pdu_string(self):
type = '91' # international
else:
num = self.address
type = 'A8' # national number
type = '81' # atleast not a international number so lets say an unkown number

# length
num_len = len(num)
Expand Down
68 changes: 40 additions & 28 deletions lib/pygsm/pdusmshandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ class PduSmsHandler(SmsHandler):
CMGL_MATCHER =re.compile(r'^\+CMGL:.*?$')
CMGL_STATUS="0"

max_retries = 10
retry_delay = 2


def __init__(self, modem):
SmsHandler.__init__(self, modem)

Expand Down Expand Up @@ -44,34 +48,42 @@ def _send_pdu(self, pdu):
# accesing the property causes the pdu_string
# to be generated, so do once and cache
pdu_string = pdu.pdu_string

# try to catch write timeouts
try:
# content length is in bytes, so half PDU minus
# the first blank '00' byte
self.modem.command(
'AT+CMGS=%d' % (len(pdu_string)/2 - 1),
read_timeout=1
)

# if no error is raised within the timeout period,
# and the text-mode prompt WAS received, send the
# sms text, wait until it is accepted or rejected
# (text-mode messages are terminated with ascii char 26
# "SUBSTITUTE" (ctrl+z)), and return True (message sent)
except errors.GsmReadTimeoutError, err:
if err.pending_data[0] == ">":
self.modem.command(pdu_string, write_term=chr(26))
return True

# a timeout was raised, but no prompt nor
# error was received. i have no idea what
# is going on, so allow the error to propagate
else:
raise

finally:
pass
retries = 0
while retries < self.max_retries:
# try to catch write timeouts
try:
# content length is in bytes, so half PDU minus
# the first blank '00' byte
self.modem.command(
'AT+CMGS=%d' % (len(pdu_string)/2 - 1),
read_timeout=2
)

#self.modem.command(pdu_string, write_term=chr(26))

# if no error is raised within the timeout period,
# and the text-mode prompt WAS received, send the
# sms text, wait until it is accepted or rejected
# (text-mode messages are terminated with ascii char 26
# "SUBSTITUTE" (ctrl+z)), and return True (message sent)
except errors.GsmReadTimeoutError, err:
if err.pending_data[0] == ">":
try:
self.modem.command(pdu_string, write_term=chr(26))
return True
except:
if getattr(err, "code", None) == 500 and self.modem.modem_type == 'TC35i':
time.sleep(self.retry_delay)
retries += 1
continue
# a timeout was raised, but no prompt nor
# error was received. i have no idea what
# is going on, so allow the error to propagate
else:
raise

finally:
pass

# for all other errors...
# (likely CMS or CME from device)
Expand Down
27 changes: 26 additions & 1 deletion test/gsmmodem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def testEchoOff(self):

class MockEchoDevice(MockDevice):
def process(self, cmd):

# raise and error for any
# cmd other than ECHO OFF
if cmd != "ATE0":
Expand All @@ -154,6 +153,32 @@ def process(self, cmd):
gsm = pygsm.GsmModem(device=device)
self.assertEqual(device.echo, False)

def testModemSniffing(self):
class MockSiemensTC35i(MockDevice):
def __init__(self):
MockDevice.__init__(self)
MockDevice.model = "TC35i"
MockDevice.wind_called = False
MockDevice.cgmm_called = False

def at_cgmm(self):
self._output(self.model)
self.cgmm_called = True
return True

def at_wind(self, switch):
self.wind_called = True
return False

def at_cnmi(self, settings):
self.cnmi_settings = settings
return True

device = MockSiemensTC35i()
gsm = pygsm.GsmModem(device=device)
self.assertEqual(device.cnmi_settings, "2,2,0,0,1")
self.assertEqual(device.cgmm_called, True)
self.assertEqual(device.wind_called, False)

def testUsefulErrors(self):
"""Checks that GsmModem attempts to enable useful errors
Expand Down
17 changes: 16 additions & 1 deletion test/mock/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,24 @@ def _respond(out):
# if the value is wrapped in "quotes", remove
# them. this is sloppy, but we're only mocking
val = val.strip('"')

# call the method, and insert OK or ERROR into the
# read buffer depending on the True/False output
if hasattr(self, method):
out = getattr(self, method)(val)
return _respond(out)

#also match the AT+KEYWORD command. A Query to see what modem is connected
#needs to return something always.
m = re.match(r"^AT\+([A-Z]+)$", cmd)
if m is not None:
key = m.groups()[0]
method = "at_%s" % key.lower()
# call the method, and insert OK or ERROR into the
# read buffer depending on the True/False output
if hasattr(self, method):
out = getattr(self, method)()
return _respond(out)

# attempt to hand off this
# command to the subclass
if hasattr(self, "process"):
Expand Down Expand Up @@ -218,6 +229,10 @@ def at_cmgf(self, mode):
self.mode = None
return False

def at_cgmm(self):
self._output("ACME Modem 4000")
return True


def _output(self, str, delimiters=True):
"""Insert a GSM response into the read buffer, with leading and
Expand Down
14 changes: 9 additions & 5 deletions test/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class TestBase(unittest.TestCase):
rl_args={'read_timeout': mox.IgnoreArg(),
'read_term': mox.IgnoreArg()}
ok = ["","OK"]

modemtype = ["","AMCE Modem 4000", "OK"]

def get_mode(self):
"""
Subclass overrides this to return 'TEXT' or 'PDU'
Expand All @@ -37,14 +38,17 @@ def setUp(self):
self.mockDevice.write("ATE0\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

self.mockDevice.write("AT+CGMM\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.modemtype)

self.mockDevice.write("AT+CMEE=1\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

self.mockDevice.write("AT+WIND=0\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

self.mockDevice.write("AT+CSMS=1\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

self.mockDevice.write("AT+WIND=0\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

# must see command to set PDU mode
mode_int = self.mode_map[self.get_mode().lower()]
Expand All @@ -54,7 +58,7 @@ def setUp(self):

self.mockDevice.write("AT+CNMI=2,2,0,0,0\r")
self.mockDevice.read_lines(**self.rl_args).AndReturn(self.ok)

# verify fetch_stored_messages in boot
cmgl_str = self.cmgl_map[self.get_mode().lower()]
self.mockDevice.write("AT+CMGL=%s\r" % cmgl_str)
Expand Down