Lint picking.
mlinvill committed Aug 8, 2024
1 parent ab892c3 commit 6f2202d
Showing 3 changed files with 30 additions and 24 deletions.
33 changes: 19 additions & 14 deletions src/distributedlock/distributed/disco.py
@@ -6,7 +6,7 @@
This should generate events on peer changes!!!
-Created on @date
+Created on @date
@author: mlinvill
"""
@@ -84,7 +84,8 @@ def __init__(self):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(0)
try:
"""This can throw OSError or TimeoutError, but we don't care. Any
"""
This can throw OSError or TimeoutError, but we don't care. Any
exception results in not being able to determine our ip and fall-back
to a default value.
"""
@@ -118,7 +119,7 @@ def add_peer(self, peer):
old_state = copy.deepcopy(self._state)

log.debug(
f"PeerList: add_peer(): Adding {peer}, state before is {pprint.pformat(self._state)}"
"PeerList: add_peer(): Adding {peer}, state before is {pprint.pformat(self._state)}"
)
self._state.add(peer)
self._length = len(self._state)
@@ -199,6 +200,7 @@ def __init__(self, *args, **kwargs):
self._in_disco = False
self._peerlist = PeerList()
self._id = None
+        self._executor = None
self._thrds = dict()
self._in_queue = queue.Queue(maxsize=15)
self._out_queue = queue.Queue(maxsize=15)
@@ -207,9 +209,9 @@

""" setup broker, topic attributes
"""
-        for k, v in kwargs.items():
+        for k, val in kwargs.items():
key = f"_{k}"
-            self.__dict__[key] = v
+            self.__dict__[key] = val

if self._broker:
if self._read_topic:
@@ -245,8 +247,8 @@ def __enter__(self):
time.sleep(DISCO_STARTUP_DELAY)

self._executor = ThreadPoolExecutor(max_workers=2)
self._thrds["recv"] = self._executor.submit(self._recv, self, log)
self._thrds["send"] = self._executor.submit(self._send, self, log)
self._thrds["recv"] = self._executor.submit(self._recv, self)
self._thrds["send"] = self._executor.submit(self._send, self)

while not self._event.is_set():
self.discovery()
@@ -261,15 +263,14 @@ def __exit__(self, exception_type, exception_value, traceback):
self._event.set()

@staticmethod
-    def _send(self, logh):
+    def _send(self):
"""Encapsulate the logic/method of actually writing"""
while not self._event.is_set():
for msg in [self._out_queue.get()]:
-            logh.debug(f"_send(): calling stream.write({msg})")
self._stream_w.write(msg)

@staticmethod
-    def _recv(self, logh):
+    def _recv(self):
"""Encapsulate the logic/method of actually reading"""
while not self._event.is_set():
for message in self._stream_r:
@@ -285,7 +286,9 @@ def discovery(self):
self.poll()

def poll(self):
"""Main logic for the protocol, wait for messages, register peers, end when we have enough"""
"""Main logic for the protocol, wait for messages,
register peers, end when we have enough
"""
while not self._endit:
time.sleep(1 + random.randint(0, 3))

@@ -327,16 +330,18 @@ def end(self):
def produce(self, msg: str):
"""Put msg in the work queue"""
if not self._event.is_set() and not self._out_queue.full():
log.debug(f"produce(): queueing [{msg}] for kafka")
log.debug("produce(): queueing [{msg}] for kafka")
self._out_queue.put(str(msg))

def consume(self):
"""Get messages from the work queue"""
while not self._in_queue.empty():
message = self._in_queue.get()
log.debug(f"consume(): incoming message {message} from kafka")
log.debug("consume(): incoming message {message} from kafka")
yield message

def shutdown(self):
"""Stop disco"""
self._endit = True
self._event.set()

@@ -361,6 +366,6 @@ def watchdog_timeout():
watchdog.start()

with Disco(broker=BROKER, read_topic=READ_TOPIC, write_topic=WRITE_TOPIC) as disco:
log.debug(f"Peers: {disco.get_peerlist()}")
log.debug("Peers: {disco.get_peerlist()}")

watchdog.cancel()
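
A note on the logging changes in this file: pylint's W1203 (logging-fstring-interpolation) flags f-strings passed to logging calls, and the commit addresses it by dropping the f prefix. The {...} placeholders are left in place, though, so a plain string such as "produce(): queueing [{msg}] for kafka" now logs the braces literally rather than the value. The sketch below is illustrative and not part of the repository (the logger setup here is an assumption); it contrasts the three forms, lazy %-formatting being the pattern pylint actually recommends.

import logging

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

msg = "hello"

# Eager f-string interpolation: correct output, but pylint flags it
# as W1203 because the string is built even when DEBUG is disabled.
log.debug(f"produce(): queueing [{msg}] for kafka")

# Plain string with the braces kept (the form this commit produces):
# nothing is interpolated, the literal text "[{msg}]" is logged.
log.debug("produce(): queueing [{msg}] for kafka")

# Lazy %-formatting: the logging module interpolates only if the
# record is actually emitted. This is what pylint suggests.
log.debug("produce(): queueing [%s] for kafka", msg)
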
13 changes: 7 additions & 6 deletions src/distributedlock/distributed/lock.py
@@ -51,6 +51,10 @@ class InvalidHostURIError(Exception):
""" Host URI not provided error
"""

+def genlockid() -> str:
+    """ Generate a unique lockid
+    """
+    return str(uuid.uuid1().hex)

class DistributedLock:
"""
@@ -66,7 +70,7 @@ def __init__(self, mynode: str, peerlist: List, lockid: str, leader: Value):

if not lockid or lockid is None:
print("Warning! You should provide a lockid! Hint: try genlockid()")
-            lockid = self.genlockid()
+            lockid = genlockid()

self.lockid = lockid
# multiprocessing Value type, leader.value: 1 for leader, 0 for follower
@@ -80,10 +84,7 @@

log.debug("DistributedLock.__init__(): myip {self.myip} peers {self.peers}")

-    def genlockid(self) -> str:
-        """ Generate a unique lockid
-        """
-        return str(uuid.uuid1().hex)


def run(self):
"""
@@ -118,7 +119,7 @@ def run(self):
def stop(self) -> None:
""" Stop the PySyncObj protocol
"""
-        log.warn("{self.myip} ...Stopping.")
+        log.info("{self.myip} ...Stopping.")

self.setleaderstate(False)
self.lockmanager.release(self.lockid, sync=True)
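
The lock.py change promotes genlockid from an instance method to a module-level function, which matches the hint printed in the constructor's warning. A minimal standalone sketch of the new call pattern (the print is illustrative only; uuid.uuid1() derives the value from the host and a timestamp, so each call yields a fresh id):

import uuid

def genlockid() -> str:
    """Generate a unique lockid from a time/node-based UUID."""
    return str(uuid.uuid1().hex)

# As the constructor's warning suggests ("Hint: try genlockid()"),
# callers can now build an id without a DistributedLock instance.
lockid = genlockid()
print(lockid)  # 32 hex characters
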
8 changes: 4 additions & 4 deletions src/distributedlock/distributed/logger.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
-Created on @date
+Created on @date
from https://python-tutorial.dev/201/tutorial/logging.html
@@ -29,11 +29,11 @@

snewslog = os.getenv('SNEWSLOG')
if snewslog:
-    log_file = Path(snewslog) / f"{HOST}.distributed_lock.log"
+    LOG_FILE = Path(snewslog) / f"{HOST}.distributed_lock.log"
else:
log_file = "distributed_lock.log"
LOG_FILE = "distributed_lock.log"

-fh = FileHandler(log_file)
+fh = FileHandler(LOG_FILE)

formatter = Formatter(f'%(asctime)s {HOST} %(levelname)s [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
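
The logger.py rename is pylint's C0103 at work: a module-level name assigned once is treated as a constant and expected to be UPPER_CASE, hence log_file becoming LOG_FILE. A sketch of how the surrounding module plausibly fits together, assuming the imports implied by the diff (Path, FileHandler, Formatter) and a HOST value taken from the local hostname; the logger name is an assumption:

import os
import socket
from pathlib import Path
from logging import DEBUG, FileHandler, Formatter, getLogger

HOST = socket.gethostname()  # assumption: HOST is the local hostname

# Assigned once at module level, so pylint (C0103) wants UPPER_CASE.
snewslog = os.getenv('SNEWSLOG')
if snewslog:
    LOG_FILE = Path(snewslog) / f"{HOST}.distributed_lock.log"
else:
    LOG_FILE = "distributed_lock.log"

fh = FileHandler(LOG_FILE)
fh.setFormatter(Formatter(f'%(asctime)s {HOST} %(levelname)s [%(name)s] %(message)s',
                          datefmt='%Y-%m-%d %H:%M:%S'))

log = getLogger("distributed_lock")  # hypothetical logger name
log.setLevel(DEBUG)
log.addHandler(fh)
log.info("logger initialised")
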
