diff --git a/control/deciphonctl/presser.py b/control/deciphonctl/presser.py index 3738213..631e107 100644 --- a/control/deciphonctl/presser.py +++ b/control/deciphonctl/presser.py @@ -43,11 +43,11 @@ def _press(self, hmmfile: Path, req: PressRequest): db = req.db with PressContext(hmm, gencode=db.gencode, epsilon=db.epsilon) as press: self._qout.put(JobUpdate.run(req.job_id, 0).model_dump_json()) - with ProgressLogger(press.nproteins, str(hmmfile)) as progress: - for x in [press] * press.nproteins: + with ProgressLogger(str(hmmfile)) as progress: + for i, x in enumerate([press] * press.nproteins): x.next() - progress.consume() - perc = int(round(progress.percent)) + progress.percent = (100 * (i + 1)) // press.nproteins + perc = progress.percent self._qout.put(JobUpdate.run(req.job_id, perc).model_dump_json()) normalise_file_permissions(dcpfile) return dcpfile diff --git a/control/deciphonctl/progress.py b/control/deciphonctl/progress.py new file mode 100644 index 0000000..b8f0b77 --- /dev/null +++ b/control/deciphonctl/progress.py @@ -0,0 +1,38 @@ +from threading import Event, Thread +from time import sleep + +from deciphon_core.scan import Scan + +from deciphonctl.models import JobUpdate +from deciphonctl.progress_logger import ProgressLogger +from deciphonctl.sched import Sched + + +class Progress: + def __init__(self, desc: str, scan: Scan, sched: Sched, job_id: int): + self._logger = ProgressLogger(desc) + self._continue = Event() + self._scan = scan + self._sched = sched + self._job_id = job_id + self._thread = Thread(target=self.progress_entry) + + def start(self): + self._logger.start() + self._thread.start() + + def progress_entry(self): + self._logger.percent = last_percent = 0 + JobUpdate.run(self._job_id, last_percent).model_dump_json() + while not self._continue.is_set(): + percent = self._scan.progress() + if last_percent != percent: + self._logger.percent = last_percent = percent + msg = JobUpdate.run(self._job_id, last_percent).model_dump_json() + self._sched.job_patch(JobUpdate.model_validate_json(msg)) + sleep(1.05) + + def stop(self): + self._continue.set() + self._thread.join() + self._logger.stop() diff --git a/control/deciphonctl/progress_logger.py b/control/deciphonctl/progress_logger.py index 7aebf70..60753c5 100644 --- a/control/deciphonctl/progress_logger.py +++ b/control/deciphonctl/progress_logger.py @@ -4,40 +4,36 @@ class ProgressLogger: - def __init__(self, total: int, desc: str): - self._total = total - self._consumed = 0 + def __init__(self, desc: str): + self._percent: int = 0 self._desc = desc self._last_time = 0.0 - def __enter__(self): + def start(self): self._last_time = time.monotonic() - 10.0 - return self - def __exit__(self, *_): + def stop(self): self._print() - if self._consumed == self._total: + if self._percent == 100: logger.info(f"{self._desc}: done!") - def _print(self): - logger.info(f"{self._desc}: {self.percent:06.2f}% completed") + def __enter__(self): + self.start() + return self - @property - def total(self): - return self._total + def __exit__(self, *_): + self.stop() - @property - def consumed(self): - return self._consumed + def _print(self): + logger.info(f"{self._desc}: {self.percent}% completed") @property def percent(self): - if self.total == 0: - return 100.0 - return 100.0 * (self._consumed / self._total) + return self._percent - def consume(self): - self._consumed += 1 + @percent.setter + def percent(self, x: int): + self._percent = x elapsed = time.monotonic() - self._last_time if elapsed > 5: self._print() diff --git a/control/deciphonctl/scanner.py b/control/deciphonctl/scanner.py index 778e65b..bc694ef 100644 --- a/control/deciphonctl/scanner.py +++ b/control/deciphonctl/scanner.py @@ -17,20 +17,12 @@ unique_temporary_file, ) from deciphonctl.models import ScanRequest +from deciphonctl.progress import Progress from deciphonctl.progress_informer import ProgressInformer from deciphonctl.sched import Sched from deciphonctl.settings import Settings from deciphonctl.worker import worker_loop -# def sequence_iterator(seqs: list[Sequence], job_id: int, desc: str, qout: JoinableQueue): -# qout.put(JobUpdate.run(job_id, 0).model_dump_json()) -# with ProgressLogger(len(seqs), desc) as progress: -# for seq in seqs: -# yield seq -# progress.consume() -# perc = int(round(progress.percent)) -# qout.put(JobUpdate.run(job_id, perc).model_dump_json()) - class Scanner(Consumer): def __init__( @@ -71,10 +63,13 @@ def callback(self, message: str): logger.info(f"scan parameters: {params}") scan = Scan(params, db) with scan: + bar = Progress("scan", scan, self._sched, x.job_id) + bar.start() scan.dial(daemon.port) for seq in x.seqs: scan.add(Sequence(seq.id, seq.name, seq.data)) scan.run(snap) + bar.stop() logger.info( "Scan has finished successfully and " f"results stored in '{snap.path}'."