Skip to content

Commit

Permalink
Add a lazy is_sorted function to collections (#159)
Browse files Browse the repository at this point in the history
* Add a lazy is_sorted function to collections

* chore: make the type hint public

* chore: revert docs change necessary for Sphinx

* docs: fix one docs reference
  • Loading branch information
clintval authored Jul 15, 2024
1 parent fbe5923 commit 2261825
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 28 deletions.
113 changes: 86 additions & 27 deletions fgpyo/collections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
"""
# Functions for Working with Collections
# Custom Collections and Collection Functions
This module contains classes and functions for working with collections and iterators.
## Helpful Functions for Working with Collections
To test if an iterable is sorted or not:
```python
>>> from fgpyo.collections import is_sorted
>>> is_sorted([])
True
>>> is_sorted([1])
True
>>> is_sorted([1, 2, 2, 3])
True
>>> is_sorted([1, 2, 4, 3])
False
```
## Examples of a "Peekable" Iterator
"Peekable" iterators are useful to "peek" at the next item in an iterator without consuming it.
Expand All @@ -11,56 +27,86 @@
[`takewhile()`][fgpyo.collections.PeekableIterator.takewhile] and
[`dropwhile()`][fgpyo.collections.PeekableIterator.dropwhile] methods.
An empty peekable iterator throws StopIteration:
An empty peekable iterator throws a
[`StopIteration`](https://docs.python.org/3/library/exceptions.html#StopIteration):
```python
>>> from fgpyo.collections import PeekableIterator
>>> piter = PeekableIterator(iter([]))
>>> piter.peek()
StopIteration
>>> from fgpyo.collections import PeekableIterator
>>> piter = PeekableIterator(iter([]))
>>> piter.peek()
StopIteration
```
A peekable iterator will return the next item before consuming it.
```python
>>> piter = PeekableIterator([1, 2, 3])
>>> piter.peek()
1
>>> next(piter)
1
>>> [j for j in piter]
[2, 3]
>>> piter = PeekableIterator([1, 2, 3])
>>> piter.peek()
1
>>> next(piter)
1
>>> [j for j in piter]
[2, 3]
```
The `can_peek()` function can be used to determine if the iterator can be peeked without
StopIteration being thrown:
The [`can_peek()`][fgpyo.collections.PeekableIterator.can_peek] function can be used to determine if
the iterator can be peeked without a
[`StopIteration`](https://docs.python.org/3/library/exceptions.html#StopIteration) from being
thrown:
>>> piter = PeekableIterator([1])
>>> piter.peek() if piter.can_peek() else -1
1
>>> next(piter)
1
>>> piter.peek() if piter.can_peek() else -1
-1
>>> next(piter)
StopIteration
```python
>>> piter = PeekableIterator([1])
>>> piter.peek() if piter.can_peek() else -1
1
>>> next(piter)
1
>>> piter.peek() if piter.can_peek() else -1
-1
>>> next(piter)
StopIteration
```
`PeekableIterator`'s constructor supports creation from iterable objects as well as iterators.
[`PeekableIterator`][fgpyo.collections.PeekableIterator]'s constructor supports creation from
iterable objects as well as iterators.
"""

import sys
from operator import le
from typing import Any
from typing import Callable
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Protocol
from typing import Tuple
from typing import TypeVar
from typing import Union

if sys.version_info[:2] >= (3, 10):
from itertools import pairwise as _pairwise
else:
# TODO: remove this branch when Python <3.10 support is dropped
def _pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
"""Return successive overlapping pairs taken from the input iterable."""
iterator = iter(iterable)
head = next(iterator, None)
for other in iterator:
yield head, other
head = other


class SupportsLessThanOrEqual(Protocol):
"""A structural type for objects that support less-than-or-equal comparison."""

def __le__(self, other: Any) -> bool: ...


IterType = TypeVar("IterType")

LessThanOrEqualType = TypeVar("LessThanOrEqualType", bound=SupportsLessThanOrEqual)
"""A type variable for an object that supports less-than-or-equal comparisons."""


class PeekableIterator(Generic[IterType], Iterator[IterType]):
"""A peekable iterator wrapping an iterator or iterable.
Expand Down Expand Up @@ -133,3 +179,16 @@ def dropwhile(self, pred: Callable[[IterType], bool]) -> "PeekableIterator[IterT
while self.can_peek() and pred(self._peek):
self.__update_peek()
return self


def is_sorted(iterable: Iterable[LessThanOrEqualType]) -> bool:
"""Tests lazily if an iterable of comparable objects is sorted or not.
Args:
iterable: An iterable of comparable objects.
Raises:
TypeError: If there is more than 1 element in ``iterable`` and any of the elements are not
comparable.
"""
return all(map(lambda pair: le(*pair), _pairwise(iterable)))
8 changes: 7 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ plugins:
show_submodules: true
- table-reader
markdown_extensions:
- pymdownx.highlight
- pymdownx.highlight:
anchor_linenums: true
line_spans: __span
pygments_lang_class: true
- pymdownx.inlinehilite
- pymdownx.snippets
- pymdownx.superfences
- toc:
permalink: true
exclude_docs: |
Expand Down
114 changes: 114 additions & 0 deletions tests/fgpyo/collections/test_is_sorted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from functools import total_ordering
from typing import Any
from typing import List

import pytest

from fgpyo.collections import is_sorted


def test_is_sorted_empty_input() -> None:
"""Test is_sorted on a variety of empty collections."""
assert is_sorted(tuple())
assert is_sorted(list())
assert is_sorted(iter([]))
assert is_sorted(dict())


def test_is_sorted_on_single_element_collections() -> None:
"""Test is_sorted on collections with a single element."""
assert is_sorted((1,))
assert is_sorted([1])
assert is_sorted(iter([1]))
assert is_sorted({1: 1})


# NB: this 2-element test exists due to special handling for this case in "pairwise"
def test_is_sorted_on_correctly_sorted_two_element_collections() -> None:
"""Test is_sorted on collections with two correctly sorted elements."""
# two identical elements one after the other
assert is_sorted([1, 1])
assert is_sorted((1, 1))
assert is_sorted(iter([1, 1]))

# two elements monotonically increasing
assert is_sorted((1, 2))
assert is_sorted([1, 2])
assert is_sorted(iter([1, 2]))
assert is_sorted({1: 1, 2: 2})


# NB: this 2-element test exists due to special handling for this case in "pairwise"
def test_is_sorted_on_incorrectly_sorted_two_element_collections() -> None:
"""Test is_sorted on collections with two incorrectly sorted elements."""
assert not is_sorted((2, 1))
assert not is_sorted([2, 1])
assert not is_sorted(iter([2, 1]))
assert not is_sorted({2: 2, 1: 1})


def test_is_sorted_on_correctly_sorted_collections_with_more_than_two_elements() -> None:
"""Test is_sorted on sorted collections with more than two elements."""
# three identical elements one after the other
assert is_sorted([1, 1, 1])
assert is_sorted((1, 1, 1))
assert is_sorted(iter([1, 1, 1]))

# three elements monotonically increasing
assert is_sorted((1, 2, 3))
assert is_sorted([1, 2, 3])
assert is_sorted(iter([1, 2, 3]))
assert is_sorted({1: 1, 2: 2, 3: 3})


def test_is_sorted_on_incorrectly_sorted_collections_with_more_than_two_elements() -> None:
"""Test is_sorted on non-sorted collections with more than two elements."""
assert not is_sorted((1, 3, 2))
assert not is_sorted([1, 3, 2])
assert not is_sorted(iter([1, 3, 2]))
assert not is_sorted({1: 1, 3: 3, 2: 2})


def test_is_sorted_raises_on_non_comparable_objects() -> None:
"""Test is_sorted raises an exception on a collection containing non-comparable objects."""

class MyClass:
"""A test class that is not comparable but does have a comparable field."""

def __init__(self, field: int) -> None:
self.field = field

# NB: an exception is only raised when there are more than one objects
iterable: List[MyClass] = [MyClass(field=1), MyClass(field=2)]

with pytest.raises(TypeError):
# NB: the type ignore below checks that MyPy is aware the custom class is incorrectly typed
is_sorted(iterable) # type: ignore[type-var]


def test_is_sorted_on_custom_comparable_objects() -> None:
"""Test is_sorted on a custom collection containing comparable objects."""

@total_ordering
class MyClass:
"""A test class that is comparable by relying on a comparable field."""

def __init__(self, field: int) -> None:
self.field = field

def __eq__(self, other: Any) -> bool:
if isinstance(other, type(self)):
return self.field == other.field
return NotImplemented

def __le__(self, other: Any) -> bool:
if isinstance(other, type(self)):
return self.field <= other.field
return NotImplemented

# NB: comparisons only occur when there are more than one object in the iterable.
iterable_sorted: List[MyClass] = [MyClass(field=1), MyClass(field=2)]
iterable_unsorted: List[MyClass] = [MyClass(field=2), MyClass(field=1)]

assert is_sorted(iterable_sorted)
assert not is_sorted(iterable_unsorted)

0 comments on commit 2261825

Please sign in to comment.