From 1436bbba1ad0dee634e823bcaf73c083311dd9e9 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Tue, 24 Sep 2024 23:34:01 +0200
Subject: [PATCH] bugfix: remove superfluous repository.list() call

Because it ended the loop only when .list() returned an empty result,
this always needed one call more than necessary.

We can also detect that we are finished if .list() returns fewer
results than the limit we gave it.

Also: reduce code duplication by using the repo_lister function.
---
 src/borg/archiver/compact_cmd.py             | 20 ++----
 src/borg/archiver/debug_cmd.py               | 66 ++++++++-----------
 src/borg/cache.py                            | 26 +++-----
 src/borg/repository.py                       | 12 ++++
 .../testsuite/archiver/repo_compress_cmd.py  | 44 ++++++-------
 5 files changed, 74 insertions(+), 94 deletions(-)

diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py
index 8fa37e90ca..8f529d5445 100644
--- a/src/borg/archiver/compact_cmd.py
+++ b/src/borg/archiver/compact_cmd.py
@@ -10,7 +10,7 @@
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
 from ..remote import RemoteRepository
-from ..repository import Repository
+from ..repository import Repository, repo_lister
 from ..logger import create_logger
 
 
@@ -49,18 +49,12 @@ def garbage_collect(self):
     def get_repository_chunks(self) -> ChunkIndex:
         """Build a dict id -> size of all chunks present in the repository"""
         chunks = ChunkIndex()
-        marker = None
-        while True:
-            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                # we add this id to the chunks index, using refcount == 0, because
-                # we do not know yet whether it is actually referenced from some archives.
-                # we "abuse" the size field here. usually there is the plaintext size,
-                # but we use it for the size of the stored object here.
-                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        for id, stored_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT):
+            # we add this id to the chunks index, using refcount == 0, because
+            # we do not know yet whether it is actually referenced from some archives.
+            # we "abuse" the size field here. usually there is the plaintext size,
+            # but we use it for the size of the stored object here.
+            chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
         return chunks
 
     def save_chunk_index(self):
diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py
index 874ef58df1..b3ddb6bfc4 100644
--- a/src/borg/archiver/debug_cmd.py
+++ b/src/borg/archiver/debug_cmd.py
@@ -15,7 +15,7 @@
 from ..helpers import CommandError, RTError
 from ..manifest import Manifest
 from ..platform import get_process_id
-from ..repository import Repository, LIST_SCAN_LIMIT
+from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister
 from ..repoobj import RepoObj
 from ._common import with_repository, Highlander
 
@@ -130,15 +130,9 @@ def decrypt_dump(id, cdata):
         cdata = repository.get(id)
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
-        marker = None
-        while True:
-            result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                cdata = repository.get(id)
-                decrypt_dump(id, cdata)
+        for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+            cdata = repository.get(id)
+            decrypt_dump(id, cdata)
         print("Done.")
 
     @with_repository(manifest=False)
@@ -177,38 +171,32 @@ def print_finding(info, wanted, data, offset):
         key = key_factory(repository, cdata)
         repo_objs = RepoObj(key)
 
-        marker = None
         last_data = b""
         last_id = None
         i = 0
-        while True:
-            result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            if not result:
-                break
-            marker = result[-1][0]
-            for id, stored_size in result:
-                cdata = repository.get(id)
-                _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
-
-                # try to locate wanted sequence crossing the border of last_data and data
-                boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
-                if wanted in boundary_data:
-                    boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
-                    offset = boundary_data.find(wanted)
-                    info = "%d %s | %s" % (i, last_id.hex(), id.hex())
-                    print_finding(info, wanted, boundary_data, offset)
-
-                # try to locate wanted sequence in data
-                count = data.count(wanted)
-                if count:
-                    offset = data.find(wanted)  # only determine first occurrence's offset
-                    info = "%d %s #%d" % (i, id.hex(), count)
-                    print_finding(info, wanted, data, offset)
-
-                last_id, last_data = id, data
-                i += 1
-                if i % 10000 == 0:
-                    print("%d objects processed." % i)
+        for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+            cdata = repository.get(id)
+            _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
+
+            # try to locate wanted sequence crossing the border of last_data and data
+            boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
+            if wanted in boundary_data:
+                boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
+                offset = boundary_data.find(wanted)
+                info = "%d %s | %s" % (i, last_id.hex(), id.hex())
+                print_finding(info, wanted, boundary_data, offset)
+
+            # try to locate wanted sequence in data
+            count = data.count(wanted)
+            if count:
+                offset = data.find(wanted)  # only determine first occurrence's offset
+                info = "%d %s #%d" % (i, id.hex(), count)
+                print_finding(info, wanted, data, offset)
+
+            last_id, last_data = id, data
+            i += 1
+            if i % 10000 == 0:
+                print("%d objects processed." % i)
         print("Done.")
 
     @with_repository(manifest=False)
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 28d4951ae0..70101fcea8 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -31,7 +31,7 @@
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import RemoteRepository
-from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound
+from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
 
 # chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -680,22 +680,14 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
     t0 = perf_counter()
-    num_requests = 0
     num_chunks = 0
-    marker = None
-    while True:
-        result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-        num_requests += 1
-        if not result:
-            break
-        marker = result[-1][0]
-        # The repo says it has these chunks, so we assume they are referenced chunks.
-        # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
-        # We do not know the plaintext size (!= stored_size), thus we set size = 0.
-        init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
-        for id, stored_size in result:
-            num_chunks += 1
-            chunks[id] = init_entry
+    # The repo says it has these chunks, so we assume they are referenced chunks.
+    # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
+    # We do not know the plaintext size (!= stored_size), thus we set size = 0.
+    init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+    for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+        num_chunks += 1
+        chunks[id] = init_entry
     # Cache does not contain the manifest.
     if not isinstance(repository, (Repository, RemoteRepository)):
         del chunks[Manifest.MANIFEST_ID]
@@ -703,7 +695,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
     # Protocol overhead is neglected in this calculation.
     speed = format_file_size(num_chunks * 34 / duration)
-    logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
+    logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
         write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
diff --git a/src/borg/repository.py b/src/borg/repository.py
index 66db295c2e..c293358b00 100644
--- a/src/borg/repository.py
+++ b/src/borg/repository.py
@@ -18,6 +18,18 @@
 logger = create_logger(__name__)
 
 
+def repo_lister(repository, *, limit=None):
+    marker = None
+    finished = False
+    while not finished:
+        result = repository.list(limit=limit, marker=marker)
+        finished = (len(result) < limit) if limit is not None else (len(result) == 0)
+        if not finished:
+            marker = result[-1][0]
+        for id, stored_size in result:
+            yield id, stored_size
+
+
 class Repository:
     """borgstore based key value store"""
 
diff --git a/src/borg/testsuite/archiver/repo_compress_cmd.py b/src/borg/testsuite/archiver/repo_compress_cmd.py
index 9e29e8927c..1d06a14d59 100644
--- a/src/borg/testsuite/archiver/repo_compress_cmd.py
+++ b/src/borg/testsuite/archiver/repo_compress_cmd.py
@@ -1,7 +1,7 @@
 import os
 
 from ...constants import *  # NOQA
-from ...repository import Repository
+from ...repository import Repository, repo_lister
 from ...manifest import Manifest
 from ...compress import ZSTD, ZLIB, LZ4, CNONE
 from ...helpers import bin_to_hex
@@ -15,30 +15,24 @@ def check_compression(ctype, clevel, olevel):
     repository = Repository(archiver.repository_path, exclusive=True)
     with repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-            marker = None
-            while True:
-                result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-                if not result:
-                    break
-                marker = result[-1][0]
-                for id, _ in result:
-                    chunk = repository.get(id, read_data=True)
-                    meta, data = manifest.repo_objs.parse(
-                        id, chunk, ro_type=ROBJ_DONTCARE
-                    )  # will also decompress according to metadata
-                    m_olevel = meta.get("olevel", -1)
-                    m_psize = meta.get("psize", -1)
-                    print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
-                    # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
-                    # (desired compressed, lz4 compressed, not compressed).
-                    assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
-                    assert meta["clevel"] in (clevel, 255)  # LZ4 and CNONE has level 255
-                    if olevel != -1:  # we expect obfuscation
-                        assert "psize" in meta
-                        assert m_olevel == olevel
-                    else:
-                        assert "psize" not in meta
-                        assert "olevel" not in meta
+            for id, _ in repo_lister(repository, limit=LIST_SCAN_LIMIT):
+                chunk = repository.get(id, read_data=True)
+                meta, data = manifest.repo_objs.parse(
+                    id, chunk, ro_type=ROBJ_DONTCARE
+                )  # will also decompress according to metadata
+                m_olevel = meta.get("olevel", -1)
+                m_psize = meta.get("psize", -1)
+                print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
+                # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
+                # (desired compressed, lz4 compressed, not compressed).
+                assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
+                assert meta["clevel"] in (clevel, 255)  # LZ4 and CNONE has level 255
+                if olevel != -1:  # we expect obfuscation
+                    assert "psize" in meta
+                    assert m_olevel == olevel
+                else:
+                    assert "psize" not in meta
+                    assert "olevel" not in meta
 
     create_regular_file(archiver.input_path, "file1", size=1024 * 10)
     create_regular_file(archiver.input_path, "file2", contents=os.urandom(1024 * 10))
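
Note (editor addition, not part of the commit): the sketch below illustrates the behavioral
change. It copies repo_lister from the patch verbatim and drives it against a hypothetical
in-memory DummyRepo stub; DummyRepo and its .list() implementation are illustrative only,
not borg API. With the old while-True loops, a repository whose final batch is non-empty but
smaller than the limit needed one extra .list() call that returned an empty result just to
terminate; repo_lister stops as soon as a batch comes back short.

    # repo_lister copied from src/borg/repository.py as added by this patch
    def repo_lister(repository, *, limit=None):
        marker = None
        finished = False
        while not finished:
            result = repository.list(limit=limit, marker=marker)
            finished = (len(result) < limit) if limit is not None else (len(result) == 0)
            if not finished:
                marker = result[-1][0]
            for id, stored_size in result:
                yield id, stored_size


    class DummyRepo:
        """Hypothetical stand-in for a borg repository; only paginated .list() matters here."""

        def __init__(self, n=5):
            # (id, stored_size) pairs; ids are fake 32-byte values
            self.items = [(bytes([i]) * 32, 100 + i) for i in range(n)]
            self.calls = 0

        def list(self, *, limit=None, marker=None):
            # return up to `limit` entries that come after `marker` (the last id of the previous batch)
            self.calls += 1
            start = 0
            if marker is not None:
                start = next(i for i, (id, _) in enumerate(self.items) if id == marker) + 1
            end = len(self.items) if limit is None else start + limit
            return self.items[start:end]


    repo = DummyRepo(n=5)
    for id, stored_size in repo_lister(repo, limit=2):
        print(id.hex(), stored_size)
    # batches of 2, 2, 1 -> 3 .list() calls; the old loop needed a 4th call returning [] to stop
    assert repo.calls == 3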