Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTTClient doesn't close MQTTS (TLS) connections cleanly #7

Open
mikenerone opened this issue Aug 7, 2020 · 1 comment
Open

MQTTClient doesn't close MQTTS (TLS) connections cleanly #7

mikenerone opened this issue Aug 7, 2020 · 1 comment

Comments

@mikenerone
Copy link
Contributor

mikenerone commented Aug 7, 2020

Reproduction script (obviously, I had a broker set up listening on both of the URLs in this):

#! /usr/bin/env python3

import logging
import sys

import anyio
from distmqtt.client import MQTTClient

secure = 'secure' in sys.argv
backend = 'trio' if 'trio' in sys.argv else 'asyncio'

if secure:
    uri = 'mqtts://127.0.0.1:2883'
    config = dict(
        check_hostname=False,
        broker=dict(
            # Side note: Had to make a CA because there's no way to turn off verification
            cafile='ca-bundle.crt',
        )
    )
else:
    uri = 'mqtt://127.0.0.1:1883'
    config = {}

logging.basicConfig(level='DEBUG')
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)


async def main():
    async with anyio.create_task_group() as tg:
        client = MQTTClient(tg, config=config)
        await client.connect(uri)
        logger.warning('Connected')
        await client.disconnect()
        logger.warning('Disconnected')

anyio.run(main, backend=backend)

Running this without "secure" on the command line works fine, regardless of backend

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-kvgxupwiq{fvwict
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtt://127.0.0.1:1883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:15:24.627961, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-kvgxupwiq{fvwict Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:1883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x102e958c0> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x102e5f7b0> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state connected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state connected. Processing callbacks...
INFO:transitions.core:Exited state connected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
WARNING:__main__:Disconnected
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state disconnected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state disconnected. Processing callbacks...
INFO:transitions.core:Exited state disconnected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks

But running it with secure on the command line (enabling MQTTS and using asyncio) crashes the app with a stack trace:

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-elgwutjmmdeiv{fq
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:17:18.235579, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-elgwutjmmdeiv{fq Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x10ffeb800> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x10ffcac10> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()
  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 34, in main
    await client.disconnect()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError
anyio.exceptions.ClosedResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 37, in <module>
    anyio.run(main)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 114, in run
    raise exception
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 76, in wrapper
    retval = await func(*args)
  File "./closetest.py", line 35, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 390, in __aexit__
    raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 415, in _run_wrapped_task
    await func(*args)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 618, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()

  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()

ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "./closetest.py", line 34, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError

anyio.exceptions.ClosedResourceError

Running it with "secure" and "trio" on the command line (enabling MQTSS and trio) also crashes with a stack trace, but it's different:

DEBUG:distmqtt.client:Using generated client ID : distmqtt-ai{epvdnqr{uqppo
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:25:17.745781, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-ai{epvdnqr{uqppo Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._trio.TaskGroup object at 0x106fc3240> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._trio.CancelScope object at 0x106f91c40> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "./closetest.py", line 38, in <module>
    anyio.run(main, backend=backend)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "./closetest.py", line 36, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
    raise ExceptionGroup(exc.exceptions) from None
anyio._backends._trio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "./closetest.py", line 35, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
    raise ClosedResourceError().with_traceback(exc.__traceback__) from None

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
    await wait_readable(sock)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
    return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
    await self._wait_common(fd, select.KQ_FILTER_READ)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
    await self.wait_kevent(fd, filter, abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
    return await _core.wait_task_rescheduled(abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error

anyio.exceptions.ClosedResourceError


Details of embedded exception 1:

  Traceback (most recent call last):
    File "./closetest.py", line 35, in main
      await client.disconnect()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
      raise ResourceBusyError('reading from') from None
  anyio.exceptions.ResourceBusyError: Another task is already reading from this resource

Details of embedded exception 2:

  Traceback (most recent call last):
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
      raise ClosedResourceError().with_traceback(exc.__traceback__) from None
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
      await wait_readable(sock)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
      return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
      await self._wait_common(fd, select.KQ_FILTER_READ)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
      await self.wait_kevent(fd, filter, abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
      return await _core.wait_task_rescheduled(abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
      raise captured_error
  anyio.exceptions.ClosedResourceError
@mikenerone
Copy link
Contributor Author

Ok, the cause is that in MQTTClient.disconnect(), these two operation are effectively done in succession:

            await self._handler.mqtt_disconnect()
            ...
            await self._handler.stop()

The first call ends up queueing the disconnect packet for sending, but that is not actually complete when the stop() ends up killing the handlers sender task. Here's a terrible workaround that actually works:

--- a/distmqtt/client.py
+++ b/distmqtt/client.py
@@ -262,6 +262,7 @@ class MQTTClient:
             if self._disconnect_task is not None:
                 await self._disconnect_task.cancel()
             await self._handler.mqtt_disconnect()
+            await anyio.sleep(0.1)
             self._connected_state.clear()
             await self._handler.stop()
             self.session.transitions.disconnect()

But I think this needs a more proper fix than that that depends on a positive indicator of completion of sending the disconnect packet, but I'm not sure how you'd like to approach that (though I'm happy to help if you have a suggestion).

For now, out of necessity, I'm going to workaround the issue by monkey-patching ClientProtocolHandler.mqtt_disconnect() to add a short sleep before returning.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant