Skip to content

Commit

Permalink
Fix a potential deadlock on fork
Browse files Browse the repository at this point in the history
If a fork happens while some thread is sending metrics and holds one
of the locks, the lock would remain locked in the child process and
would deadlock, either in the post_fork handler (when the post_fork
hook tries to close the socket), or later (when user code tries to
send a metric).

Work around the issue by resetting the socket and buffer locks in the
child process. If those were locked in the parent at the time of the
fork, the internal client state may be inconsistent, so we reset it as
well.

With config lock, we can not reset the state to some known good state,
and to avoid problems when fork is called while a thread modifies the
client configuration, the config lock will be held across fork. Both
the client and the parent can safely unlock it afterwards.
  • Loading branch information
vickenty committed Jun 24, 2024
1 parent cca8ac7 commit b7cab0a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
63 changes: 45 additions & 18 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,26 @@ def pre_fork():
c.pre_fork()


def post_fork():
def post_fork_parent():
"""Restore all client instances after a fork.
If SUPPORTS_FORKING is true, this will be called automatically after os.fork().
"""
for c in _instances:
c.post_fork()
c.post_fork_parent()


def post_fork_child():
for c in _instances:
c.post_fork_child()


if SUPPORTS_FORKING:
os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore
os.register_at_fork( # type: ignore
before=pre_fork,
after_in_child=post_fork_child,
after_in_parent=post_fork_parent,
)


# pylint: disable=useless-object-inheritance,too-many-instance-attributes
Expand Down Expand Up @@ -473,7 +482,8 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
Applications should call stop() before exiting to make sure all pending payloads are sent.
Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications
arrange to call pre_fork() and post_fork() module functions around calls to os.fork().
arrange to call pre_fork(), post_fork_parent() and post_fork_child() module functions around calls
to os.fork().
:param sender_queue_size: Set the maximum number of packets to queue for the sender.
How many packets to queue before blocking or dropping the packet if the packet queue is already full.
Expand Down Expand Up @@ -1397,29 +1407,46 @@ def wait_for_pending(self):
def pre_fork(self):
"""Prepare client for a process fork.
Flush any pending payloads, stop all background threads and
close the connection. Once the function returns.
Flush any pending payloads and stop all background threads.
The client should not be used from this point until
post_fork() is called.
state is restored by calling post_fork_parent() or
post_fork_child().
"""
log.debug("[%d] pre_fork for %s", os.getpid(), self)

self._forking = True
# Hold the config lock across fork. This will make sure that
# we don't fork in the middle of the concurrent modification
# of the client's settings. Data protected by other locks may
# be left in inconsistent state in the child process, which we
# will clean up in post_fork_child.

with self._config_lock:
self._stop_flush_thread()
self._stop_sender_thread()
self.close_socket()
self._config_lock.acquire()
self._stop_flush_thread()
self._stop_sender_thread()

def post_fork(self):
"""Restore the client state after a fork."""
def post_fork_parent(self):
"""Restore the client state after a fork in the parent process."""
self._start_flush_thread(self._flush_interval)
self._start_sender_thread()
self._config_lock.release()

log.debug("[%d] post_fork for %s", os.getpid(), self)
def post_fork_child(self):
"""Restore the client state after a fork in the child process."""
self._config_lock.release()

self.close_socket()
# Discard the locks that could have been locked at the time
# when we forked. This may cause inconsistent internal state,
# which we will fix in the next steps.
self._socket_lock = Lock()
self._buffer_lock = RLock()

self._forking = False
# Reset the buffer so we don't send metrics from the parent
# process. Also makes sure buffer properties are consistent.
self._reset_buffer()
# Execute the socket_path setter to reconcile transport and
# payload size properties in respect to socket_path value.
self.socket_path = self.socket_path
self.close_socket()

with self._config_lock:
self._start_flush_thread(self._flush_interval)
Expand Down
49 changes: 48 additions & 1 deletion tests/integration/dogstatsd/test_statsd_fork.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import itertools
import socket
import threading

import pytest

Expand Down Expand Up @@ -31,7 +32,7 @@ def inner(*args, **kwargs):
return inner

statsd.pre_fork = track(statsd.pre_fork)
statsd.post_fork = track(statsd.post_fork)
statsd.post_fork_parent = track(statsd.post_fork_parent)

pid = os.fork()
if pid == 0:
Expand All @@ -41,3 +42,49 @@ def inner(*args, **kwargs):
os.waitpid(pid, 0)

assert len(tracker) == 2


def sender_a(statsd, running):
while running[0]:
statsd.gauge("spam", 1)


def sender_b(statsd, signal):
while running[0]:
with statsd:
statsd.gauge("spam", 1)

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, sender",
list(itertools.product([True, False], [True, False], [sender_a, sender_b])),
)
def test_fork_with_thread(disable_background_sender, disable_buffering, sender):
if not SUPPORTS_FORKING:
pytest.skip("os.register_at_fork is required for this test")

statsd = DogStatsd(
telemetry_min_flush_interval=0,
disable_background_sender=disable_background_sender,
disable_buffering=disable_buffering,
)

sender = None
try:
sender_running = [True]
sender = threading.Thread(target=sender, args=(statsd, sender_running))
sender.daemon = True
sender.start()

pid = os.fork()
if pid == 0:
os._exit(42)

assert pid > 0
(_, status) = os.waitpid(pid, 0)

assert os.WEXITSTATUS(status) == 42
finally:
statsd.stop()
if sender:
sender_running[0] = False
sender.join()
2 changes: 1 addition & 1 deletion tests/integration/dogstatsd/test_statsd_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_fork_hooks(disable_background_sender, disable_buffering):
assert statsd._queue is None or statsd._queue.empty()
assert len(statsd._buffer) == 0

statsd.post_fork()
statsd.post_fork_parent()

assert disable_buffering or statsd._flush_thread.is_alive()
assert disable_background_sender or statsd._sender_thread.is_alive()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,7 @@ def inner():
# Statsd should survive this sequence of events
statsd.pre_fork()
statsd.get_socket()
statsd.post_fork()
statsd.post_fork_parent()
t = Thread(target=inner)
t.daemon = True
t.start()
Expand Down

0 comments on commit b7cab0a

Please sign in to comment.