diff --git a/derotation/analysis/derotation_pipeline.py b/derotation/analysis/derotation_pipeline.py index 1dc55ce..6e37864 100644 --- a/derotation/analysis/derotation_pipeline.py +++ b/derotation/analysis/derotation_pipeline.py @@ -5,6 +5,7 @@ import numpy as np import numpy.ma as ma +import tqdm from fancylog import fancylog from scipy.ndimage import rotate from scipy.optimize import bisect @@ -33,12 +34,15 @@ def __init__(self): self.config, self.direction, self.path_to_dataset_folder, + self.filename, ) = get_data() self.rot_deg = 360 + self.assume_full_rotation = True logging.info( f"Dataset {self.config['paths']['dataset-folder']} loaded" ) + logging.info(f"Filename: {self.filename}") def process_analog_signals(self): # =================================== @@ -53,9 +57,15 @@ def process_analog_signals(self): self.expected_tiks_per_rotation = self.check_number_of_rotations( self.rotation_increment ) - self.corrected_increments = self.adjust_rotation_increment( - self.rotation_increment - ) + if self.assume_full_rotation: + self.corrected_increments = self.adjust_rotation_increment( + self.rotation_increment + ) + else: + self.corrected_increments = ( + self.adjust_rotation_increment_for_incremental_changes() + ) + logging.info(f"Corrected increments: {self.corrected_increments}") # =================================== # Quantify the rotation for each line of each frame @@ -64,10 +74,15 @@ def process_analog_signals(self): self.lines_end, self.threshold, ) = self.get_starting_and_ending_times(clock_type="line") - ( - self.rot_deg_line, - self.signed_rotation_degrees_line, - ) = self.find_rotation_for_each_line_from_motor() + if self.assume_full_rotation: + ( + self.rot_deg_line, + self.signed_rotation_degrees_line, + ) = self.find_rotation_for_each_line_from_motor() + else: + self.rot_deg_line = ( + self.find_rotation_angles_by_line_in_incremental_rotation() + ) print("Analog signals processed") @@ -176,6 +191,16 @@ def check_number_of_rotations(self, given_increment=0.2): return expected_tiks_per_rotation + def adjust_rotation_increment_for_incremental_changes( + self, given_increment=0.2 + ): + total_ticks_number = len(self.rotation_ticks_peaks) + + if total_ticks_number == self.expected_tiks_per_rotation: + return given_increment + else: + return self.rot_deg / total_ticks_number + def adjust_rotation_increment(self, given_increment=0.2): increments_per_rotation = [] for i, (start, end) in enumerate( @@ -244,6 +269,90 @@ def goodness_of_threshold(k, clock, target_len): start = np.where(np.diff(clock) > threshold)[0] return len(start) - target_len + def find_rotation_angles_by_frame_in_incremental_rotation(self): + frame_start, frame_end, threshold = self.get_starting_and_ending_times( + clock_type="frame" + ) + + rotation_increment_by_frame = np.zeros(len(self.image_stack)) + total_rotation_of_this_frame = 0 + for frame_id, start in enumerate(frame_start): + try: + ticks_in_this_frame = np.where( + np.logical_and( + self.rotation_ticks_peaks > start, + self.rotation_ticks_peaks < frame_start[frame_id + 1], + ) + )[0].shape[0] + except IndexError: + ticks_in_this_frame = np.where( + np.logical_and( + self.rotation_ticks_peaks > start, + self.rotation_ticks_peaks < frame_end[-1], + ) + )[0].shape[0] + total_rotation_of_this_frame += ( + ticks_in_this_frame * self.corrected_increments + ) + + rotation_increment_by_frame[ + frame_id + ] = total_rotation_of_this_frame + + return rotation_increment_by_frame + + def find_rotation_angles_by_line_in_incremental_rotation(self): + # calculate the rotation degrees for each line + rotation_increment_by_line = np.zeros(len(self.image_stack) * 256) + total_rotation_of_this_line = 0 + for line_id, start in enumerate(self.lines_start): + try: + ticks_in_this_line = np.where( + np.logical_and( + self.rotation_ticks_peaks > start, + self.rotation_ticks_peaks + < self.lines_start[line_id + 1], + ) + )[0].shape[0] + except IndexError: + ticks_in_this_line = np.where( + np.logical_and( + self.rotation_ticks_peaks > start, + self.rotation_ticks_peaks < self.lines_end[-1], + ) + )[0].shape[0] + total_rotation_of_this_line += ( + ticks_in_this_line * self.corrected_increments + ) + + try: + rotation_increment_by_line[ + line_id + ] = total_rotation_of_this_line + except IndexError: + break + + return rotation_increment_by_line + + def roatate_by_frame_incremental(self): + new_rotated_image_stack = np.zeros_like(self.image_stack) + + rotation_increment_by_frame = ( + self.find_rotation_angles_by_frame_in_incremental_rotation() + ) + + for idx, frame in enumerate(self.image_stack): + new_rotated_image_stack[idx] = rotate( + frame, + rotation_increment_by_frame[idx], + reshape=False, + order=0, + mode="constant", + ) + logging.info(f"Frame {idx} rotated") + + return new_rotated_image_stack + def find_rotation_for_each_line_from_motor(self): # calculate the rotation degrees for each line rotation_degrees = np.empty_like(self.line_clock) @@ -252,6 +361,7 @@ def find_rotation_for_each_line_from_motor(self): tick_peaks_corrected = np.insert( self.rotation_ticks_peaks, 0, 0, axis=0 ) + for i in range(1, len(tick_peaks_corrected)): try: rotation_idx = np.where( @@ -260,12 +370,22 @@ def find_rotation_for_each_line_from_motor(self): except IndexError: logging.warning("End of rotations reached") - increment = self.corrected_increments[rotation_idx] + if self.assume_full_rotation: + increment = self.corrected_increments[rotation_idx] + else: + increment = self.rotation_increment time_interval = ( tick_peaks_corrected[i] - tick_peaks_corrected[i - 1] ) - if time_interval > 2000 and i != 0: + if ( + (time_interval > 2000) + and (i != 0) + and self.assume_full_rotation + ): + # we cannot trust the number of ticks + # to understand if a rotation is finished + # therefore we wait the rotation off signal rotation_increment = 0 rotation_array = np.zeros(time_interval) else: @@ -279,7 +399,12 @@ def find_rotation_for_each_line_from_motor(self): rotation_degrees[ tick_peaks_corrected[i - 1] : tick_peaks_corrected[i] ] = rotation_array - signed_rotation_degrees = rotation_degrees * self.rotation_on + + if self.assume_full_rotation: + signed_rotation_degrees = rotation_degrees * self.rotation_on + else: + signed_rotation_degrees = rotation_degrees + image_rotation_degree_per_line = signed_rotation_degrees[ self.lines_start ] @@ -294,25 +419,38 @@ def rotate_frames_line_by_line(self): rotated_image_stack = copy.deepcopy(self.image_stack) previous_image_completed = True rotation_completed = True - for i, rotation in enumerate(self.rot_deg_line): + + min_value_img = np.min(self.image_stack) + + # use tqdm + for i, rotation in tqdm.tqdm(enumerate(self.rot_deg_line)): line_counter = i % height image_counter = i // height is_rotating = np.absolute(rotation) > 0.00001 image_scanning_completed = line_counter == (height - 1) - rotation_just_finished = not is_rotating and ( - np.absolute(self.rot_deg_line[i - 1]) > np.absolute(rotation) - ) + if not self.assume_full_rotation and i == 0: + rotation_just_finished = False + else: + rotation_just_finished = not is_rotating and ( + np.absolute(self.rot_deg_line[i - 1]) + > np.absolute(rotation) + ) if is_rotating: if rotation_completed and (line_counter != 0): # starting a new rotation in the middle of the image - rotated_filled_image = self.image_stack[image_counter] + rotated_filled_image = ( + np.ones_like(self.image_stack[image_counter]) + * min_value_img + ) # non sampled pixels are set to the min val of the image + rotated_filled_image[:line_counter] = self.image_stack[ + image_counter + ][:line_counter] elif previous_image_completed: - # rotation in progress and new image to be rotated - # rotated_filled_image = np.zeros_like( - # image_stack[image_counter] - # ) - rotated_filled_image = self.image_stack[image_counter] + rotated_filled_image = ( + np.ones_like(self.image_stack[image_counter]) + * min_value_img + ) rotation_completed = False @@ -323,6 +461,7 @@ def rotate_frames_line_by_line(self): image_with_only_line = np.zeros_like(img_with_new_lines) image_with_only_line[line_counter] = line + # is the mask really useful if we rotate with order=0? empty_image_mask = np.ones_like(img_with_new_lines) empty_image_mask[line_counter] = 0 @@ -365,7 +504,7 @@ def rotate_frames_line_by_line(self): rotated_image_stack[image_counter] = rotated_filled_image previous_image_completed = True - logging.info("Image {} rotated".format(image_counter)) + # logging.info("Image {} rotated".format(image_counter)) return rotated_image_stack @@ -383,10 +522,10 @@ def add_circle_mask(rotated_image_stack): return masked_img_array def save(self, masked): - path = self.path_to_dataset_folder / "dertotated" + path = self.path_to_dataset_folder / "derotated" path.mkdir(parents=True, exist_ok=True) imsave( - path / "masked.tif", + path / "masked_increment_with_adjustments_no_background.tif", np.array(masked), ) logging.info(f"Masked image saved in {path}")