Skip to content

Commit

Permalink
New ACCOUNT_FREEZER_STATE DB segment. New BonsaiArchiveFreezer to lis…
Browse files Browse the repository at this point in the history
…ten for blocks and move account state to new DB segment

Signed-off-by: Matthew Whitehead <[email protected]>
  • Loading branch information
matthew1001 committed Aug 20, 2024
1 parent 67fa46f commit 7d4a524
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.cache.BonsaiCachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview.BonsaiArchiveFreezer;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive;
Expand Down Expand Up @@ -752,6 +753,19 @@ public BesuController build() {
trieLogManager.subscribe(trieLogPruner);
}

// TODO - do we want a flag to turn this on and off?
if (DataStorageFormat.BONSAI_ARCHIVE.equals(dataStorageConfiguration.getDataStorageFormat())) {
final BonsaiWorldStateKeyValueStorage worldStateKeyValueStorage =
worldStateStorageCoordinator.getStrategy(BonsaiWorldStateKeyValueStorage.class);
final BonsaiArchiveFreezer archiveFreezer =
createBonsaiArchiveFreezer(
worldStateKeyValueStorage,
blockchain,
scheduler,
((BonsaiWorldStateProvider) worldStateArchive).getTrieLogManager());
blockchain.observeBlockAdded(archiveFreezer);
}

final List<Closeable> closeables = new ArrayList<>();
closeables.add(protocolContext.getWorldStateArchive());
closeables.add(storageProvider);
Expand Down Expand Up @@ -818,6 +832,23 @@ private TrieLogPruner createTrieLogPruner(
return trieLogPruner;
}

private BonsaiArchiveFreezer createBonsaiArchiveFreezer(
final WorldStateKeyValueStorage worldStateStorage,
final Blockchain blockchain,
final EthScheduler scheduler,
final TrieLogManager trieLogManager) {
final BonsaiArchiveFreezer archiveFreezer =
new BonsaiArchiveFreezer(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
scheduler::executeServiceTask,
10,
trieLogManager);
archiveFreezer.initialize();

return archiveFreezer;
}

/**
* Create synchronizer synchronizer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Optional<Bytes> getFlatCode(
// use getNearest() with an account key that is suffixed by the block context
final Optional<Bytes> codeFound =
storage
.getNearestTo(CODE_STORAGE, keyNearest)
.getNearestBefore(CODE_STORAGE, keyNearest)
// return empty when we find a "deleted value key"
.filter(
found ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.flat;

import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_FREEZER_STATE;
import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE;
import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE;

Expand Down Expand Up @@ -50,7 +51,7 @@ public ArchiveFlatDbStrategy(

static final byte[] MAX_BLOCK_SUFFIX = Bytes.ofUnsignedLong(Long.MAX_VALUE).toArrayUnsafe();
static final byte[] MIN_BLOCK_SUFFIX = Bytes.ofUnsignedLong(0L).toArrayUnsafe();
static final byte[] DELETED_ACCOUNT_VALUE = new byte[0];
public static final byte[] DELETED_ACCOUNT_VALUE = new byte[0];
public static final byte[] DELETED_CODE_VALUE = new byte[0];
static final byte[] DELETED_STORAGE_VALUE = new byte[0];

Expand All @@ -60,15 +61,15 @@ public Optional<Bytes> getFlatAccount(
final NodeLoader nodeLoader,
final Hash accountHash,
final SegmentedKeyValueStorage storage) {
getAccountCounter.inc();

getAccountCounter.inc();
// keyNearest, use MAX_BLOCK_SUFFIX in the absence of a block context:
Bytes keyNearest = calculateArchiveKeyWithMaxSuffix(context, accountHash.toArrayUnsafe());

// use getNearest() with an account key that is suffixed by the block context
final Optional<Bytes> accountFound =
storage
.getNearestTo(ACCOUNT_INFO_STATE, keyNearest)
.getNearestBefore(ACCOUNT_INFO_STATE, keyNearest)
// return empty when we find a "deleted value key"
.filter(
found ->
Expand All @@ -80,10 +81,30 @@ public Optional<Bytes> getFlatAccount(

if (accountFound.isPresent()) {
getAccountFoundInFlatDatabaseCounter.inc();
return accountFound;
} else {
getAccountNotFoundInFlatDatabaseCounter.inc();
// Check the frozen state as old state is moved out of the primary DB segment
final Optional<Bytes> frozenAccountFound =
storage
.getNearestBefore(ACCOUNT_FREEZER_STATE, keyNearest)
// return empty when we find a "deleted value key"
.filter(
found ->
!Arrays.areEqual(
DELETED_ACCOUNT_VALUE, found.value().orElse(DELETED_ACCOUNT_VALUE)))
// don't return accounts that do not have a matching account hash
.filter(found -> accountHash.commonPrefixLength(found.key()) >= accountHash.size())
.flatMap(SegmentedKeyValueStorage.NearestKeyValue::wrapBytes);

if (frozenAccountFound.isPresent()) {
// TODO - different metric for frozen lookups?
getAccountFoundInFlatDatabaseCounter.inc();
} else {
getAccountNotFoundInFlatDatabaseCounter.inc();
}

return frozenAccountFound;
}
return accountFound;
}

/*
Expand Down Expand Up @@ -132,7 +153,7 @@ public Optional<Bytes> getFlatStorageValueByStorageSlotKey(
// use getNearest() with a key that is suffixed by the block context
final Optional<Bytes> storageFound =
storage
.getNearestTo(ACCOUNT_STORAGE_STORAGE, keyNearest)
.getNearestBefore(ACCOUNT_STORAGE_STORAGE, keyNearest)
// return empty when we find a "deleted value key"
.filter(
found ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager;
import org.hyperledger.besu.plugin.services.trielogs.TrieLog;

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class manages the "freezing" of historic state that is still needed to satisfy queries but
* doesn't need to be in the main DB segment for. Doing so would degrade block-import performance
* over time so we move state beyond a certain age (in blocks) to other DB segments, assuming there
* is a more recent (i.e. changed) version of the state. If state is created once and never changed
* it will remain in the primary DB segment(s).
*/
public class BonsaiArchiveFreezer implements BlockAddedObserver {

private static final Logger LOG = LoggerFactory.getLogger(BonsaiArchiveFreezer.class);

private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage;
private final Blockchain blockchain;
private final Consumer<Runnable> executeAsync;
private final long numberOfBlocksToKeepInWarmStorage;
private final TrieLogManager trieLogManager;

private final Multimap<Long, Hash> blocksToMoveToFreezer =
TreeMultimap.create(Comparator.reverseOrder(), Comparator.naturalOrder());

public BonsaiArchiveFreezer(
final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final Consumer<Runnable> executeAsync,
final long numberOfBlocksToKeepInWarmStorage,
final TrieLogManager trieLogManager) {
this.rootWorldStateStorage = rootWorldStateStorage;
this.blockchain = blockchain;
this.executeAsync = executeAsync;
this.numberOfBlocksToKeepInWarmStorage = numberOfBlocksToKeepInWarmStorage;
this.trieLogManager = trieLogManager;
}

public int initialize() {
// TODO Probably need to freeze old blocks that haven't been frozen already?
return 0;
}

public synchronized void addToFreezerQueue(final long blockNumber, final Hash blockHash) {
LOG.atDebug()
.setMessage(
"adding block to archive freezer queue for moving to cold storage, blockNumber {}; blockHash {}")
.addArgument(blockNumber)
.addArgument(blockHash)
.log();
blocksToMoveToFreezer.put(blockNumber, blockHash);
}

public synchronized int moveBlockStateToFreezer() {
final long retainAboveThisBlock =
blockchain.getChainHeadBlockNumber() - numberOfBlocksToKeepInWarmStorage;
if (rootWorldStateStorage.getFlatDbMode().getVersion() == Bytes.EMPTY) {
throw new IllegalStateException("DB mode version not set");
}

AtomicInteger frozenStateCount = new AtomicInteger();

LOG.atDebug()
.setMessage(
"Moving cold state to freezer storage (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}")
.addArgument(blockchain::getChainHeadBlockNumber)
.addArgument(numberOfBlocksToKeepInWarmStorage)
.addArgument(retainAboveThisBlock)
.log();

final var blocksToMove =
blocksToMoveToFreezer.asMap().entrySet().stream()
.dropWhile((e) -> e.getKey() > retainAboveThisBlock);
// TODO - limit to a configurable number of blocks to move per loop

final Multimap<Long, Hash> movedToFreezer = ArrayListMultimap.create();

// Determine which world state keys have changed in the last N blocks by looking at the
// trie logs for the blocks. Then move the old keys to the freezer segment (if and only if they
// have changed)
blocksToMove.forEach(
(block) -> {
for (Hash blockHash : block.getValue()) {
Optional<TrieLog> trieLog = trieLogManager.getTrieLogLayer(blockHash);
if (trieLog.isPresent()) {
trieLog
.get()
.getAccountChanges()
.forEach(
(address, change) -> {
// Move any previous state for this account
frozenStateCount.addAndGet(
rootWorldStateStorage.freezePreviousAccountState(
blockchain.getBlockHeader(blockHash),
blockchain.getBlockHeader(block.getKey() - 1),
address.addressHash()));
// TODO - block number - 1 is a hack until getNearestBefore() is pulled in
});
}
movedToFreezer.put(block.getKey(), blockHash);
}
});

movedToFreezer.keySet().forEach(blocksToMoveToFreezer::removeAll);

if (frozenStateCount.get() > 0) {
LOG.atInfo()
.setMessage("froze {} state entries for {} blocks")
.addArgument(frozenStateCount.get())
.addArgument(movedToFreezer::size)
.log();
}

return movedToFreezer.size();
}

@Override
public void onBlockAdded(final BlockAddedEvent addedBlockContext) {
final Hash blockHash = addedBlockContext.getBlock().getHeader().getBlockHash();
final Optional<Long> blockNumber =
Optional.of(addedBlockContext.getBlock().getHeader().getNumber());
blockNumber.ifPresent(
blockNum ->
executeAsync.accept(
() -> {
addToFreezerQueue(blockNum, blockHash);
moveBlockStateToFreezer();
}));
}
}
Loading

0 comments on commit 7d4a524

Please sign in to comment.