You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Reproduction script (obviously, I had a broker set up listening on both of the URLs in this):
#! /usr/bin/env python3
import logging
import sys
import anyio
from distmqtt.client import MQTTClient
secure = 'secure' in sys.argv
backend = 'trio' if 'trio' in sys.argv else 'asyncio'
if secure:
uri = 'mqtts://127.0.0.1:2883'
config = dict(
check_hostname=False,
broker=dict(
# Side note: Had to make a CA because there's no way to turn off verification
cafile='ca-bundle.crt',
)
)
else:
uri = 'mqtt://127.0.0.1:1883'
config = {}
logging.basicConfig(level='DEBUG')
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
async def main():
async with anyio.create_task_group() as tg:
client = MQTTClient(tg, config=config)
await client.connect(uri)
logger.warning('Connected')
await client.disconnect()
logger.warning('Disconnected')
anyio.run(main, backend=backend)
Running this without "secure" on the command line works fine, regardless of backend
DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-kvgxupwiq{fvwict
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtt://127.0.0.1:1883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:15:24.627961, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-kvgxupwiq{fvwict Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:1883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x102e958c0> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x102e5f7b0> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state connected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state connected. Processing callbacks...
INFO:transitions.core:Exited state connected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
WARNING:__main__:Disconnected
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state disconnected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state disconnected. Processing callbacks...
INFO:transitions.core:Exited state disconnected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
But running it with secure on the command line (enabling MQTTS and using asyncio) crashes the app with a stack trace:
DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-elgwutjmmdeiv{fq
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:17:18.235579, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-elgwutjmmdeiv{fq Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x10ffeb800> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x10ffcac10> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()
File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
s = self._sslobj.shutdown()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "./closetest.py", line 34, in main
await client.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
raise ClosedResourceError
anyio.exceptions.ClosedResourceError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "./closetest.py", line 37, in <module>
anyio.run(main)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
return asynclib.run(func, *args, **backend_options) # type: ignore
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 114, in run
raise exception
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 76, in wrapper
retval = await func(*args)
File "./closetest.py", line 35, in main
logger.warning('Disconnected')
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 390, in __aexit__
raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 415, in _run_wrapped_task
await func(*args)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 618, in wait_socket_readable
raise ResourceBusyError('reading from') from None
anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()
File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
s = self._sslobj.shutdown()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "./closetest.py", line 34, in main
await client.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
raise ClosedResourceError
anyio.exceptions.ClosedResourceError
Running it with "secure" and "trio" on the command line (enabling MQTSS and trio) also crashes with a stack trace, but it's different:
DEBUG:distmqtt.client:Using generated client ID : distmqtt-ai{epvdnqr{uqppo
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:25:17.745781, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-ai{epvdnqr{uqppo Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._trio.TaskGroup object at 0x106fc3240> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._trio.CancelScope object at 0x106f91c40> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
File "./closetest.py", line 38, in <module>
anyio.run(main, backend=backend)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
return asynclib.run(func, *args, **backend_options) # type: ignore
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
raise runner.main_task_outcome.error
File "./closetest.py", line 36, in main
logger.warning('Disconnected')
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
raise ExceptionGroup(exc.exceptions) from None
anyio._backends._trio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):
File "./closetest.py", line 35, in main
await client.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
raise ResourceBusyError('reading from') from None
anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
raise ClosedResourceError().with_traceback(exc.__traceback__) from None
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
await wait_readable(sock)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
await self._wait_common(fd, select.KQ_FILTER_READ)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
await self.wait_kevent(fd, filter, abort)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
return await _core.wait_task_rescheduled(abort)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
raise captured_error
anyio.exceptions.ClosedResourceError
Details of embedded exception 1:
Traceback (most recent call last):
File "./closetest.py", line 35, in main
await client.disconnect()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
raise ResourceBusyError('reading from') from None
anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
Details of embedded exception 2:
Traceback (most recent call last):
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
await self._handler.stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
await super().stop()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
await self.stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
await self._stream.close()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
await self._socket.unwrap_tls()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
await self._wait_readable()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
raise ClosedResourceError().with_traceback(exc.__traceback__) from None
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
await wait_readable(sock)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
await self._wait_common(fd, select.KQ_FILTER_READ)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
await self.wait_kevent(fd, filter, abort)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
return await _core.wait_task_rescheduled(abort)
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
raise captured_error
anyio.exceptions.ClosedResourceError
The text was updated successfully, but these errors were encountered:
The first call ends up queueing the disconnect packet for sending, but that is not actually complete when the stop() ends up killing the handlers sender task. Here's a terrible workaround that actually works:
--- a/distmqtt/client.py
+++ b/distmqtt/client.py
@@ -262,6 +262,7 @@ class MQTTClient:
if self._disconnect_task is not None:
await self._disconnect_task.cancel()
await self._handler.mqtt_disconnect()
+ await anyio.sleep(0.1)
self._connected_state.clear()
await self._handler.stop()
self.session.transitions.disconnect()
But I think this needs a more proper fix than that that depends on a positive indicator of completion of sending the disconnect packet, but I'm not sure how you'd like to approach that (though I'm happy to help if you have a suggestion).
For now, out of necessity, I'm going to workaround the issue by monkey-patching ClientProtocolHandler.mqtt_disconnect() to add a short sleep before returning.
Reproduction script (obviously, I had a broker set up listening on both of the URLs in this):
Running this without "secure" on the command line works fine, regardless of backend
But running it with
secure
on the command line (enabling MQTTS and using asyncio) crashes the app with a stack trace:Running it with "secure" and "trio" on the command line (enabling MQTSS and trio) also crashes with a stack trace, but it's different:
The text was updated successfully, but these errors were encountered: