Skip to content

Commit

Permalink
Change linter to Ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Jul 26, 2024
1 parent 73aac2c commit 24d8895
Show file tree
Hide file tree
Showing 26 changed files with 259 additions and 198 deletions.
8 changes: 5 additions & 3 deletions dissect/util/compression/lz4.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO

from dissect.util.exceptions import CorruptDataError

Expand All @@ -23,8 +25,8 @@ def _get_length(src: BinaryIO, length: int) -> int:


def decompress(
src: Union[bytes, BinaryIO], max_length: int = -1, return_bytearray: bool = False, return_bytes_read: bool = False
) -> Union[bytes, tuple[bytes, int]]:
src: bytes | BinaryIO, max_length: int = -1, return_bytearray: bool = False, return_bytes_read: bool = False
) -> bytes | tuple[bytes, int]:
"""LZ4 decompress from a file-like object up to a certain length. Assumes no header.
Args:
Expand Down
8 changes: 5 additions & 3 deletions dissect/util/compression/lznt1.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Reference: https://github.com/google/rekall/blob/master/rekall-core/rekall/plugins/filesystems/lznt1.py
from __future__ import annotations

import array
import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO


def _get_displacement(offset: int) -> int:
Expand All @@ -20,10 +22,10 @@ def _get_displacement(offset: int) -> int:
COMPRESSED_MASK = 1 << 15
SIGNATURE_MASK = 3 << 12
SIZE_MASK = (1 << 12) - 1
TAG_MASKS = [(1 << i) for i in range(0, 8)]
TAG_MASKS = [(1 << i) for i in range(8)]


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZNT1 decompress from a file-like object or bytes.
Args:
Expand Down
5 changes: 3 additions & 2 deletions dissect/util/compression/lzo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# - https://github.com/FFmpeg/FFmpeg/blob/master/libavutil/lzo.c
# - https://docs.kernel.org/staging/lzo.html
# - https://github.com/torvalds/linux/blob/master/lib/lzo/lzo1x_decompress_safe.c
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO

MAX_READ_LENGTH = (1 << 32) - 1000

Expand All @@ -22,7 +23,7 @@ def _read_length(src: BinaryIO, val: int, mask: int) -> int:
return length + mask + val


def decompress(src: Union[bytes, BinaryIO], header: bool = True, buflen: int = -1) -> bytes:
def decompress(src: bytes | BinaryIO, header: bool = True, buflen: int = -1) -> bytes:
"""LZO decompress from a file-like object or bytes. Assumes no header.
Arguments are largely compatible with python-lzo API.
Expand Down
6 changes: 4 additions & 2 deletions dissect/util/compression/lzxpress.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Reference: [MS-XCA]
from __future__ import annotations

import io
import struct
from typing import BinaryIO, Union
from typing import BinaryIO


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.
Args:
Expand Down
13 changes: 8 additions & 5 deletions dissect/util/compression/lzxpress_huffman.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-frs2/8cb5bae9-edf3-4833-9f0a-9d7e24218d3d
# https://winprotocoldoc.blob.core.windows.net/productionwindowsarchives/MS-XCA/[MS-XCA].pdf
from __future__ import annotations

import io
import struct
from collections import namedtuple
from typing import BinaryIO, Optional, Union
from typing import BinaryIO, NamedTuple

Symbol = namedtuple("Symbol", ["length", "symbol"])

class Symbol(NamedTuple):
length: int
symbol: int


def _read_16_bit(fh: BinaryIO) -> int:
Expand All @@ -16,7 +19,7 @@ def _read_16_bit(fh: BinaryIO) -> int:
class Node:
__slots__ = ("symbol", "is_leaf", "children")

def __init__(self, symbol: Optional[Symbol] = None, is_leaf: bool = False):
def __init__(self, symbol: Symbol | None = None, is_leaf: bool = False):
self.symbol = symbol
self.is_leaf = is_leaf
self.children = [None, None]
Expand Down Expand Up @@ -120,7 +123,7 @@ def decode(self, root: Node) -> Symbol:
return node.symbol


def decompress(src: Union[bytes, BinaryIO]) -> bytes:
def decompress(src: bytes | BinaryIO) -> bytes:
"""LZXPRESS decompress from a file-like object or bytes.
Decompresses until EOF of the input data.
Expand Down
8 changes: 5 additions & 3 deletions dissect/util/compression/sevenbit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

from io import BytesIO
from typing import BinaryIO, Union
from typing import BinaryIO


def compress(src: Union[bytes, BinaryIO]) -> bytes:
def compress(src: bytes | BinaryIO) -> bytes:
"""Sevenbit compress from a file-like object or bytes.
Args:
Expand Down Expand Up @@ -37,7 +39,7 @@ def compress(src: Union[bytes, BinaryIO]) -> bytes:
return bytes(dst)


def decompress(src: Union[bytes, BinaryIO], wide: bool = False) -> bytes:
def decompress(src: bytes | BinaryIO, wide: bool = False) -> bytes:
"""Sevenbit decompress from a file-like object or bytes.
Args:
Expand Down
16 changes: 9 additions & 7 deletions dissect/util/cpio.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import stat
import struct
import tarfile
Expand Down Expand Up @@ -38,7 +40,7 @@ class CpioInfo(tarfile.TarInfo):
"""

@classmethod
def fromtarfile(cls, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
def fromtarfile(cls, tarfile: tarfile.TarFile) -> CpioInfo:
if tarfile.format not in (
FORMAT_CPIO_BIN,
FORMAT_CPIO_ODC,
Expand All @@ -64,7 +66,7 @@ def fromtarfile(cls, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
return obj._proc_member(tarfile)

@classmethod
def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> tarfile.TarInfo:
def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> CpioInfo:
if format in (FORMAT_CPIO_BIN, FORMAT_CPIO_ODC, FORMAT_CPIO_HPBIN, FORMAT_CPIO_HPODC):
obj = cls._old_frombuf(buf, format)
elif format in (FORMAT_CPIO_NEWC, FORMAT_CPIO_CRC):
Expand All @@ -78,7 +80,7 @@ def frombuf(cls, buf: bytes, format: int, encoding: str, errors: str) -> tarfile
return obj

@classmethod
def _old_frombuf(cls, buf: bytes, format: int):
def _old_frombuf(cls, buf: bytes, format: int) -> CpioInfo:
if format in (FORMAT_CPIO_BIN, FORMAT_CPIO_HPBIN):
values = list(struct.unpack("<13H", buf))
if values[0] == _swap16(CPIO_MAGIC_OLD):
Expand Down Expand Up @@ -131,7 +133,7 @@ def _old_frombuf(cls, buf: bytes, format: int):
return obj

@classmethod
def _new_frombuf(cls, buf: bytes, format: int):
def _new_frombuf(cls, buf: bytes, format: int) -> CpioInfo:
values = struct.unpack("<6s8s8s8s8s8s8s8s8s8s8s8s8s8s", buf)
values = [int(values[0], 8)] + [int(v, 16) for v in values[1:]]
if values[0] not in (CPIO_MAGIC_NEW, CPIO_MAGIC_CRC):
Expand All @@ -157,7 +159,7 @@ def _new_frombuf(cls, buf: bytes, format: int):

return obj

def _proc_member(self, tarfile: tarfile.TarFile) -> tarfile.TarInfo:
def _proc_member(self, tarfile: tarfile.TarFile) -> CpioInfo | None:
self.name = tarfile.fileobj.read(self.namesize - 1).decode(tarfile.encoding, tarfile.errors)
if self.name == "TRAILER!!!":
# The last entry in a cpio file has the special name ``TRAILER!!!``, indicating the end of the archive
Expand Down Expand Up @@ -211,13 +213,13 @@ def _swap16(value: int) -> int:
return ((value & 0xFF) << 8) | (value >> 8)


def CpioFile(*args, **kwargs):
def CpioFile(*args, **kwargs) -> tarfile.TarFile:
"""Utility wrapper around ``tarfile.TarFile`` to easily open cpio archives."""
kwargs.setdefault("format", FORMAT_CPIO_UNKNOWN)
return tarfile.TarFile(*args, **kwargs, tarinfo=CpioInfo)


def open(*args, **kwargs):
def open(*args, **kwargs) -> tarfile.TarFile:
"""Utility wrapper around ``tarfile.open`` to easily open cpio archives."""
kwargs.setdefault("format", FORMAT_CPIO_UNKNOWN)
return tarfile.open(*args, **kwargs, tarinfo=CpioInfo)
2 changes: 1 addition & 1 deletion dissect/util/encoding/surrogateescape.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import codecs


def error_handler(error):
def error_handler(error: Exception) -> tuple[str, int]:

Check warning on line 4 in dissect/util/encoding/surrogateescape.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/encoding/surrogateescape.py#L4

Added line #L4 was not covered by tests
if not isinstance(error, UnicodeDecodeError):
raise error

Expand Down
10 changes: 6 additions & 4 deletions dissect/util/feature.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import functools
import os
from enum import Enum
from typing import Callable, Optional
from typing import Callable, NoReturn


# Register feature flags in a central place to avoid chaos
Expand Down Expand Up @@ -43,7 +45,7 @@ def parse_blob():
return feature in feature_flags()


def feature(flag: Feature, alternative: Optional[Callable] = None) -> Callable:
def feature(flag: Feature, alternative: Callable | None = None) -> Callable:
"""Feature flag decorator allowing you to guard a function behind a feature flag.
Usage::
Expand All @@ -57,7 +59,7 @@ def my_func( ... ) -> ...

if alternative is None:

def alternative():
def alternative() -> NoReturn:
raise FeatureException(
"\n".join(
[
Expand All @@ -68,7 +70,7 @@ def alternative():
)
)

def decorator(func):
def decorator(func: Callable) -> Callable:
return func if feature_enabled(flag) else alternative

return decorator
Loading

0 comments on commit 24d8895

Please sign in to comment.