From 4ab8d1fe7d84c62c735521096a337d5ab07bfe0a Mon Sep 17 00:00:00 2001 From: Wilfred Tyler Gee Date: Fri, 24 May 2024 10:31:12 -1000 Subject: [PATCH] Fastapi deprecation 1208 (#1276) * FastAPI startup events This was added a few months ago but seems to have since been clobbered. Testing again. * Missing global * Use objects * Passing the lifespan to the FastAPI constructor * Make lifespan async * Tying it all back together. * Warning for no status instead of error. * More missing values * Converting weather station fastapi service. * Namespace conflict * Remove old `on_startup` * Replace repeat_every * Replace repeat_every with simple thread * Join the threads. * Less verbose * Adding host info for working with remote weather stations on cli --- src/panoptes/pocs/sensor/power.py | 67 +++++++++++-------- src/panoptes/pocs/utils/cli/weather.py | 30 +++++++-- src/panoptes/pocs/utils/service/power.py | 57 ++++++++++------ src/panoptes/pocs/utils/service/weather.py | 78 ++++++++++++++-------- 4 files changed, 157 insertions(+), 75 deletions(-) diff --git a/src/panoptes/pocs/sensor/power.py b/src/panoptes/pocs/sensor/power.py index 2f882fda1..b503b5e50 100644 --- a/src/panoptes/pocs/sensor/power.py +++ b/src/panoptes/pocs/sensor/power.py @@ -2,19 +2,18 @@ from contextlib import suppress from dataclasses import dataclass from enum import IntEnum -from typing import Optional, Dict, List, Callable from functools import partial -import pandas as pd -from panoptes.utils import error - -from streamz.dataframe import PeriodicDataFrame +from typing import Optional, Dict, List, Callable +import pandas as pd from astropy import units as u - +from panoptes.utils import error from panoptes.utils.serial.device import find_serial_port, SerialDevice from panoptes.utils.serializers import to_json, from_json -from panoptes.pocs.base import PanBase from panoptes.utils.time import current_time +from streamz.dataframe import PeriodicDataFrame + +from panoptes.pocs.base import PanBase class PinState(IntEnum): @@ -106,7 +105,8 @@ def __init__(self, dataframe_period: int = 1, mean_interval: Optional[int] = 5, arduino_board_name: str = 'power_board', - *args, **kwargs): + *args, **kwargs + ): """Initialize the power board. The `relays` should be a dictionary with the relay name as key and a @@ -146,11 +146,12 @@ def __init__(self, self.logger.debug(f'Setting up Power board connection for {name=} on {self.port}') self._ignore_readings = 5 - self.arduino_board = SerialDevice(port=self.port, - serial_settings=dict(baudrate=9600), - reader_callback=reader_callback, - name=arduino_board_name - ) + self.arduino_board = SerialDevice( + port=self.port, + serial_settings=dict(baudrate=9600), + reader_callback=reader_callback, + name=arduino_board_name + ) self.relays: List[Relay] = list() self.relay_labels: Dict[str, Relay] = dict() @@ -164,8 +165,10 @@ def __init__(self, self.dataframe = None if dataframe_period is not None: - self.dataframe = PeriodicDataFrame(interval=f'{dataframe_period}s', - datafn=self.to_dataframe) + self.dataframe = PeriodicDataFrame( + interval=f'{dataframe_period}s', + datafn=self.to_dataframe + ) self._mean_interval = mean_interval @@ -174,6 +177,12 @@ def __init__(self, @property def status(self): readings = self.readings + if not readings: + self.logger.warning( + 'No readings available. ' + 'If system just started please wait a moment.' + ) + return {} status = { r.name: dict(label=r.label, state=r.state.name, reading=readings[r.label]) for r in self.relays @@ -188,6 +197,9 @@ def readings(self): """Return the rolling mean of the readings. """ time_start = (current_time() - self._mean_interval * u.second).to_datetime() df = self.to_dataframe()[time_start:] + if len(df) == 0: + return {} + values = df.mean().astype('int').to_dict() # Add the most recent ac_ok and battery_low check. @@ -255,11 +267,12 @@ def setup_relays(self, relays: Dict[str, dict]): # Create relay object. self.logger.debug(f'Creating {relay_label=} for {relay_config!r}') - relay = Relay(name=relay_name, - label=relay_config.get('label', ''), - relay_index=relay_index, - default_state=default_state - ) + relay = Relay( + name=relay_name, + label=relay_config.get('label', ''), + relay_index=relay_index, + default_state=default_state + ) # Add convenience methods on the relay itself. setattr(relay, 'turn_on', partial(self.turn_on, relay.label)) @@ -309,7 +322,7 @@ def default_reader_callback(self, data): # Check we got a valid reading. if len(data[relay_key]) != len(TruckerRelayIndex) \ - and len(data[values_key]) != len(TruckerRelayIndex): + and len(data[values_key]) != len(TruckerRelayIndex): self.logger.debug('Did not get a full valid reading') return @@ -336,11 +349,13 @@ def __str__(self): return f'{self.name} [{relay_states}]' def __repr__(self): - return to_json({ - 'name': self.name, - 'port': self.port, - 'relays': [dict(name=r.name, label=r.label, state=r.state.name) for r in self.relays], - }) + return to_json( + { + 'name': self.name, + 'port': self.port, + 'relays': [dict(name=r.name, label=r.label, state=r.state.name) for r in self.relays], + } + ) @classmethod def lookup_port(cls, vendor_id=0x2341, product_id=0x0043, **kwargs): diff --git a/src/panoptes/pocs/utils/cli/weather.py b/src/panoptes/pocs/utils/cli/weather.py index 92e21f80c..04c38ba3a 100644 --- a/src/panoptes/pocs/utils/cli/weather.py +++ b/src/panoptes/pocs/utils/cli/weather.py @@ -1,22 +1,44 @@ import subprocess +from dataclasses import dataclass import requests import typer from rich import print + +@dataclass +class HostInfo: + host: str = 'localhost' + port: str = '6566' + + @property + def url(self): + return f'http://{self.host}:{self.port}' + + app = typer.Typer() +@app.callback() +def common(context: typer.Context, + host: str = typer.Option('localhost', help='Weather station host address.'), + port: str = typer.Option('6566', help='Weather station port.'), + ): + context.obj = HostInfo(host=host, port=port) + + @app.command(name='status', help='Get the status of the weather station.') -def status(page='status', base_url='http://localhost:6566'): +def status(context: typer.Context, page='status'): """Get the status of the weather station.""" - print(get_page(page, base_url)) + url = context.obj.url + print(get_page(page, url)) @app.command(name='config', help='Get the configuration of the weather station.') -def config(page='config', base_url='http://localhost:6566'): +def config(context: typer.Context, page='config'): """Get the configuration of the weather station.""" - print(get_page(page, base_url)) + url = context.obj.url + print(get_page(page, url)) def get_page(page, base_url): diff --git a/src/panoptes/pocs/utils/service/power.py b/src/panoptes/pocs/utils/service/power.py index 81c386665..26b29a192 100644 --- a/src/panoptes/pocs/utils/service/power.py +++ b/src/panoptes/pocs/utils/service/power.py @@ -1,9 +1,11 @@ +import time +from contextlib import asynccontextmanager from enum import auto +from threading import Thread from typing import Union from fastapi import FastAPI from fastapi_utils.enums import StrEnum -from fastapi_utils.tasks import repeat_every from panoptes.utils.config.client import get_config from pydantic import BaseModel @@ -20,37 +22,54 @@ class RelayCommand(BaseModel): command: RelayAction -app = FastAPI() -power_board: PowerBoard -conf = get_config('environment.power', {}) +app_objects = {} -@app.on_event('startup') -async def startup(): - global power_board - power_board = PowerBoard(**get_config('environment.power', {})) - print(f'Power board setup: {power_board}') +@asynccontextmanager +async def lifespan(app: FastAPI): + """Context manager for the lifespan of the app. + + This will connect to the power board and record readings at a regular interval. + """ + conf: dict = get_config('environment.power', {}) + power_board = PowerBoard(**conf) + power_board.logger.info(f'Power board setup: {power_board}') + app_objects['power_board'] = power_board + app_objects['conf'] = conf + + # Set up a thread to record the readings at an interval. + def record_readings(): + """Record the current readings in the db.""" + record_interval = conf.get('record_interval', 60) + power_board.logger.info(f'Setting up power recording {record_interval=}') + while True: + time.sleep(record_interval) + power_board.record(collection_name='power') + + # Create a thread to record the readings at an interval. + power_thread = Thread(target=record_readings) + power_thread.daemon = True + power_thread.start() + + yield + power_board.logger.info('Shutting down power board, please wait.') + power_thread.join() -@app.on_event('startup') -@repeat_every(seconds=conf.get('record_interval', 60), wait_first=True) -def record_readings(): - """Record the current readings in the db.""" - global power_board - return power_board.record(collection_name='power') +app = FastAPI(lifespan=lifespan) @app.get('/') async def root(): """Returns the power board status.""" - global power_board + power_board = app_objects['power_board'] return power_board.status @app.get('/readings') async def readings(): """Return the current readings as a dict.""" - global power_board + power_board = app_objects['power_board'] return power_board.to_dataframe().to_dict() @@ -63,7 +82,7 @@ def control_relay(relay_command: RelayCommand): @app.get('/relay/{relay}/control/{command}') def control_relay_url(relay: Union[int, str], command: str = 'turn_on'): """Control a relay via a GET request""" - return do_command(RelayCommand(relay=relay, command=command)) + return do_command(RelayCommand(relay=relay, command=RelayAction(command))) def do_command(relay_command: RelayCommand): @@ -72,7 +91,7 @@ def do_command(relay_command: RelayCommand): This function performs the actual relay control and is used by both request types. """ - global power_board + power_board = app_objects['power_board'] relay_id = relay_command.relay try: relay = power_board.relay_labels[relay_id] diff --git a/src/panoptes/pocs/utils/service/weather.py b/src/panoptes/pocs/utils/service/weather.py index 04e73a9e1..c25c8a8e2 100644 --- a/src/panoptes/pocs/utils/service/weather.py +++ b/src/panoptes/pocs/utils/service/weather.py @@ -1,62 +1,88 @@ +import os +import time +from contextlib import asynccontextmanager, suppress +from threading import Thread + from fastapi import FastAPI -from fastapi_utils.tasks import repeat_every from panoptes.utils.config.client import get_config from serial.tools.list_ports import comports as get_comports from panoptes.pocs.sensor.weather import WeatherStation -app = FastAPI() -weather_station: WeatherStation -conf = get_config('environment.weather', {}) +app_objects = {} + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Context manager for the lifespan of the app. + + This will connect to the weather station and record + readings at a regular interval. + """ + conf = get_config('environment.weather', {}) + app_objects['conf'] = conf -@app.on_event('startup') -async def startup(): - global weather_station - global conf - - print(f'Weather config: {conf}') - # Get list of possible ports for auto-detect or use the configured port. if conf.get('auto_detect', False) is True: ports = [p.device for p in get_comports()] else: ports = [conf['serial_port']] - + + # Check the ioptron symlink and skip that port if it exists. + ioptron_port = None + with suppress(FileNotFoundError): + ioptron_port = os.readlink('/dev/ioptron') + + weather_thread: Thread = None + # Try to connect to the weather station. for port in ports: if 'ttyUSB' not in port: continue - + + if port == ioptron_port: + continue + conf['serial_port'] = port try: weather_station = WeatherStation(**conf) + weather_station.logger.info(f'Weather station setup: {weather_station}') + + def record_readings(): + """Record the current readings in the db.""" + record_interval = conf.get('record_interval', 60) + weather_station.logger.info(f'Setting up weather recording {record_interval=}') + while True: + time.sleep(record_interval) + weather_station.record() + + # Create a thread to record the readings at an interval + weather_thread = Thread(target=record_readings) + weather_thread.daemon = True + weather_thread.start() + + app_objects['weather_station'] = weather_station break except Exception as e: print(f'Could not connect to weather station on {port}: {e}') else: raise RuntimeError('Could not connect to weather station.') + yield + weather_station.logger.info('Shutting down weather station, please wait') + weather_thread.join() + -@app.on_event('startup') -@repeat_every(seconds=conf.get('capture_delay', 60), wait_first=True) -def record_readings(): - """Record the current readings in the db.""" - global weather_station - reading = weather_station.record() - print(f'Recorded weather reading: {reading}') - return reading +app = FastAPI(lifespan=lifespan) @app.get('/status') async def status(): """Returns the power board status.""" - global weather_station - return weather_station.status + return app_objects['weather_station'].status @app.get('/config') -async def get_config(): +async def get_ws_config(): """Returns the power board status.""" - global weather_station - return weather_station.weather_station.config + return app_objects['weather_station']