Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up focuser code #488

Merged
merged 1 commit into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 33 additions & 123 deletions src/huntsman/pocs/focuser/astromechanics.py
Original file line number Diff line number Diff line change
@@ -1,143 +1,53 @@
import os
import numpy as np

from panoptes.utils import error
from panoptes.utils.time import current_time

from panoptes.pocs.focuser.astromechanics import Focuser as AstromechFocuser
from panoptes.pocs.utils.plotting import make_autofocus_plot

from huntsman.pocs.utils.focus import AutofocusSequence
from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser


class Focuser(AstromechFocuser):
class Focuser(AstromechFocuser, HuntsmanSerialFocuser):
def __init__(self, *args, **kwargs):
"""Initialize an AbstractSerialMount for the port defined in the config.
Opens a connection to the serial device, if it is valid.
"""
self._position = None
super().__init__(*args, **kwargs)

initial_position = kwargs.get("initial_position", None)
self.logger.debug(f"Initial position for {self}: {initial_position}")
if initial_position is not None:
self.position = initial_position
@HuntsmanSerialFocuser.position.getter
def position(self):
return int(self._position)

def _autofocus(self, *args, **kwargs):
focus_event = kwargs.pop("focus_event")
def move_to(self, new_position):
""" Override to use panoptes utils serial code. """
self._is_moving = True
try:
return self._run_autofocus(*args, **kwargs)
self._send_command(f'M{int(new_position):d}#')
self._position = new_position
finally:
if focus_event is not None:
focus_event.set()

def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False,
take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3,
**kwargs):
"""
Focuses the camera using the specified merit function. Optionally performs
a coarse focus to find the approximate position of infinity focus, which
should be followed by a fine focus before observing.
Args:
seconds (scalar, optional): Exposure time for focus exposures, if not
specified will use value from config.
focus_range (2-tuple, optional): Coarse & fine focus sweep range, in
encoder units. Specify to override values from config.
focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in
encoder units. Specify to override values from config.
cutout_size (int, optional): Size of square central region of image
to use, default 500 x 500 pixels.
keep_files (bool, optional): If True will keep all images taken
during focusing. If False (default) will delete all except the
first and last images from each focus run.
take_dark (bool, optional): If True will attempt to take a dark frame
before the focus run, and use it for dark subtraction and hot
pixel masking, default True.
coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform
a fine focus. Default False.
make_plots (bool, optional): Whether to write focus plots to images folder. If not
given will fall back on value of `autofocus_make_plots` set on initialisation,
and if it wasn't set then will default to False.
blocking (bool, optional): Whether to block until autofocus complete, default False.
"""
start_time = start_time = current_time(flatten=True)
imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus',
self.camera.uid, start_time)
initial_position = self.position

# Get focus range
idx = 1 if coarse else 0
position_step = focus_step[idx]
position_min = initial_position - focus_range[idx] / 2
position_max = initial_position + focus_range[idx] / 2

# Make sequence object
sequence = AutofocusSequence(position_min=position_min, position_max=position_max,
position_step=position_step, bit_depth=self.camera.bit_depth,
**kwargs)
# Add a dark exposure
if take_dark:
self.logger.info(f"Taking dark frame before autofocus on {self}.")
filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}")
cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files,
dark=True)
sequence.dark_image = cutout

# Take the focusing exposures
exposure_retries = 0
while not sequence.is_finished:
self.logger.info(f"Autofocus status on {self}: {sequence.status}")

new_position = sequence.get_next_position()

basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}"
filename = os.path.join(imagedir, basename)

# Move the focuser
self.move_to(new_position)

# Get the exposure cutout
try:
cutout = self.camera.get_cutout(seconds, filename, cutout_size,
keep_file=keep_files)
exposure_retries = 0 # Reset exposure retries
except error.PanError as err:
self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}")

# Abort the sequence if max exposure retries is reached
exposure_retries += 1
if exposure_retries >= max_exposure_retries:
raise error.PanError(f"Max exposure retries reached during autofocus on"
f" {self}.")
self.logger.warning("Continuing with autofocus sequence after exposure error on"
f" {self}.")
continue

# Update the sequence
sequence.update(cutout, position=self.position)
# Focuser move commands block until the move is finished, so if the command has
# returned then the focuser is no longer moving.
self._is_moving = False

# Get the best position and move to it
best_position = sequence.best_position
best_position_actual = self.move_to(best_position)
self.logger.info(f"Best focus position for {self}: {best_position}")
self.logger.debug(f"Moved to encoder position {self.position}")
return self.position

if make_plots:
focus_type = "coarse" if coarse else "fine"
plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png')
plot_title = f'{self} {focus_type} focus at {start_time}'
def move_by(self, *args, **kwargs):
""" Override to set position. """
self._position = super().move_by(*args, **kwargs)

metrics = sequence.metrics
focus_positions = sequence.positions
merit_function = sequence.merit_function_name
def _send_command(self, command):
""" Override method to use panoptes-utils code. """
if not self.is_connected:
self.logger.critical(f"Attempt to send command to {self} when not connected!")
return

initial_idx = np.argmin(abs(focus_positions - initial_position))
initial_cutout = sequence.images[initial_idx]
# Clear the input buffer in case there's anything left over in there.
self._serial.reset_input_buffer()

final_idx = np.argmin(abs(focus_positions - best_position))
final_cutout = sequence.images[final_idx]
# Send command
self._serial.write(command + '\r')

self.logger.info(f"Writing focus plot for {self} to {plot_filename}.")
make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position,
best_position_actual, focus_positions, metrics, merit_function,
plot_title=plot_title)
return self._serial.read()

return initial_position, best_position
def _move_zero(self):
""" Override to set position. """
super()._move_zero()
self._position = 0
179 changes: 50 additions & 129 deletions src/huntsman/pocs/focuser/birger.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
""" Modified focuser to reconnect serial port on command error. """
import os
import numpy as np

from panoptes.utils import error
from panoptes.utils.time import current_time

from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser
from panoptes.pocs.utils.plotting import make_autofocus_plot

from huntsman.pocs.utils.focus import AutofocusSequence
from huntsman.pocs.focuser.serial import HuntsmanSerialFocuser
from panoptes.pocs.focuser.birger import Focuser as BirgerFocuser, error_pattern, error_messages


class Focuser(BirgerFocuser):
class Focuser(BirgerFocuser, HuntsmanSerialFocuser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand All @@ -21,132 +14,60 @@ def reconnect(self):
self.__del__()
self.connect(port=self.port)

def _send_command(self, *args, **kwargs):
""" Try command, attempt to reconnect on error and send command again. """
try:
return super()._send_command(*args, **kwargs)
except error.PanError as err:
self.logger.warning(f"Focuser command failed with exception: {err!r}. Retrying after"
" reconnect.")
self.reconnect()
return super()._send_command(*args, **kwargs)

def _autofocus(self, *args, **kwargs):
focus_event = kwargs.pop("focus_event")
try:
return self._run_autofocus(*args, **kwargs)
finally:
if focus_event is not None:
focus_event.set()

def _run_autofocus(self, seconds, focus_range, focus_step, cutout_size, keep_files=False,
take_dark=True, coarse=False, make_plots=False, max_exposure_retries=3,
**kwargs):
def _send_command(self, command, *args, **kwargs):
"""
Focuses the camera using the specified merit function. Optionally performs
a coarse focus to find the approximate position of infinity focus, which
should be followed by a fine focus before observing.
Sends a command to the focuser adaptor and retrieves the response.
Args:
seconds (scalar, optional): Exposure time for focus exposures, if not
specified will use value from config.
focus_range (2-tuple, optional): Coarse & fine focus sweep range, in
encoder units. Specify to override values from config.
focus_step (2-tuple, optional): Coarse & fine focus sweep steps, in
encoder units. Specify to override values from config.
cutout_size (int, optional): Size of square central region of image
to use, default 500 x 500 pixels.
keep_files (bool, optional): If True will keep all images taken
during focusing. If False (default) will delete all except the
first and last images from each focus run.
take_dark (bool, optional): If True will attempt to take a dark frame
before the focus run, and use it for dark subtraction and hot
pixel masking, default True.
coarse (bool, optional): Whether to perform a coarse focus, otherwise will perform
a fine focus. Default False.
make_plots (bool, optional): Whether to write focus plots to images folder. If not
given will fall back on value of `autofocus_make_plots` set on initialisation,
and if it wasn't set then will default to False.
blocking (bool, optional): Whether to block until autofocus complete, default False.
command (string): command string to send (without newline), e.g. 'fa1000', 'pf'
Returns:
list: possibly empty list containing the '\r' terminated lines of the response from the
adaptor.
"""
start_time = start_time = current_time(flatten=True)
imagedir = os.path.join(self.camera.get_config('directories.images'), 'focus',
self.camera.uid, start_time)
initial_position = self.position

# Get focus range
idx = 1 if coarse else 0
position_step = focus_step[idx]
position_min = max(self.min_position, initial_position - focus_range[idx] / 2)
position_max = min(self.max_position, initial_position + focus_range[idx] / 2)

# Make sequence object
sequence = AutofocusSequence(position_min=position_min, position_max=position_max,
position_step=position_step, bit_depth=self.camera.bit_depth,
**kwargs)
# Add a dark exposure
if take_dark:
self.logger.info(f"Taking dark frame before autofocus on {self}.")
filename = os.path.join(imagedir, f"dark.{self.camera.file_extension}")
cutout = self.camera.get_cutout(seconds, filename, cutout_size, keep_file=keep_files,
dark=True)
sequence.dark_image = cutout

# Take the focusing exposures
exposure_retries = 0
while not sequence.is_finished:
self.logger.info(f"Autofocus status on {self}: {sequence.status}")

new_position = sequence.get_next_position()
if not self.is_connected:
self.logger.critical("Attempt to send command to {} when not connected!".format(self))
return

basename = f"{new_position}-{sequence.exposure_idx:02d}.{self.camera.file_extension}"
filename = os.path.join(imagedir, basename)
# Success variable to verify that the command sent is read by the focuser.
success = False

# Move the focuser
self.move_to(new_position)
for i in range(self._max_command_retries):
# Clear the input buffer in case there's anything left over in there.
self._serial.reset_input_buffer()

# Get the exposure cutout
try:
cutout = self.camera.get_cutout(seconds, filename, cutout_size,
keep_file=keep_files)
exposure_retries = 0 # Reset exposure retries
except error.PanError as err:
self.logger.warning(f"Exception encountered in get_cutout on {self}: {err!r}")
# Send the command
self._serial.write(command + '\r')
raw_response = self._serial.read().rstrip().split("\r")

# Abort the sequence if max exposure retries is reached
exposure_retries += 1
if exposure_retries >= max_exposure_retries:
raise error.PanError(f"Max exposure retries reached during autofocus on"
f" {self}.")
self.logger.warning("Continuing with autofocus sequence after exposure error on"
f" {self}.")
# In verbose mode adaptor will first echo the command
echo = raw_response[0]
if echo != command:
self.logger.warning(f'echo != command: {echo!r} != {command!r}. Retrying command.')
continue

# Update the sequence
sequence.update(cutout, position=self.position)

# Get the best position and move to it
best_position = sequence.best_position
best_position_actual = self.move_to(best_position)
self.logger.info(f"Best focus position for {self}: {best_position}")

if make_plots:
focus_type = "coarse" if coarse else "fine"
plot_filename = os.path.join(imagedir, f'{focus_type}-focus-{self.camera.uid}.png')
plot_title = f'{self} {focus_type} focus at {start_time}'

metrics = sequence.metrics
focus_positions = sequence.positions
merit_function = sequence.merit_function_name

initial_idx = np.argmin(abs(focus_positions - initial_position))
initial_cutout = sequence.images[initial_idx]

final_idx = np.argmin(abs(focus_positions - best_position))
final_cutout = sequence.images[final_idx]

self.logger.info(f"Writing focus plot for {self} to {plot_filename}.")
make_autofocus_plot(plot_filename, initial_cutout, final_cutout, initial_position,
best_position_actual, focus_positions, metrics, merit_function,
plot_title=plot_title)
# Adaptor should then send 'OK', even if there was an error.
ok = raw_response[1]
if ok != 'OK':
self.logger.warning(f"ok != 'OK': {ok!r} != 'OK'. Retrying command.")
continue

return initial_position, best_position
# Depending on which command was sent there may or may not be any further response.
response = raw_response[2:]
success = True
break

if not success:
raise error.PanError(f'Failed command {command!r} on {self}')

# Check for an error message in response
if response:
# Not an empty list.
error_match = error_pattern.match(response[0])
if error_match:
# Got an error message! Translate it.
try:
error_message = error_messages[int(error_match.group())]
self.logger.error(f"{self} returned error message '{error_message}'!")
except Exception:
self.logger.error(f"Unknown error '{error_match.group()}' from {self}!")

return response
Loading