Skip to content

Commit

Permalink
Revert the change making reactor less blocking (bsc#1230322)
Browse files Browse the repository at this point in the history
This reverts commit 0d35f09.
  • Loading branch information
vzhestkov authored Sep 25, 2024
1 parent 2fb453d commit c00801d
Showing 1 changed file with 17 additions and 28 deletions.
45 changes: 17 additions & 28 deletions salt/utils/reactor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
"""
Functions which implement running reactor jobs
"""

import fnmatch
import glob
import logging
import os
from threading import Lock

import salt.client
import salt.defaults.exitcodes
Expand Down Expand Up @@ -196,6 +194,13 @@ def reactions(self, tag, data, reactors):
self.resolve_aliases(chunks)
return chunks

def call_reactions(self, chunks):
"""
Execute the reaction state
"""
for chunk in chunks:
self.wrap.run(chunk)

def run(self):
"""
Enter into the server loop
Expand All @@ -213,7 +218,7 @@ def run(self):
) as event:
self.wrap = ReactWrap(self.opts)

for data in event.iter_events(full=True, auto_reconnect=True):
for data in event.iter_events(full=True):
# skip all events fired by ourselves
if data["data"].get("user") == self.wrap.event_user:
continue
Expand Down Expand Up @@ -263,9 +268,15 @@ def run(self):
if not self.is_leader:
continue
else:
self.wrap.call_reactions(
data, self.list_reactors, self.reactions
)
reactors = self.list_reactors(data["tag"])
if not reactors:
continue
chunks = self.reactions(data["tag"], data["data"], reactors)
if chunks:
try:
self.call_reactions(chunks)
except SystemExit:
log.warning("Exit ignored by reactor")


class ReactWrap:
Expand All @@ -286,7 +297,6 @@ class ReactWrap:

def __init__(self, opts):
self.opts = opts
self._run_lock = Lock()
if ReactWrap.client_cache is None:
ReactWrap.client_cache = salt.utils.cache.CacheDict(
opts["reactor_refresh_interval"]
Expand Down Expand Up @@ -470,24 +480,3 @@ def caller(self, fun, **kwargs):
Wrap LocalCaller to execute remote exec functions locally on the Minion
"""
self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])

def _call_reactions(self, data, list_reactors, get_reactions):
reactors = list_reactors(data["tag"])
if not reactors:
return
chunks = get_reactions(data["tag"], data["data"], reactors)
if not chunks:
return
with self._run_lock:
try:
for chunk in chunks:
self.run(chunk)
except Exception as exc: # pylint: disable=broad-except
log.error(
"Exception while calling the reactions: %s", exc, exc_info=True
)

def call_reactions(self, data, list_reactors, get_reactions):
return self.pool.fire_async(
self._call_reactions, args=(data, list_reactors, get_reactions)
)

0 comments on commit c00801d

Please sign in to comment.