Skip to content

Commit

Permalink
Add disconnect on Nth retry to make it reresolving IP
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhestkov committed Jul 19, 2024
1 parent 7986277 commit c9e9540
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions salt/transport/zeromq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Zeromq transport classes
"""

import errno
import hashlib
import logging
Expand Down Expand Up @@ -178,7 +179,7 @@ def __init__(self, opts, io_loop, **kwargs):
self._socket.setsockopt(zmq.IPV4ONLY, 0)

if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
self._monitor = ZeroMQSocketMonitor(self._socket)
self._monitor = ZeroMQSocketMonitor(self._socket, opts)
self._monitor.start_io_loop(self.io_loop)

def close(self):
Expand Down Expand Up @@ -211,6 +212,12 @@ def connect(self, publish_port, connect_callback=None, disconnect_callback=None)
self.master_pub,
)
log.debug("%r connecting to %s", self, self.master_pub)
if (
hasattr(self, "_monitor")
and self._monitor is not None
and disconnect_callback is not None
):
self._monitor.disconnect_callback = disconnect_callback
self._socket.connect(self.master_pub)
connect_callback(True)

Expand Down Expand Up @@ -634,7 +641,7 @@ def mark_future(msg):
class ZeroMQSocketMonitor:
__EVENT_MAP = None

def __init__(self, socket):
def __init__(self, socket, opts=None):
"""
Create ZMQ monitor sockets
Expand All @@ -644,6 +651,11 @@ def __init__(self, socket):
self._socket = socket
self._monitor_socket = self._socket.get_monitor_socket()
self._monitor_stream = None
self.disconnect_callback = None
self.disconnect_on_retry = None
self._connect_retry = None
if opts is not None:
self.disconnect_on_retry = opts.get("zmq_disconnect_on_retry", 10)

def start_io_loop(self, io_loop):
log.trace("Event monitor start!")
Expand Down Expand Up @@ -680,6 +692,21 @@ def monitor_callback(self, msg):
log.debug("ZeroMQ event: %s", evt)
if evt["event"] == zmq.EVENT_MONITOR_STOPPED:
self.stop()
elif evt["event"] == zmq.EVENT_DISCONNECTED:
if self.disconnect_callback is not None:
self.disconnect_callback()
elif evt["event"] == zmq.EVENT_CONNECT_RETRIED:
if (
self.disconnect_on_retry is not None
and self.disconnect_callback is not None
):
if self._connect_retry is None:
self._connect_retry = self.disconnect_on_retry
self._connect_retry -= 1
if self._connect_retry <= 0:
log.debug("Calling disconnect callback as number of retries reached.")
self.disconnect_callback()
self._connect_retry = self.disconnect_on_retry

def stop(self):
if self._socket is None:
Expand Down

0 comments on commit c9e9540

Please sign in to comment.