Skip to content

Commit

Permalink
Improve rotation by line and add fileter for incremental rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
lauraporta committed Oct 10, 2023
1 parent eed8414 commit 90d47b4
Showing 1 changed file with 162 additions and 23 deletions.
185 changes: 162 additions & 23 deletions derotation/analysis/derotation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import numpy as np
import numpy.ma as ma
import tqdm
from fancylog import fancylog
from scipy.ndimage import rotate
from scipy.optimize import bisect
Expand Down Expand Up @@ -33,12 +34,15 @@ def __init__(self):
self.config,
self.direction,
self.path_to_dataset_folder,
self.filename,
) = get_data()
self.rot_deg = 360
self.assume_full_rotation = True

logging.info(
f"Dataset {self.config['paths']['dataset-folder']} loaded"
)
logging.info(f"Filename: {self.filename}")

def process_analog_signals(self):
# ===================================
Expand All @@ -53,9 +57,15 @@ def process_analog_signals(self):
self.expected_tiks_per_rotation = self.check_number_of_rotations(
self.rotation_increment
)
self.corrected_increments = self.adjust_rotation_increment(
self.rotation_increment
)
if self.assume_full_rotation:
self.corrected_increments = self.adjust_rotation_increment(
self.rotation_increment
)
else:
self.corrected_increments = (
self.adjust_rotation_increment_for_incremental_changes()
)
logging.info(f"Corrected increments: {self.corrected_increments}")

# ===================================
# Quantify the rotation for each line of each frame
Expand All @@ -64,10 +74,15 @@ def process_analog_signals(self):
self.lines_end,
self.threshold,
) = self.get_starting_and_ending_times(clock_type="line")
(
self.rot_deg_line,
self.signed_rotation_degrees_line,
) = self.find_rotation_for_each_line_from_motor()
if self.assume_full_rotation:
(
self.rot_deg_line,
self.signed_rotation_degrees_line,
) = self.find_rotation_for_each_line_from_motor()
else:
self.rot_deg_line = (
self.find_rotation_angles_by_line_in_incremental_rotation()
)

print("Analog signals processed")

Expand Down Expand Up @@ -176,6 +191,16 @@ def check_number_of_rotations(self, given_increment=0.2):

return expected_tiks_per_rotation

def adjust_rotation_increment_for_incremental_changes(
self, given_increment=0.2
):
total_ticks_number = len(self.rotation_ticks_peaks)

if total_ticks_number == self.expected_tiks_per_rotation:
return given_increment
else:
return self.rot_deg / total_ticks_number

def adjust_rotation_increment(self, given_increment=0.2):
increments_per_rotation = []
for i, (start, end) in enumerate(
Expand Down Expand Up @@ -244,6 +269,90 @@ def goodness_of_threshold(k, clock, target_len):
start = np.where(np.diff(clock) > threshold)[0]
return len(start) - target_len

def find_rotation_angles_by_frame_in_incremental_rotation(self):
frame_start, frame_end, threshold = self.get_starting_and_ending_times(
clock_type="frame"
)

rotation_increment_by_frame = np.zeros(len(self.image_stack))
total_rotation_of_this_frame = 0
for frame_id, start in enumerate(frame_start):
try:
ticks_in_this_frame = np.where(
np.logical_and(
self.rotation_ticks_peaks > start,
self.rotation_ticks_peaks < frame_start[frame_id + 1],
)
)[0].shape[0]
except IndexError:
ticks_in_this_frame = np.where(
np.logical_and(
self.rotation_ticks_peaks > start,
self.rotation_ticks_peaks < frame_end[-1],
)
)[0].shape[0]
total_rotation_of_this_frame += (
ticks_in_this_frame * self.corrected_increments
)

rotation_increment_by_frame[
frame_id
] = total_rotation_of_this_frame

return rotation_increment_by_frame

def find_rotation_angles_by_line_in_incremental_rotation(self):
# calculate the rotation degrees for each line
rotation_increment_by_line = np.zeros(len(self.image_stack) * 256)
total_rotation_of_this_line = 0
for line_id, start in enumerate(self.lines_start):
try:
ticks_in_this_line = np.where(
np.logical_and(
self.rotation_ticks_peaks > start,
self.rotation_ticks_peaks
< self.lines_start[line_id + 1],
)
)[0].shape[0]
except IndexError:
ticks_in_this_line = np.where(
np.logical_and(
self.rotation_ticks_peaks > start,
self.rotation_ticks_peaks < self.lines_end[-1],
)
)[0].shape[0]
total_rotation_of_this_line += (
ticks_in_this_line * self.corrected_increments
)

try:
rotation_increment_by_line[
line_id
] = total_rotation_of_this_line
except IndexError:
break

return rotation_increment_by_line

def roatate_by_frame_incremental(self):
new_rotated_image_stack = np.zeros_like(self.image_stack)

rotation_increment_by_frame = (
self.find_rotation_angles_by_frame_in_incremental_rotation()
)

for idx, frame in enumerate(self.image_stack):
new_rotated_image_stack[idx] = rotate(
frame,
rotation_increment_by_frame[idx],
reshape=False,
order=0,
mode="constant",
)
logging.info(f"Frame {idx} rotated")

return new_rotated_image_stack

def find_rotation_for_each_line_from_motor(self):
# calculate the rotation degrees for each line
rotation_degrees = np.empty_like(self.line_clock)
Expand All @@ -252,6 +361,7 @@ def find_rotation_for_each_line_from_motor(self):
tick_peaks_corrected = np.insert(
self.rotation_ticks_peaks, 0, 0, axis=0
)

for i in range(1, len(tick_peaks_corrected)):
try:
rotation_idx = np.where(
Expand All @@ -260,12 +370,22 @@ def find_rotation_for_each_line_from_motor(self):
except IndexError:
logging.warning("End of rotations reached")

increment = self.corrected_increments[rotation_idx]
if self.assume_full_rotation:
increment = self.corrected_increments[rotation_idx]
else:
increment = self.rotation_increment

time_interval = (
tick_peaks_corrected[i] - tick_peaks_corrected[i - 1]
)
if time_interval > 2000 and i != 0:
if (
(time_interval > 2000)
and (i != 0)
and self.assume_full_rotation
):
# we cannot trust the number of ticks
# to understand if a rotation is finished
# therefore we wait the rotation off signal
rotation_increment = 0
rotation_array = np.zeros(time_interval)
else:
Expand All @@ -279,7 +399,12 @@ def find_rotation_for_each_line_from_motor(self):
rotation_degrees[
tick_peaks_corrected[i - 1] : tick_peaks_corrected[i]
] = rotation_array
signed_rotation_degrees = rotation_degrees * self.rotation_on

if self.assume_full_rotation:
signed_rotation_degrees = rotation_degrees * self.rotation_on
else:
signed_rotation_degrees = rotation_degrees

image_rotation_degree_per_line = signed_rotation_degrees[
self.lines_start
]
Expand All @@ -294,25 +419,38 @@ def rotate_frames_line_by_line(self):
rotated_image_stack = copy.deepcopy(self.image_stack)
previous_image_completed = True
rotation_completed = True
for i, rotation in enumerate(self.rot_deg_line):

min_value_img = np.min(self.image_stack)

# use tqdm
for i, rotation in tqdm.tqdm(enumerate(self.rot_deg_line)):
line_counter = i % height
image_counter = i // height
is_rotating = np.absolute(rotation) > 0.00001
image_scanning_completed = line_counter == (height - 1)
rotation_just_finished = not is_rotating and (
np.absolute(self.rot_deg_line[i - 1]) > np.absolute(rotation)
)
if not self.assume_full_rotation and i == 0:
rotation_just_finished = False
else:
rotation_just_finished = not is_rotating and (
np.absolute(self.rot_deg_line[i - 1])
> np.absolute(rotation)
)

if is_rotating:
if rotation_completed and (line_counter != 0):
# starting a new rotation in the middle of the image
rotated_filled_image = self.image_stack[image_counter]
rotated_filled_image = (
np.ones_like(self.image_stack[image_counter])
* min_value_img
) # non sampled pixels are set to the min val of the image
rotated_filled_image[:line_counter] = self.image_stack[
image_counter
][:line_counter]
elif previous_image_completed:
# rotation in progress and new image to be rotated
# rotated_filled_image = np.zeros_like(
# image_stack[image_counter]
# )
rotated_filled_image = self.image_stack[image_counter]
rotated_filled_image = (
np.ones_like(self.image_stack[image_counter])
* min_value_img
)

rotation_completed = False

Expand All @@ -323,6 +461,7 @@ def rotate_frames_line_by_line(self):
image_with_only_line = np.zeros_like(img_with_new_lines)
image_with_only_line[line_counter] = line

# is the mask really useful if we rotate with order=0?
empty_image_mask = np.ones_like(img_with_new_lines)
empty_image_mask[line_counter] = 0

Expand Down Expand Up @@ -365,7 +504,7 @@ def rotate_frames_line_by_line(self):
rotated_image_stack[image_counter] = rotated_filled_image
previous_image_completed = True

logging.info("Image {} rotated".format(image_counter))
# logging.info("Image {} rotated".format(image_counter))

return rotated_image_stack

Expand All @@ -383,10 +522,10 @@ def add_circle_mask(rotated_image_stack):
return masked_img_array

def save(self, masked):
path = self.path_to_dataset_folder / "dertotated"
path = self.path_to_dataset_folder / "derotated"
path.mkdir(parents=True, exist_ok=True)
imsave(
path / "masked.tif",
path / "masked_increment_with_adjustments_no_background.tif",
np.array(masked),
)
logging.info(f"Masked image saved in {path}")

0 comments on commit 90d47b4

Please sign in to comment.