Skip to content

Commit

Permalink
speculos: Fasten graphics handling
Browse files Browse the repository at this point in the history
This changes allow to reduce app-boilerplate stax tests run time from
50s to 18s on my setup.
Note that this could be further reduced to 10s if using ragger
configuration BACKEND_SCOPE="session".
  • Loading branch information
Xavier Chapron committed Feb 23, 2024
1 parent 3a3439f commit a841439
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 76 deletions.
92 changes: 48 additions & 44 deletions speculos/mcu/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io
from abc import ABC, abstractmethod
from functools import cache
from PIL import Image
from socket import socket
from typing import Any, Dict, IO, List, Optional, Tuple, Union
Expand Down Expand Up @@ -54,35 +55,6 @@ def can_read(self, screen: DisplayNotifier) -> None:
}


def _screenshot_to_iobytes_value(screen_size, data):
image = Image.frombytes("RGB", screen_size, data)
iobytes = io.BytesIO()
image.save(iobytes, format="PNG")
return iobytes.getvalue()


class Screenshot:
def __init__(self, screen_size: Tuple[int, int]):
self.pixels: Dict[Tuple[int, int], int] = {}
self.width, self.height = screen_size
for y in range(0, self.height):
for x in range(0, self.width):
self.pixels[(x, y)] = 0x000000

def update(self, pixels: Dict[Tuple[int, int], int]) -> None:
# Don't call update, replace the object instead
self.pixels = {**self.pixels, **pixels}

def get_image(self) -> Tuple[Tuple[int, int], bytes]:
# Get the pixels object once, as it may be replaced during the loop.
data = bytearray(self.width * self.height * 3)
for y in range(0, self.height):
for x in range(0, self.width):
pos = 3 * (y * self.width + x)
data[pos:pos + 3] = self.pixels[(x, y)].to_bytes(3, "big")
return (self.width, self.height), bytes(data)


class FrameBuffer:
"""
A class responsible for managing the graphic screen of the current application.
Expand All @@ -99,34 +71,67 @@ class FrameBuffer:

def __init__(self, model: str):
self.pixels: Dict[Tuple[int, int], int] = {}
self.screenshot_pixels: Dict[Tuple[int, int], int] = {}
self.default_color = 0
self.draw_default_color = False
self._public_screenshot_value = b''
self.current_data = b''
self.recreate_public_screenshot = True
self.model = model
self.current_screen_size = MODELS[model].screen_size
self.screenshot = Screenshot(self.current_screen_size)
# Init published content now, don't wait for the first request
if self.model == "stax":
self.update_public_screenshot()
self._width, self._height = MODELS[model].screen_size

def draw_point(self, x: int, y: int, color: int) -> None:
@cache
def check_color(self, color: int) -> int:
# There are only 2 colors on the Nano S and the Nano X but the one
# passed in argument isn't always valid. Fix it here.
if self.model != 'stax':
if color != 0x000000:
color = FrameBuffer.COLORS.get(self.model, color)
self.pixels[(x, y)] = color
return color

def draw_point(self, x: int, y: int, color: int) -> None:
self.pixels[(x, y)] = self.check_color(color)

def draw_horizontal_line(self, x0: int, y: int, width: int, color: int) -> None:
for x in range(x0, x0 + width):
self.pixels[(x, y)] = self.check_color(color)

def draw_rect(self, x0: int, y0: int, width: int, height: int, color: int) -> None:
color = self.check_color(color)

if x0 == 0 and y0 == 0 and width == self._width and height == self._height:
self.default_color = color
self.draw_default_color = True
self.pixels = {}
self.screenshot_pixels = {}
return

for x in range(x0, x0 + width):
for y in range(y0, y0 + height):
self.pixels[(x, y)] = color

def _get_image(self) -> bytes:
data = bytearray(self.default_color.to_bytes(3, "big")) * self._width * self._height
for (x, y), color in self.screenshot_pixels.items():
pos = 3 * (y * self._width + x)
data[pos:pos + 3] = color.to_bytes(3, "big")
return bytes(data)

def _get_screenshot_iobytes_value(self) -> bytes:
# Get the pixels object once, as it may be replaced during the loop.
data = self._get_image()

def screenshot_update_pixels(self):
# Update the screenshot object with our current pixels content
self.screenshot.update(self.pixels)
image = Image.frombytes("RGB", self.current_screen_size, data)
iobytes = io.BytesIO()
image.save(iobytes, format="PNG")
return iobytes.getvalue()

def take_screenshot(self) -> Tuple[Tuple[int, int], bytes]:
self.current_screen_size, self.current_data = self.screenshot.get_image()
return self.current_screen_size, self.current_data
return self.current_screen_size, self._get_image()

def update_screenshot(self) -> None:
self.screenshot.update(self.pixels)
self.screenshot_pixels = {**self.screenshot_pixels, **self.pixels}

def update_public_screenshot(self) -> None:
# Stax only
Expand All @@ -140,7 +145,7 @@ def public_screenshot_value(self) -> bytes:
# and not necessary if no one tries to read the value
if self.recreate_public_screenshot:
self.recreate_public_screenshot = False
self._public_screenshot_value = _screenshot_to_iobytes_value(self.current_screen_size, self.current_data)
self._public_screenshot_value = self._get_screenshot_iobytes_value()

return self._public_screenshot_value

Expand All @@ -152,8 +157,7 @@ def get_public_screenshot(self) -> bytes:
return self.public_screenshot_value
# On nano we have no knowledge of screen refreshes so we can't be scarce on publishes
# So we publish the raw current content every time. It's ok as take_screenshot is fast on Nano
screen_size, data = self.take_screenshot()
return _screenshot_to_iobytes_value(screen_size, data)
return self._get_screenshot_iobytes_value()

# Should be declared as an `@abstractmethod` (and also `class FrameBuffer(ABC):`), but in this
# case multiple inheritance in `screen.PaintWidget(FrameBuffer, QWidget)` will break, as both
Expand Down
5 changes: 3 additions & 2 deletions speculos/mcu/headless.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ def update(self,
_1: Optional[int] = None,
_2: Optional[int] = None,
_3: Optional[int] = None) -> bool:
if self.pixels:
if self.pixels or self.draw_default_color:
self._redraw()
self.pixels = {}
self.draw_default_color = False
return True
return False

def _redraw(self) -> None:
if self.vnc:
self.vnc.redraw(self.pixels)
self.vnc.redraw(self.pixels, self.default_color)
self.update_screenshot()


Expand Down
20 changes: 11 additions & 9 deletions speculos/mcu/nbgl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from construct import Struct, Int8ul, Int16ul
from enum import IntEnum
from functools import cache
from typing import Tuple

from .display import FrameBuffer, GraphicLibrary
Expand Down Expand Up @@ -30,6 +31,7 @@ class NbglColor(IntEnum):
class NBGL(GraphicLibrary):

@staticmethod
@cache
def to_screen_color(color: int, bpp: int) -> int:
color_table = {
1: 0xFFFFFF,
Expand Down Expand Up @@ -57,9 +59,7 @@ def __assert_area(self, area) -> None:
def hal_draw_rect(self, data: bytes) -> None:
area = nbgl_area_t.parse(data)
self.__assert_area(area)
for x in range(area.x0, area.x0+area.width):
for y in range(area.y0, area.y0+area.height):
self.fb.draw_point(x, y, NBGL.to_screen_color(area.color, 2))
self.fb.draw_rect(area.x0, area.y0, area.width, area.height, NBGL.to_screen_color(area.color, 2))

def refresh(self, data: bytes) -> bool:
area = nbgl_area_t.parse(data)
Expand All @@ -74,19 +74,21 @@ def hal_draw_line(self, data: bytes) -> None:

back_color = NBGL.to_screen_color(area.color, 2)
front_color = NBGL.to_screen_color(color, 2)
for x in range(area.x0, area.x0+area.width):
for y in range(area.y0, area.y0+area.height):
if (mask >> (y-area.y0)) & 0x1:
self.fb.draw_point(x, y, front_color)
else:
self.fb.draw_point(x, y, back_color)

for y in range(area.y0, area.y0+area.height):
if (mask >> (y-area.y0)) & 0x1:
self.fb.draw_horizontal_line(area.x0, y, area.width, front_color)
else:
self.fb.draw_horizontal_line(area.x0, y, area.width, back_color)

@staticmethod
@cache
def get_color_from_color_map(color, color_map, bpp):
# #define GET_COLOR_MAP(__map__,__col__) ((__map__>>(__col__*2))&0x3)
return NBGL.to_screen_color((color_map >> (color*2)) & 0x3, bpp)

@staticmethod
@cache
def get_4bpp_color_from_color_index(index, front_color, back_color):
COLOR_MAPS_4BPP = {
# Manually hardcoced color maps
Expand Down
8 changes: 6 additions & 2 deletions speculos/mcu/screen.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ def __init__(self, parent, model: str, pixel_size: int, vnc: Optional[VNC] = Non
self.vnc = vnc

def paintEvent(self, event: QEvent):
if self.pixels:
if self.pixels or self.draw_default_color:
pixmap = QPixmap(self.size() / self.pixel_size)
pixmap.fill(Qt.white)
painter = QPainter(pixmap)
painter.drawPixmap(0, 0, self.mPixmap)
self._redraw(painter)
self.mPixmap = pixmap
self.pixels = {}
self.draw_default_color = False

qp = QPainter(self)
copied_pixmap = self.mPixmap
Expand All @@ -60,12 +61,15 @@ def update(self, # type: ignore[override]
return self.pixels != {}

def _redraw(self, qp):
if self.draw_default_color:
qp.fillRect(0, 0, self._width, self._height, QColor.fromRgb(self.default_color))

for (x, y), color in self.pixels.items():
qp.setPen(QColor.fromRgb(color))
qp.drawPoint(x, y)

if self.vnc is not None:
self.vnc.redraw(self.pixels)
self.vnc.redraw(self.pixels, self.default_color)

self.update_screenshot()

Expand Down
7 changes: 2 additions & 5 deletions speculos/mcu/seproxyhal.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,8 @@ def can_read(self, screen: DisplayNotifier):
if self.refreshed:
self.refreshed = False

# Run the OCR
# Update the screenshot, we'll upload its associated events shortly
screen.display.gl.update_screenshot()
screen.display.gl.take_screenshot()

# Publish the new screenshot, we'll upload its associated events shortly
screen.display.gl.update_public_screenshot()

if screen.display.model != "stax" and screen.display.screen_update():
Expand Down Expand Up @@ -423,7 +420,7 @@ def can_read(self, screen: DisplayNotifier):
screen.display.gl.refresh(data)
# Stax only
# We have refreshed the screen, remember it for the next time we have SephTag.GENERAL_STATUS
# then we'll perform a new OCR and make public the resulting screenshot / OCR analysis
# then we'll perform a screen update and make public the resulting screenshot
self.refreshed = True

elif tag == SephTag.NBGL_DRAW_LINE:
Expand Down
30 changes: 16 additions & 14 deletions speculos/mcu/vnc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ def __init__(self,
verbose: bool = False):
self.logger = logging.getLogger("vnc")

width, height = screen_size
self.width, self.height = screen_size
path = os.path.dirname(os.path.realpath(__file__))
server = os.path.join(path, '../resources/vnc_server')
cmd = [server]

# custom options
cmd += ['-s', f'{width}x{height}']
cmd += ['-s', f'{self.width}x{self.height}']
if verbose:
cmd += ['-v']

Expand All @@ -48,23 +48,25 @@ def file(self) -> IO[bytes]:
assert self.subprocess.stdout is not None
return self.subprocess.stdout

def redraw(self, pixels):
def redraw(self, pixels, default_color):
'''The framebuffer was updated, forward everything to the VNC server.'''

# int.to_bytes() is super slow, hence the manual encoding
buf = bytearray(len(pixels) * 9)
i = 0
for (x, y), color in pixels.items():
buf[i + 0] = y & 0xff
buf[i + 1] = (y >> 8) & 0xff
buf[i + 2] = x & 0xff
buf[i + 3] = (x >> 8) & 0xff
buf[i + 4] = color & 0xff
buf[i + 5] = (color >> 8) & 0xff
buf[i + 6] = (color >> 16) & 0xff
buf[i + 7] = (color >> 24) & 0xff
buf[i + 8] = 0x0a
i += 9
for x in range(0, self._width):
for y in range(0, self._height):
color = pixels.get((x, y), default_color)
buf[i + 0] = y & 0xff
buf[i + 1] = (y >> 8) & 0xff
buf[i + 2] = x & 0xff
buf[i + 3] = (x >> 8) & 0xff
buf[i + 4] = color & 0xff
buf[i + 5] = (color >> 8) & 0xff
buf[i + 6] = (color >> 16) & 0xff
buf[i + 7] = (color >> 24) & 0xff
buf[i + 8] = 0x0a
i += 9

self.subprocess.stdin.write(buf)
self.subprocess.stdin.flush()
Expand Down

0 comments on commit a841439

Please sign in to comment.