diff --git a/beacon_chain/beacon_chain_file.nim b/beacon_chain/beacon_chain_file.nim new file mode 100644 index 0000000000..39378e85ea --- /dev/null +++ b/beacon_chain/beacon_chain_file.nim @@ -0,0 +1,941 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import results, snappy, stew/[io2, endians2] +import ./spec/[eth2_ssz_serialization, eth2_merkleization, forks] +from ./consensus_object_pools/block_pools_types import BlockData +export results + +type + ChainFileHeader = object + header: uint32 + version: uint32 + kind: uint64 + comprSize: uint32 + plainSize: uint32 + slot: uint64 + + ChainFileFooter = object + kind: uint64 + comprSize: uint32 + plainSize: uint32 + slot: uint64 + + Chunk = object + header: ChainFileHeader + footer: ChainFileFooter + data: seq[byte] + + ChainFileData* = object + head*: Opt[BlockData] + tail*: Opt[BlockData] + + ChainFileHandle* = object + data*: ChainFileData + handle*: IoHandle + + ChainFileErrorType* {.pure.} = enum + IoError, # OS input/output error + IncorrectSize, # Incorrect/unexpected size of chunk + IncompleteFooter, # Incomplete footer was read + IncompleteHeader, # Incomplete header was read + IncompleteData, # Incomplete data was read + FooterError, # Incorrect chunk's footer + HeaderError, # Incorrect chunk's header + MismatchError # Header and footer not from same chunk + + ChainFileCheckResult* {.pure.} = enum + FileMissing, + FileEmpty, + FileOk, + FileRepaired, + FileCorrupted + + ChainFileFlag* {.pure.} = enum + Repair, + OpenAlways + + ChainFileError* = object + kind*: ChainFileErrorType + message*: string + +const + ChainFileHeaderSize* = 32 + ChainFileFooterSize* = 24 + ChainFileVersion = 1'u32 + ChainFileHeaderValue = 0x424D494E'u32 + ChainFileBufferSize* = 4096 + MaxChunkSize = int(GOSSIP_MAX_SIZE) + ChainFileHeaderArray = ChainFileHeaderValue.toBytesLE() + IncompleteWriteError = "Unable to write data to file, disk full?" + MaxForksCount* = 16384 + BlockForkCodeRange = + int(ConsensusFork.Phase0) .. int(high(ConsensusFork)) + BlobForkCodeRange = + MaxForksCount .. (MaxForksCount + int(high(ConsensusFork)) - int(ConsensusFork.Deneb)) + +func getBlockForkCode(fork: ConsensusFork): uint64 = + uint64(fork) + +func getBlobForkCode(fork: ConsensusFork): uint64 = + case fork + of ConsensusFork.Deneb: + uint64(MaxForksCount) + of ConsensusFork.Electra: + uint64(MaxForksCount) + uint64(fork) - uint64(ConsensusFork.Deneb) + of ConsensusFork.Phase0 .. 
ConsensusFork.Capella: + raiseAssert "Blobs are not supported for the fork" + +proc init(t: typedesc[ChainFileError], k: ChainFileErrorType, + m: string): ChainFileError = + ChainFileError(kind: k, message: m) + +template init(t: typedesc[ChainFileHeader], + kind: uint64, clength, plength: uint32, + number: uint64): ChainFileHeader = + ChainFileHeader( + header: ChainFileHeaderValue, + version: ChainFileVersion, + kind: kind, + comprSize: clength, + plainSize: plength, + slot: number) + +template init(t: typedesc[ChainFileFooter], + kind: uint64, clength, plength: uint32, + number: uint64): ChainFileFooter = + ChainFileFooter( + kind: kind, + comprSize: clength, + plainSize: plength, + slot: number) + +template unmaskKind(k: uint64): uint64 = + k and not(0x8000_0000_0000_0000'u64) + +template maskKind(k: uint64): uint64 = + k or 0x8000_0000_0000_0000'u64 + +template isLast(k: uint64): bool = + (k and 0x8000_0000_0000_0000'u64) != 0'u64 + +proc checkKind(kind: uint64): Result[void, string] = + let hkind = + block: + let res = unmaskKind(kind) + if res > uint64(high(int)): + return err("Unsuppoted chunk kind value") + int(res) + if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange): + ok() + else: + err("Unsuppoted chunk kind value") + +proc check(a: ChainFileHeader): Result[void, string] = + if a.header != ChainFileHeaderValue: + return err("Incorrect chunk header [NIMB]") + if a.version != 1'u32: + return err("Unsuppoted chunk version") + if a.comprSize > uint32(MaxChunkSize): + return err("Incorrect compressed size in chunk header") + if a.plainSize > uint32(MaxChunkSize): + return err("Incorrect plain size in chunk header") + ? checkKind(a.kind) + ok() + +proc check(a: ChainFileFooter): Result[void, string] = + if a.comprSize > uint32(MaxChunkSize): + return err("Incorrect compressed size in chunk header") + if a.plainSize > uint32(MaxChunkSize): + return err("Incorrect plain size in chunk header") + ? a.kind.checkKind() + ok() + +proc check(a: ChainFileFooter, b: ChainFileHeader): Result[void, string] = + if a.kind != b.kind: + return err("Footer and header reports different chunk kind") + if a.comprSize != b.comprSize: + return err("Footer and header reports different compressed size") + if a.plainSize != b.plainSize: + return err("Footer and header reports different plain size") + if a.slot != b.slot: + return err("Footer and header reports different slots") + ok() + +proc init(t: typedesc[ChainFileHeader], + data: openArray[byte]): Result[ChainFileHeader, string] = + doAssert(len(data) >= ChainFileHeaderSize) + let header = + ChainFileHeader( + header: uint32.fromBytesLE(data.toOpenArray(0, 3)), + version: uint32.fromBytesLE(data.toOpenArray(4, 7)), + kind: uint64.fromBytesLE(data.toOpenArray(8, 15)), + comprSize: uint32.fromBytesLE(data.toOpenArray(16, 19)), + plainSize: uint32.fromBytesLE(data.toOpenArray(20, 23)), + slot: uint64.fromBytesLE(data.toOpenArray(24, 31))) + ? check(header) + ok(header) + +proc init(t: typedesc[ChainFileFooter], + data: openArray[byte]): Result[ChainFileFooter, string] = + doAssert(len(data) >= ChainFileFooterSize) + let footer = + ChainFileFooter( + kind: uint64.fromBytesLE(data.toOpenArray(0, 7)), + comprSize: uint32.fromBytesLE(data.toOpenArray(8, 11)), + plainSize: uint32.fromBytesLE(data.toOpenArray(12, 15)), + slot: uint64.fromBytesLE(data.toOpenArray(16, 23))) + ? 
check(footer) + ok(footer) + +template `[]=`(data: var openArray[byte], slice: Slice[int], + src: array[4, byte]) = + var k = 0 + for i in slice: + data[i] = src[k] + inc(k) + +template `[]=`(data: var openArray[byte], slice: Slice[int], + src: array[8, byte]) = + var k = 0 + for i in slice: + data[i] = src[k] + inc(k) + +proc store(a: ChainFileHeader, data: var openArray[byte]) = + doAssert(len(data) >= ChainFileHeaderSize) + data[0 .. 3] = a.header.toBytesLE() + data[4 .. 7] = a.version.toBytesLE() + data[8 .. 15] = a.kind.toBytesLE() + data[16 .. 19] = a.comprSize.toBytesLE() + data[20 .. 23] = a.plainSize.toBytesLE() + data[24 .. 31] = a.slot.toBytesLE() + +proc store(a: ChainFileFooter, data: var openArray[byte]) = + doAssert(len(data) >= ChainFileFooterSize) + data[0 .. 7] = a.kind.toBytesLE() + data[8 .. 11] = a.comprSize.toBytesLE() + data[12 .. 15] = a.plainSize.toBytesLE() + data[16 .. 23] = a.slot.toBytesLE() + +proc init(t: typedesc[Chunk], kind, slot: uint64, plainSize: uint32, + data: openArray[byte]): seq[byte] = + doAssert((len(data) < MaxChunkSize) and (plainSize < uint32(MaxChunkSize))) + + var + dst = newSeq[byte](len(data) + ChainFileHeaderSize + ChainFileFooterSize) + + let + header = ChainFileHeader.init(kind, uint32(len(data)), plainSize, slot) + footer = ChainFileFooter.init(kind, uint32(len(data)), plainSize, slot) + + var offset = 0 + header.store(dst.toOpenArray(offset, offset + ChainFileHeaderSize - 1)) + offset += ChainFileHeaderSize + + if len(data) > 0: + copyMem(addr dst[offset], unsafeAddr data[0], len(data)) + offset += len(data) + + footer.store(dst.toOpenArray(offset, offset + ChainFileFooterSize - 1)) + dst + +template getBlockChunkKind(kind: ConsensusFork, last: bool): uint64 = + if last: + maskKind(getBlockForkCode(kind)) + else: + getBlockForkCode(kind) + +template getBlobChunkKind(kind: ConsensusFork, last: bool): uint64 = + if last: + maskKind(getBlobForkCode(kind)) + else: + getBlobForkCode(kind) + +proc getBlockConsensusFork(header: ChainFileHeader): ConsensusFork = + let hkind = unmaskKind(header.kind) + if int(hkind) in BlockForkCodeRange: + cast[ConsensusFork](hkind) + else: + raiseAssert("Should not be happened") + +template isBlock(h: ChainFileHeader | ChainFileFooter): bool = + let hkind = unmaskKind(h.kind) + int(hkind) in BlockForkCodeRange + +template isBlob(h: ChainFileHeader | ChainFileFooter): bool = + let hkind = unmaskKind(h.kind) + int(hkind) in BlobForkCodeRange + +template isLast(h: ChainFileHeader | ChainFileFooter): bool = + h.kind.isLast() + +proc isFilePresent*(filename: string): bool = + isFile(filename) + +template head*(chandle: ChainFileHandle): Opt[BlockData] = + chandle.data.head + +template tail*(chandle: ChainFileHandle): Opt[BlockData] = + chandle.data.tail + +proc setHead*(chandle: var ChainFileHandle, bdata: BlockData) = + chandle.data.head = Opt.some(bdata) + +proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) = + chandle.data.tail = Opt.some(bdata) + +proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars]): Result[void, string] = + let origOffset = + updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr: + return err(ioErrorMsg(error)) + + block: + let + kind = getBlockChunkKind(signedBlock.kind, blobs.isNone()) + (data, plainSize) = + withBlck(signedBlock): + let res = SSZ.encode(forkyBlck) + (snappy.encode(res), len(res)) + slot = signedBlock.slot + buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data) + wrote = 
writeFile(chandle.handle, buffer).valueOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + if wrote != uint(len(buffer)): + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(IncompleteWriteError) + + if blobs.isSome(): + let blobSidecars = blobs.get() + for index, blob in blobSidecars.pairs(): + let + kind = + getBlobChunkKind(signedBlock.kind, (index + 1) == len(blobSidecars)) + (data, plainSize) = + block: + let res = SSZ.encode(blob[]) + (snappy.encode(res), len(res)) + slot = blob[].signed_block_header.message.slot + buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data) + + setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + + let + wrote = writeFile(chandle.handle, buffer).valueOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + if wrote != uint(len(buffer)): + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(IncompleteWriteError) + + fsync(chandle.handle).isOkOr: + discard truncate(chandle.handle, origOffset) + return err(ioErrorMsg(error)) + + ok() + +proc readChunkForward(handle: IoHandle, + dataRead: bool): Result[Opt[Chunk], ChainFileError] = + # This function only reads chunk header and footer, but does not read actual + # chunk data. + var + buffer = newSeq[byte](max(ChainFileHeaderSize, ChainFileFooterSize)) + data: seq[byte] + bytesRead: uint + + bytesRead = + readFile(handle, buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead == 0'u: + # End of file. 
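+    # A zero-length read at a chunk boundary is the normal end-of-file
+    # condition, so it is reported as "no more chunks" rather than as an
+    # error.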
+ return ok(Opt.none(Chunk)) + + if bytesRead != uint(ChainFileHeaderSize): + return err( + ChainFileError.init(ChainFileErrorType.IncompleteHeader, + "Unable to read chunk header data, incorrect file?")) + + let + header = ChainFileHeader.init( + buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.HeaderError, error)) + + if not(dataRead): + setFilePos(handle, int64(header.comprSize), + SeekPosition.SeekCurrent).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + else: + # Safe conversion to `int`, because header.comprSize < MaxChunkSize + data.setLen(int(header.comprSize)) + bytesRead = + readFile(handle, data.toOpenArray(0, len(data) - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead != uint(header.comprSize): + return err( + ChainFileError.init(ChainFileErrorType.IncompleteData, + "Unable to read chunk data, incorrect file?")) + + bytesRead = + readFile(handle, buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead != uint(ChainFileFooterSize): + return err( + ChainFileError.init(ChainFileErrorType.IncompleteFooter, + "Unable to read chunk footer data, incorrect file?")) + + let + footer = ChainFileFooter.init( + buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.FooterError, error)) + + check(footer, header).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.MismatchError, error)) + + if not(dataRead): + ok(Opt.some(Chunk(header: header, footer: footer))) + else: + ok(Opt.some(Chunk(header: header, footer: footer, data: data))) + +proc readChunkBackward(handle: IoHandle, + dataRead: bool): Result[Opt[Chunk], ChainFileError] = + # This function only reads chunk header and footer, but does not read actual + # chunk data. 
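+  # On-disk chunk layout (all integer fields little-endian):
+  #   [header: 32 bytes][compressed payload: comprSize bytes][footer: 24 bytes]
+  # Reading backwards therefore starts with the footer and seeks back over
+  # the payload to reach the header.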
+ var + buffer = newSeq[byte](max(ChainFileHeaderSize, ChainFileFooterSize)) + data: seq[byte] + bytesRead: uint + + let offset = getFilePos(handle).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if offset == 0: + return ok(Opt.none(Chunk)) + + if offset <= (ChainFileHeaderSize + ChainFileFooterSize): + return err( + ChainFileError.init(ChainFileErrorType.IncorrectSize, + "File position is incorrect")) + + setFilePos(handle, -ChainFileFooterSize, SeekPosition.SeekCurrent).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + bytesRead = + readFile(handle, buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead != ChainFileFooterSize: + return err( + ChainFileError.init(ChainFileErrorType.IncompleteFooter, + "Unable to read chunk footer data, incorrect file?")) + let + footer = ChainFileFooter.init( + buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.FooterError, error)) + + block: + let position = + -(ChainFileHeaderSize + ChainFileFooterSize + int64(footer.comprSize)) + setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + bytesRead = + readFile(handle, buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead != ChainFileHeaderSize: + return err( + ChainFileError.init(ChainFileErrorType.IncompleteHeader, + "Unable to read chunk header data, incorrect file?")) + + let + header = ChainFileHeader.init( + buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.HeaderError, error)) + + check(footer, header).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.MismatchError, error)) + + if not(dataRead): + let position = int64(-ChainFileHeaderSize) + setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + else: + # Safe conversion to `int`, because header.comprSize < MaxChunkSize + data.setLen(int(header.comprSize)) + bytesRead = + readFile(handle, data.toOpenArray(0, len(data) - 1)).valueOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if bytesRead != uint(header.comprSize): + return err( + ChainFileError.init(ChainFileErrorType.IncompleteData, + "Unable to read chunk data, incorrect file?")) + + let position = -(ChainFileHeaderSize + int64(header.comprSize)) + setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr: + return err( + ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error))) + + if not(dataRead): + ok(Opt.some(Chunk(header: header, footer: footer))) + else: + ok(Opt.some(Chunk(header: header, footer: footer, data: data))) + +proc decodeBlock( + header: ChainFileHeader, + data: openArray[byte] +): Result[ForkedSignedBeaconBlock, string] = + if header.plainSize > uint32(MaxChunkSize): + return err("Size of block is enormously big") + + let + fork = header.getBlockConsensusFork() + decompressed = snappy.decode(data, uint32(header.plainSize)) + blck = + try: + withConsensusFork(fork): + ForkedSignedBeaconBlock.init( + SSZ.decode(decompressed, consensusFork.SignedBeaconBlock)) + except SerializationError: + return 
err("Incorrect block format") + ok(blck) + +proc decodeBlob( + header: ChainFileHeader, + data: openArray[byte] +): Result[BlobSidecar, string] = + if header.plainSize > uint32(MaxChunkSize): + return err("Size of blob is enormously big") + + let + decompressed = snappy.decode(data, uint32(header.plainSize)) + blob = + try: + SSZ.decode(decompressed, BlobSidecar) + except SerializationError: + return err("Incorrect blob format") + ok(blob) + +proc getChainFileTail*(handle: IoHandle): Result[Opt[BlockData], string] = + var sidecars: BlobSidecars + while true: + let chunk = + block: + let res = readChunkBackward(handle, true).valueOr: + return err(error.message) + if res.isNone(): + if len(sidecars) == 0: + return ok(Opt.none(BlockData)) + else: + return err("Blobs without block encountered, incorrect file?") + res.get() + if chunk.header.isBlob(): + let blob = ? decodeBlob(chunk.header, chunk.data) + sidecars.add(newClone blob) + else: + let blck = ? decodeBlock(chunk.header, chunk.data) + return + if len(sidecars) == 0: + ok(Opt.some(BlockData(blck: blck))) + else: + ok(Opt.some(BlockData(blck: blck, blob: Opt.some(sidecars)))) + +proc getChainFileHead(handle: IoHandle): Result[Opt[BlockData], string] = + var + offset: int64 = 0 + endOfFile = false + + let + blck = + block: + let chunk = + block: + let res = readChunkForward(handle, true).valueOr: + return err(error.message) + if res.isNone(): + return ok(Opt.none(BlockData)) + res.get() + if not(chunk.header.isBlock()): + return err("Unexpected blob chunk encountered") + ? decodeBlock(chunk.header, chunk.data) + blob = + block: + var sidecars: BlobSidecars + block mainLoop: + while true: + offset = getFilePos(handle).valueOr: + return err(ioErrorMsg(error)) + let chunk = + block: + let res = readChunkForward(handle, true).valueOr: + return err(error.message) + if res.isNone(): + endOfFile = true + break mainLoop + res.get() + if chunk.header.isBlob(): + let blob = ? decodeBlob(chunk.header, chunk.data) + sidecars.add(newClone blob) + else: + break mainLoop + + if len(sidecars) > 0: + Opt.some(sidecars) + else: + Opt.none(BlobSidecars) + + if not(endOfFile): + setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr: + return err(ioErrorMsg(error)) + + ok(Opt.some(BlockData(blck: blck, blob: blob))) + +proc seekForSlotBackward*(handle: IoHandle, + slot: Slot): Result[Opt[int64], string] = + ## Search from the beginning of the file for the first chunk of data + ## identified by slot ``slot``. + ## This procedure updates current file position to the beginning of the found + ## chunk and returns this position as the result. + block: + let res = setFilePos(handle, 0, SeekPosition.SeekEnd) + if res.isErr(): + return err(ioErrorMsg(res.error)) + + while true: + let chunk = + block: + let res = readChunkBackward(handle, false).valueOr: + return err(error.message) + if res.isNone(): + return ok(Opt.none(int64)) + res.get() + + if chunk.header.slot == slot: + block: + let + position = + ChainFileHeaderSize + ChainFileFooterSize + + int64(chunk.header.comprSize) + res = setFilePos(handle, position, SeekPosition.SeekCurrent) + if res.isErr(): + return err(ioErrorMsg(res.error)) + block: + let res = getFilePos(handle) + if res.isErr(): + return err(ioErrorMsg(res.error)) + return ok(Opt.some(res.get())) + +proc seekForSlotForward*(handle: IoHandle, + slot: Slot): Result[Opt[int64], string] = + ## Search from the end of the file for the last chunk of data identified by + ## slot ``slot``. 
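+  ## Note: the scan itself starts at the beginning of the file and proceeds
+  ## forward, stopping at the first chunk whose slot matches.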
+ ## This procedure updates current file position to the beginning of the found + ## chunk and returns this position as the result. + block: + let res = setFilePos(handle, 0, SeekPosition.SeekBegin) + if res.isErr(): + return err(ioErrorMsg(res.error)) + + while true: + let chunk = + block: + let res = readChunkForward(handle, false).valueOr: + return err(error.message) + if res.isNone(): + return ok(Opt.none(int64)) + res.get() + + if chunk.header.slot == slot: + block: + let + position = + -(ChainFileHeaderSize + ChainFileFooterSize + + int64(chunk.header.comprSize)) + res = setFilePos(handle, position, SeekPosition.SeekCurrent) + if res.isErr(): + return err(ioErrorMsg(res.error)) + block: + let res = getFilePos(handle) + if res.isErr(): + return err(ioErrorMsg(res.error)) + return ok(Opt.some(res.get())) + +proc search(data: openArray[byte], srch: openArray[byte], + state: var int): Opt[int] = + doAssert(len(srch) > 0) + for index in countdown(len(data) - 1, 0): + if data[index] == srch[len(srch) - 1 - state]: + inc(state) + if state == len(srch): + return Opt.some(index) + else: + state = 0 + Opt.none(int) + +proc seekForChunkBackward( + handle: IoHandle, + bufferSize = ChainFileBufferSize +): Result[Opt[int64], string] = + var + state = 0 + data = newSeq[byte](bufferSize) + bytesRead: uint = 0 + + while true: + let + position = getFilePos(handle).valueOr: + return err(ioErrorMsg(error)) + offset = max(0'i64, position - int64(bufferSize)) + + setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr: + return err(ioErrorMsg(error)) + + bytesRead = readFile(handle, data).valueOr: + return err(ioErrorMsg(error)) + + let indexOpt = search(data.toOpenArray(0, int(bytesRead) - 1), + ChainFileHeaderArray, state) + + if indexOpt.isNone(): + setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr: + return err(ioErrorMsg(error)) + continue + + let chunkOffset = -(int64(bufferSize) - int64(indexOpt.get())) + + setFilePos(handle, chunkOffset, SeekPosition.SeekCurrent).isOkOr: + return err(ioErrorMsg(error)) + + let chunk = readChunkForward(handle, false).valueOr: + # Incorrect chunk detected, so we start our searching again + state = 0 + setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr: + return err(ioErrorMsg(error)) + continue + + if chunk.isNone(): + return err("File has been changed, while repairing") + + if chunk.get().header.isLast(): + let finishOffset = getFilePos(handle).valueOr: + return err(ioErrorMsg(error)) + return ok(Opt.some(finishOffset)) + + ok(Opt.none(int64)) + +proc checkRepair*(filename: string, + repair: bool): Result[ChainFileCheckResult, string] = + if not(isFile(filename)): + return ok(ChainFileCheckResult.FileMissing) + + let + handle = openFile(filename, {OpenFlags.Read, OpenFlags.Write}).valueOr: + return err(ioErrorMsg(error)) + filesize = getFileSize(handle).valueOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + if filesize == 0'i64: + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + return ok(ChainFileCheckResult.FileEmpty) + + setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + let res = readChunkBackward(handle, false) + if res.isOk(): + let chunk = res.get() + if chunk.isNone(): + discard closeFile(handle) + return err("File was changed while reading") + + if chunk.get().header.isLast(): + # Last chunk being marked as last, everything is fine. 
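+      # The trailing chunk carries the "last" marker, so the file ends on a
+      # complete record and no repair is required.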
+ closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + return ok(ChainFileCheckResult.FileOk) + + # Last chunk was not marked properly, searching for the proper last chunk. + while true: + let nres = readChunkBackward(handle, false) + if nres.isErr(): + discard closeFile(handle) + return err(nres.error.message) + + let cres = nres.get() + if cres.isNone(): + # We reached start of file. + return + if repair: + truncate(handle, 0).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + ok(ChainFileCheckResult.FileRepaired) + else: + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + ok(ChainFileCheckResult.FileCorrupted) + + if cres.get().header.isLast(): + return + if repair: + let + position = getFilePos(handle).valueOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + offset = position + int64(cres.get().header.comprSize) + + ChainFileHeaderSize + ChainFileFooterSize + truncate(handle, offset).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + + ok(ChainFileCheckResult.FileRepaired) + else: + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + ok(ChainFileCheckResult.FileCorrupted) + + ok(ChainFileCheckResult.FileCorrupted) + else: + setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + let position = seekForChunkBackward(handle).valueOr: + discard closeFile(handle) + return err(error) + + if position.isNone(): + discard closeFile(handle) + return ok(ChainFileCheckResult.FileCorrupted) + + if repair: + truncate(handle, position.get()).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + ok(ChainFileCheckResult.FileRepaired) + else: + closeFile(handle).isOkOr: + return err(ioErrorMsg(error)) + ok(ChainFileCheckResult.FileCorrupted) + +proc init*(t: typedesc[ChainFileHandle], filename: string, + flags: set[ChainFileFlag]): Result[ChainFileHandle, string] = + let + handle = + if not(isFile(filename)): + if ChainFileFlag.OpenAlways in flags: + let flags = {OpenFlags.Read, OpenFlags.Write, OpenFlags.Create} + openFile(filename, flags).valueOr: + return err(ioErrorMsg(error)) + else: + return err("File not found") + else: + # If file exists we perform automatic check/repair procedure. 
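+        # With ChainFileFlag.Repair set, checkRepair() truncates any
+        # partially written trailing chunk so the file can be appended to
+        # safely.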
+ let res = + checkRepair(filename, ChainFileFlag.Repair in flags).valueOr: + return err(error) + + if res notin {ChainFileCheckResult.FileMissing, FileEmpty, + FileOk, FileRepaired}: + return err("Chain file data is corrupted") + + let flags = {OpenFlags.Read, OpenFlags.Write} + openFile(filename, flags).valueOr: + return err(ioErrorMsg(error)) + + head = getChainFileHead(handle).valueOr: + discard closeFile(handle) + return err(error) + + setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard closeFile(handle) + return err(ioErrorMsg(error)) + + let tail = getChainFileTail(handle).valueOr: + discard closeFile(handle) + return err(error) + + ok(ChainFileHandle(handle: handle, + data: ChainFileData(head: head, tail: tail))) + +proc close*(ch: ChainFileHandle): Result[void, string] = + closeFile(ch.handle).isOkOr: + return err(ioErrorMsg(error)) + ok() + +proc seekForSlot*(ch: ChainFileHandle, + slot: Slot): Result[Opt[int64], string] = + if ch.head.isNone() or ch.tail.isNone(): + return err("Attempt to seek for slot in empty file") + + let + headRange = + block: + let headSlot = ch.head.get().blck.slot() + if headSlot >= slot: + headSlot - slot + else: + slot - headSlot + tailRange = + block: + let tailSlot = ch.tail.get().blck.slot() + if tailSlot >= slot: + tailSlot - slot + else: + slot - tailSlot + offset = + if headRange <= tailRange: + ? seekForSlotForward(ch.handle, slot) + else: + ? seekForSlotBackward(ch.handle, slot) + ok(offset) + +proc clearFile*(filename: string): Result[void, string] = + removeFile(filename).isOkOr: + return err(ioErrorMsg(error)) + ok() diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index d85e7113ba..590d1feadf 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -22,10 +22,12 @@ import ./el/el_manager, ./consensus_object_pools/[ blockchain_dag, blob_quarantine, block_quarantine, consensus_manager, - attestation_pool, sync_committee_msg_pool, validator_change_pool], + attestation_pool, sync_committee_msg_pool, validator_change_pool, + blockchain_list], ./spec/datatypes/[base, altair], ./spec/eth2_apis/dynamic_fee_recipients, - ./sync/[sync_manager, request_manager], + ./spec/signatures_batch, + ./sync/[sync_manager, request_manager, sync_types], ./validators/[ action_tracker, message_router, validator_monitor, validator_pool, keystore_management], @@ -38,7 +40,7 @@ export eth2_network, el_manager, request_manager, sync_manager, eth2_processor, optimistic_processor, blockchain_dag, block_quarantine, base, message_router, validator_monitor, validator_pool, - consensus_manager, dynamic_fee_recipients + consensus_manager, dynamic_fee_recipients, sync_types type EventBus* = object @@ -57,6 +59,7 @@ type RestVersioned[ForkedLightClientFinalityUpdate]] optUpdateQueue*: AsyncEventQueue[ RestVersioned[ForkedLightClientOptimisticUpdate]] + optFinHeaderUpdateQueue*: AsyncEventQueue[ForkedLightClientHeader] BeaconNode* = ref object nickname*: string @@ -71,6 +74,7 @@ type .Raising([CancelledError]) lightClient*: LightClient dag*: ChainDAGRef + list*: ChainListRef quarantine*: ref Quarantine blobQuarantine*: ref BlobQuarantine attestationPool*: ref AttestationPool @@ -87,8 +91,11 @@ type requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] + untrustedManager*: SyncManager[Peer, PeerId] + syncOverseer*: SyncOverseerRef genesisSnapshotContent*: string processor*: ref Eth2Processor + batchVerifier*: ref BatchVerifier blockProcessor*: ref BlockProcessor 
consensusManager*: ref ConsensusManager attachedValidatorBalanceTotal*: Gwei diff --git a/beacon_chain/beacon_node_light_client.nim b/beacon_chain/beacon_node_light_client.nim index d66f788892..fc5d919051 100644 --- a/beacon_chain/beacon_node_light_client.nim +++ b/beacon_chain/beacon_node_light_client.nim @@ -53,10 +53,10 @@ proc initLightClient*( getBeaconTime, optimisticHandler) shouldInhibitSync = func(): bool = - if node.syncManager != nil: - not node.syncManager.inProgress # No LC sync needed if DAG is in sync - else: + if isNil(node.syncOverseer): false + else: + not node.syncOverseer.syncInProgress # No LC sync needed if DAG is in sync lightClient = createLightClient( node.network, rng, config, cfg, forkDigests, getBeaconTime, genesis_validators_root, LightClientFinalizationMode.Strict, @@ -107,7 +107,16 @@ proc initLightClient*( # The execution block hash is only available from Capella onward info "Ignoring new LC optimistic header until Capella" + proc onFinalizedHeader( + lightClient: LightClient, + finalizedHeader: ForkedLightClientHeader) = + if not node.consensusManager[].shouldSyncOptimistically(node.currentSlot): + return + + node.eventBus.optFinHeaderUpdateQueue.emit(finalizedHeader) + lightClient.onOptimisticHeader = onOptimisticHeader + lightClient.onFinalizedHeader = onFinalizedHeader lightClient.trustedBlockRoot = config.trustedBlockRoot elif config.trustedBlockRoot.isSome: diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index 89362ffd73..7494944156 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -8,16 +8,15 @@ {.push raises: [].} import + std/sequtils, chronicles, results, stew/assign2, ../spec/[ beaconstate, forks, signatures, signatures_batch, state_transition, state_transition_epoch], - "."/[block_dag, blockchain_dag, blockchain_dag_light_client] - -from ../spec/datatypes/capella import asSigVerified, asTrusted, shortLog -from ../spec/datatypes/deneb import asSigVerified, asTrusted, shortLog + "."/[block_pools_types, block_dag, blockchain_dag, + blockchain_dag_light_client] export results, signatures_batch, block_dag, blockchain_dag @@ -114,15 +113,18 @@ proc addResolvedHeadBlock( blockRef proc checkStateTransition( - dag: ChainDAGRef, signedBlock: ForkySigVerifiedSignedBeaconBlock, - cache: var StateCache): Result[void, VerifierError] = + dag: ChainDAGRef, + signedBlock: ForkySigVerifiedSignedBeaconBlock, + cache: var StateCache, + updateFlags: UpdateFlags, +): Result[void, VerifierError] = ## Ensure block can be applied on a state func restore(v: var ForkedHashedBeaconState) = assign(dag.clearanceState, dag.headState) let res = state_transition_block( dag.cfg, dag.clearanceState, signedBlock, - cache, dag.updateFlags, restore) + cache, updateFlags, restore) if res.isErr(): info "Invalid block", @@ -150,7 +152,8 @@ proc advanceClearanceState*(dag: ChainDAGRef) = var cache = StateCache() info = ForkedEpochInfo() - dag.advanceSlots(dag.clearanceState, next, true, cache, info) + dag.advanceSlots(dag.clearanceState, next, true, cache, info, + dag.updateFlags) debug "Prepared clearance state for next block", next, updateStateDur = Moment.now() - startTick @@ -267,7 +270,7 @@ proc addHeadBlockWithParent*( # onto which we can apply the new block let clearanceBlock = BlockSlotId.init(parent.bid, signedBlock.message.slot) if not updateState( - dag, dag.clearanceState, clearanceBlock, true, cache): + dag, 
dag.clearanceState, clearanceBlock, true, cache, dag.updateFlags): # We should never end up here - the parent must be a block no older than and # rooted in the finalized checkpoint, hence we should always be able to # load its corresponding state @@ -297,7 +300,8 @@ proc addHeadBlockWithParent*( let sigVerifyTick = Moment.now() - ? checkStateTransition(dag, signedBlock.asSigVerified(), cache) + ? checkStateTransition(dag, signedBlock.asSigVerified(), cache, + dag.updateFlags) let stateVerifyTick = Moment.now() # Careful, clearanceState.data has been updated but not blck - we need to @@ -449,3 +453,110 @@ proc addBackfillBlock*( putBlockDur = putBlockTick - sigVerifyTick ok() + +template BlockAdded(kind: static ConsensusFork): untyped = + when kind == ConsensusFork.Electra: + OnElectraBlockAdded + elif kind == ConsensusFork.Deneb: + OnDenebBlockAdded + elif kind == ConsensusFork.Capella: + OnCapellaBlockAdded + elif kind == ConsensusFork.Bellatrix: + OnBellatrixBlockAdded + elif kind == ConsensusFork.Altair: + OnAltairBlockAdded + elif kind == ConsensusFork.Phase0: + OnPhase0BlockAdded + else: + static: raiseAssert "Unreachable" + +proc verifyBlockProposer*( + dag: ChainDAGRef, + verifier: var BatchVerifier, + blocks: openArray[ForkedSignedBeaconBlock] +): Result[void, string] = + var sigs: seq[SignatureSet] + + ? sigs.collectProposerSignatureSet( + blocks, dag.db.immutableValidators, dag.clearanceState) + + if not verifier.batchVerify(sigs): + err("Block batch signature verification failed") + else: + ok() + +proc addBackfillBlockData*( + dag: ChainDAGRef, + bdata: BlockData, + onStateUpdated: OnStateUpdated, + onBlockAdded: OnForkedBlockAdded +): Result[void, VerifierError] = + var cache = StateCache() + + withBlck(bdata.blck): + let + parent = checkHeadBlock(dag, forkyBlck).valueOr: + if error == VerifierError.Duplicate: + return ok() + return err(error) + startTick = Moment.now() + parentBlock = dag.getForkedBlock(parent.bid.root).get() + trustedStateRoot = + withBlck(parentBlock): + forkyBlck.message.state_root + clearanceBlock = BlockSlotId.init(parent.bid, forkyBlck.message.slot) + updateFlags1 = dag.updateFlags + {skipLastStateRootCalculation} + + if not updateState(dag, dag.clearanceState, clearanceBlock, true, cache, + updateFlags1): + error "Unable to load clearance state for parent block, " & + "database corrupt?", clearanceBlock = shortLog(clearanceBlock) + return err(VerifierError.MissingParent) + + dag.clearanceState.setStateRoot(trustedStateRoot) + + let proposerVerifyTick = Moment.now() + + if not(isNil(onStateUpdated)): + ? onStateUpdated(forkyBlck.message.slot) + + let + stateDataTick = Moment.now() + updateFlags2 = + dag.updateFlags + {skipBlsValidation, skipStateRootValidation} + + ? 
checkStateTransition(dag, forkyBlck.asSigVerified(), cache, updateFlags2) + + let stateVerifyTick = Moment.now() + + if bdata.blob.isSome(): + for blob in bdata.blob.get(): + dag.db.putBlobSidecar(blob[]) + + type Trusted = typeof forkyBlck.asTrusted() + + proc onBlockAddedHandler( + blckRef: BlockRef, + trustedBlock: Trusted, + epochRef: EpochRef, + unrealized: FinalityCheckpoints + ) {.gcsafe, raises: [].} = + onBlockAdded( + blckRef, + ForkedTrustedSignedBeaconBlock.init(trustedBlock), + epochRef, + unrealized) + + let blockHandler: BlockAdded(consensusFork) = onBlockAddedHandler + + discard addResolvedHeadBlock( + dag, dag.clearanceState, + forkyBlck.asTrusted(), + true, + parent, cache, + blockHandler, + proposerVerifyTick - startTick, + stateDataTick - proposerVerifyTick, + stateVerifyTick - stateDataTick) + + ok() diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index e092b446aa..78eaa1a64e 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -285,7 +285,11 @@ type # balances, as used in fork choice effective_balances_bytes*: seq[byte] - OnBlockAdded[T: ForkyTrustedSignedBeaconBlock] = proc( + BlockData* = object + blck*: ForkedSignedBeaconBlock + blob*: Opt[BlobSidecars] + + OnBlockAdded*[T: ForkyTrustedSignedBeaconBlock] = proc( blckRef: BlockRef, blck: T, epochRef: EpochRef, unrealized: FinalityCheckpoints) {.gcsafe, raises: [].} OnPhase0BlockAdded* = OnBlockAdded[phase0.TrustedSignedBeaconBlock] @@ -299,6 +303,13 @@ type OnPhase0BlockAdded | OnAltairBlockAdded | OnBellatrixBlockAdded | OnCapellaBlockAdded | OnDenebBlockAdded | OnElectraBlockAdded + OnForkedBlockAdded* = proc( + blckRef: BlockRef, blck: ForkedTrustedSignedBeaconBlock, epochRef: EpochRef, + unrealized: FinalityCheckpoints) {.gcsafe, raises: [].} + + OnStateUpdated* = proc( + slot: Slot): Result[void, VerifierError] {.gcsafe, raises: [].} + HeadChangeInfoObject* = object slot*: Slot block_root* {.serializedFieldName: "block".}: Eth2Digest diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 7498f23bbd..81332d4bfa 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -69,7 +69,8 @@ proc putBlock*( proc updateState*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, - save: bool, cache: var StateCache): bool {.gcsafe.} + save: bool, cache: var StateCache, + updateFlags: UpdateFlags): bool {.gcsafe.} template withUpdatedState*( dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState, @@ -80,7 +81,7 @@ template withUpdatedState*( block: let bsi {.inject.} = bsiParam var cache {.inject.} = StateCache() - if updateState(dag, stateParam, bsi, false, cache): + if updateState(dag, stateParam, bsi, false, cache, dag.updateFlags): template bid(): BlockId {.inject, used.} = bsi.bid template updatedState(): ForkedHashedBeaconState {.inject, used.} = stateParam okBody @@ -934,7 +935,8 @@ proc putState(dag: ChainDAGRef, state: ForkedHashedBeaconState, bid: BlockId) = proc advanceSlots*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, slot: Slot, save: bool, - cache: var StateCache, info: var ForkedEpochInfo) = + cache: var StateCache, info: var ForkedEpochInfo, + updateFlags: UpdateFlags) = # Given a state, advance it zero or more slots by applying empty slot # processing - the state 
must be positioned at or before `slot` doAssert getStateField(state, slot) <= slot @@ -948,7 +950,7 @@ proc advanceSlots*( process_slots( dag.cfg, state, getStateField(state, slot) + 1, cache, info, - dag.updateFlags).expect("process_slots shouldn't fail when state slot is correct") + updateFlags).expect("process_slots shouldn't fail when state slot is correct") if save: dag.putState(state, stateBid) @@ -970,7 +972,8 @@ proc advanceSlots*( proc applyBlock( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bid: BlockId, - cache: var StateCache, info: var ForkedEpochInfo): Result[void, cstring] = + cache: var StateCache, info: var ForkedEpochInfo, + updateFlags: UpdateFlags): Result[void, cstring] = loadStateCache(dag, cache, bid, getStateField(state, slot).epoch) discard case dag.cfg.consensusForkAtEpoch(bid.slot.epoch) @@ -979,37 +982,37 @@ proc applyBlock( return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Altair: let data = getBlock(dag, bid, altair.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Bellatrix: let data = getBlock(dag, bid, bellatrix.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Capella: let data = getBlock(dag, bid, capella.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Deneb: let data = getBlock(dag, bid, deneb.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Electra: let data = getBlock(dag, bid, electra.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? 
state_transition( dag.cfg, state, data, cache, info, - dag.updateFlags + {slotProcessed}, noRollback) + updateFlags + {slotProcessed}, noRollback) ok() @@ -1143,7 +1146,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, while headBlocks.len > 0: dag.applyBlock( dag.headState, headBlocks.pop().bid, cache, - info).expect("head blocks should apply") + info, dag.updateFlags).expect("head blocks should apply") dag.head = headRef dag.heads = @[headRef] @@ -1402,7 +1405,8 @@ proc getEpochRef*( return err("Requesting EpochRef for non-canonical block") var cache: StateCache - if not updateState(dag, dag.epochRefState, ancestor, false, cache): + if not updateState(dag, dag.epochRefState, ancestor, false, cache, + dag.updateFlags): return err("Could not load requested state") ok(dag.getEpochRef(dag.epochRefState, cache)) @@ -1688,7 +1692,7 @@ proc getBlockRange*( proc updateState*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, - save: bool, cache: var StateCache): bool = + save: bool, cache: var StateCache, updateFlags: UpdateFlags): bool = ## Rewind or advance state such that it matches the given block and slot - ## this may include replaying from an earlier snapshot if blck is on a ## different branch or has advanced to a higher slot number than slot @@ -1842,7 +1846,8 @@ proc updateState*( # again. Also, because we're applying blocks that were loaded from the # database, we can skip certain checks that have already been performed # before adding the block to the database. - if (let res = dag.applyBlock(state, ancestors[i], cache, info); res.isErr): + if (let res = dag.applyBlock(state, ancestors[i], cache, info, + updateFlags); res.isErr): warn "Failed to apply block from database", blck = shortLog(ancestors[i]), state_bid = shortLog(state.latest_block_id), @@ -1851,7 +1856,7 @@ proc updateState*( return false # ...and make sure to process empty slots as requested - dag.advanceSlots(state, bsi.slot, save, cache, info) + dag.advanceSlots(state, bsi.slot, save, cache, info, updateFlags) # ...and make sure to load the state cache, if it exists loadStateCache(dag, cache, bsi.bid, getStateField(state, slot).epoch) @@ -2390,7 +2395,7 @@ proc updateHead*( # to use existing in-memory states to make this smooth var cache: StateCache if not updateState( - dag, dag.headState, newHead.bid.atSlot(), false, cache): + dag, dag.headState, newHead.bid.atSlot(), false, cache, dag.updateFlags): # Advancing the head state should never fail, given that the tail is # implicitly finalised, the head is an ancestor of the tail and we always # store the tail state in the database, as well as every epoch slot state in @@ -2654,7 +2659,7 @@ proc getProposalState*( # it now if not dag.updateState( state[], head.atSlot(slot - 1).toBlockSlotId().expect("not nil"), - false, cache): + false, cache, dag.updateFlags): error "Cannot get proposal state - skipping block production, database corrupt?", head = shortLog(head), slot @@ -2843,7 +2848,8 @@ proc rebuildIndex*(dag: ChainDAGRef) = # The slot check is needed to avoid re-applying a block if bids.isProposed and getStateField(state[], latest_block_header).slot < bids.bid.slot: - let res = dag.applyBlock(state[], bids.bid, cache, info) + let res = dag.applyBlock(state[], bids.bid, cache, info, + dag.updateFlags) if res.isErr: error "Failed to apply block while building index", state_bid = shortLog(state[].latest_block_id()), diff --git a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim 
b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim index 05f330ff87..e750a91f40 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim @@ -35,7 +35,7 @@ proc updateExistingState( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, save: bool, cache: var StateCache): bool = ## Wrapper around `updateState` for states expected to exist. - let ok = dag.updateState(state, bsi, save, cache) + let ok = dag.updateState(state, bsi, save, cache, dag.updateFlags) if not ok: error "State failed to load unexpectedly", bsi, tail = dag.tail.slot, backfill = shortLog(dag.backfill) diff --git a/beacon_chain/consensus_object_pools/blockchain_list.nim b/beacon_chain/consensus_object_pools/blockchain_list.nim new file mode 100644 index 0000000000..0d33be3739 --- /dev/null +++ b/beacon_chain/consensus_object_pools/blockchain_list.nim @@ -0,0 +1,243 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import std/sequtils, chronicles, chronos, metrics, + ../spec/forks, + ../[beacon_chain_file, beacon_clock], + ../sszdump + +from ./block_pools_types import VerifierError, BlockData +from ../spec/state_transition_block import validate_blobs +from std/os import `/` + +export beacon_chain_file + +const + ChainFileName = "nbc.bfdata" + +type + ChainListRef* = ref object + path*: string + handle*: Opt[ChainFileHandle] + +template chainFilePath*(directory: string): string = + directory / ChainFileName + +template filePath*(clist: ChainListRef): string = + chainFilePath(clist.path) + +proc init*(T: type ChainListRef, directory: string): ChainListRef = + let + filename = directory.chainFilePath() + handle = + if not(isFilePresent(filename)): + Opt.none(ChainFileHandle) + else: + let + flags = {ChainFileFlag.Repair} + res = ChainFileHandle.init(filename, flags) + if res.isErr(): + fatal "Unexpected failure while loading backfill data", + filename = filename, reason = res.error + quit 1 + Opt.some(res.get()) + ChainListRef(path: directory, handle: handle) + +proc init*(T: type ChainListRef, directory: string, + slot: Slot): Result[ChainListRef, string] = + let + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} + filename = directory.chainFilePath() + handle = ? ChainFileHandle.init(filename, flags) + offset {.used.} = ? seekForSlot(handle, slot) + ok(ChainListRef(path: directory, handle: Opt.some(handle))) + +proc seekForSlot*(clist: ChainListRef, slot: Slot): Result[void, string] = + if clist.handle.isNone(): + let + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} + filename = clist.path.chainFilePath() + handle = ? ChainFileHandle.init(filename, flags) + clist.handle = Opt.some(handle) + + let offset {.used.} = ? seekForSlot(clist.handle.get(), slot) + ok() + +proc close*(clist: ChainListRef): Result[void, string] = + if clist.handle.isNone(): + return ok() + ? clist.handle.get().close() + ok() + +proc clear*(clist: ChainListRef): Result[void, string] = + ? clist.close() + ? 
clearFile(clist.path.chainFilePath()) + clist.handle = Opt.none(ChainFileHandle) + ok() + +template slot*(data: BlockData): Slot = + data.blck.slot + +template parent_root*(data: ForkedSignedBeaconBlock): Eth2Digest = + withBlck(data): forkyBlck.message.parent_root + +template parent_root*(data: BlockData): Eth2Digest = + data.blck.parent_root() + +template root*(data: BlockData): Eth2Digest = + withBlck(data.blck): forkyBlck.root + +template shortLog*(x: BlockData): string = + let count = if x.blob.isSome(): $len(x.blob.get()) else: "0" + $(x.slot()) & "@" & shortLog(x.parent_root()) & "#" & count + +template shortLog*(x: Opt[BlockData]): string = + if x.isNone(): + "[none]" + else: + shortLog(x.get()) + +func tail*(clist: ChainListRef): Opt[BlockData] = + if clist.handle.isSome(): + clist.handle.get().data.tail + else: + Opt.none(BlockData) + +func head*(clist: ChainListRef): Opt[BlockData] = + if clist.handle.isSome(): + clist.handle.get().data.head + else: + Opt.none(BlockData) + +proc setHead*(clist: ChainListRef, bdata: BlockData) = + doAssert(clist.handle.isSome()) + var handle = clist.handle.get() + handle.setHead(bdata) + clist.handle = Opt.some(handle) + +proc setTail*(clist: ChainListRef, bdata: BlockData) = + doAssert(clist.handle.isSome()) + var handle = clist.handle.get() + handle.setTail(bdata) + clist.handle = Opt.some(handle) + +proc store*(clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars]): Result[void, string] = + if clist.handle.isNone(): + let + filename = clist.path.chainFilePath() + flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} + handle = ? ChainFileHandle.init(filename, flags) + clist.handle = Opt.some(handle) + store(handle, signedBlock, blobs) + else: + store(clist.handle.get(), signedBlock, blobs) + +proc checkBlobs(signedBlock: ForkedSignedBeaconBlock, + blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = + withBlck(signedBlock): + when consensusFork >= ConsensusFork.Deneb: + if blobsOpt.isSome(): + let + blobs = blobsOpt.get() + commits = forkyBlck.message.body.blob_kzg_commitments.asSeq + + if len(blobs) > 0 or len(commits) > 0: + let res = + validate_blobs(commits, blobs.mapIt(KzgBlob(bytes: it.blob)), + blobs.mapIt(it.kzg_proof)) + if res.isErr(): + debug "Blob validation failed", + block_root = shortLog(forkyBlck.root), + blobs = shortLog(blobs), + blck = shortLog(forkyBlck.message), + kzg_commits = mapIt(commits, shortLog(it)), + signature = shortLog(forkyBlck.signature), + msg = res.error() + return err(VerifierError.Invalid) + ok() + +proc addBackfillBlockData*( + clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock, + blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = + doAssert(not(isNil(clist))) + + logScope: + backfill_tail = shortLog(clist.tail) + signed_block_slot = signedBlock.slot + signed_block_root = signedBlock.root + signed_block_parent_root = signedBlock.parent_root + + let verifyBlockTick = Moment.now() + + if clist.tail.isNone(): + ? 
checkBlobs(signedBlock, blobsOpt) + + let storeBlockTick = Moment.now() + + store(clist, signedBlock, blobsOpt).isOkOr: + fatal "Unexpected failure while trying to store data", + filename = chainFilePath(clist.path), reason = error + quit 1 + + let bdata = BlockData(blck: signedBlock, blob: blobsOpt) + clist.setTail(bdata) + if clist.head.isNone(): + clist.setHead(bdata) + + debug "Initial block backfilled", + verify_block_duration = shortLog(storeBlockTick - verifyBlockTick), + store_block_duration = shortLog(Moment.now() - storeBlockTick) + + return ok() + + let tail = clist.tail.get() + + if signedBlock.slot == tail.slot: + if signedBlock.root == tail.root: + debug "Duplicate block" + return err(VerifierError.Duplicate) + else: + debug "Block from unviable fork" + return err(VerifierError.UnviableFork) + elif signedBlock.slot > tail.slot: + debug "Block from unviable fork" + return err(VerifierError.UnviableFork) + + if tail.parent_root != signedBlock.root: + debug "Block does not match expected backfill root" + return err(VerifierError.MissingParent) + + ? checkBlobs(signedBlock, blobsOpt) + + let storeBlockTick = Moment.now() + + store(clist, signedBlock, blobsOpt).isOkOr: + fatal "Unexpected failure while trying to store data", + filename = chainFilePath(clist.path), reason = error + quit 1 + + debug "Block backfilled", + verify_block_duration = shortLog(storeBlockTick - verifyBlockTick), + store_block_duration = shortLog(Moment.now() - storeBlockTick) + + clist.setTail(BlockData(blck: signedBlock, blob: blobsOpt)) + + ok() + +proc untrustedBackfillVerifier*( + clist: ChainListRef, + signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars], + maybeFinalized: bool +): Future[Result[void, VerifierError]] {. + async: (raises: [CancelledError], raw: true).} = + let retFuture = newFuture[Result[void, VerifierError]]() + retFuture.complete(clist.addBackfillBlockData(signedBlock, blobs)) + retFuture diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index cc3e69b3d1..bc8c986993 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -107,7 +107,7 @@ type ## The slot at which we sent a payload to the execution client the last ## time - NewPayloadStatus {.pure.} = enum + NewPayloadStatus* {.pure.} = enum valid notValid invalid @@ -123,7 +123,7 @@ type proc new*(T: type BlockProcessor, dumpEnabled: bool, dumpDirInvalid, dumpDirIncoming: string, - rng: ref HmacDrbgContext, taskpool: TaskPoolPtr, + batchVerifier: ref BatchVerifier, consensusManager: ref ConsensusManager, validatorMonitor: ref ValidatorMonitor, blobQuarantine: ref BlobQuarantine, @@ -137,7 +137,7 @@ proc new*(T: type BlockProcessor, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, getBeaconTime: getBeaconTime, - verifier: BatchVerifier.init(rng, taskpool) + verifier: batchVerifier[] ) # Sync callbacks @@ -406,6 +406,102 @@ proc enqueueBlock*( except AsyncQueueFullError: raiseAssert "unbounded queue" +proc updateHead*( + consensusManager: ref ConsensusManager, + validatorMonitor: ref ValidatorMonitor, + getBeaconTimeFn: GetBeaconTimeFn, + signedBlock: ForkySignedBeaconBlock, + payloadStatus: NewPayloadStatus +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = + let + attestationPool = consensusManager.attestationPool + wallTime = getBeaconTimeFn() + wallSlot = wallTime.slotOrZero() + newHead = + 
attestationPool[].selectOptimisticHead(wallSlot.start_beacon_time) + + if newHead.isOk(): + template elManager(): auto = consensusManager.elManager + if consensusManager[].shouldSyncOptimistically(wallSlot): + # Optimistic head is far in the future; report it as head block to EL. + + # Note that the specification allows an EL client to skip fcU processing + # if an update to an ancestor is requested. + # > Client software MAY skip an update of the forkchoice state and MUST + # NOT begin a payload build process if `forkchoiceState.headBlockHash` + # references an ancestor of the head of canonical chain. + # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-1 + # + # However, in practice, an EL client may not have completed importing all + # block headers, so may be unaware of a block's ancestor status. + # Therefore, hopping back and forth between the optimistic head and the + # chain DAG head does not work well in practice, e.g., Geth: + # - "Beacon chain gapped" from DAG head to optimistic head, + # - followed by "Beacon chain reorged" from optimistic head back to DAG. + consensusManager[].updateHead(newHead.get.blck) + + template callForkchoiceUpdated(attributes: untyped) = + if NewPayloadStatus.noResponse != payloadStatus and + not consensusManager[].optimisticExecutionBlockHash.isZero: + discard await elManager.forkchoiceUpdated( + headBlockHash = + consensusManager[].optimisticExecutionBlockHash, + safeBlockHash = + newHead.get.safeExecutionBlockHash, + finalizedBlockHash = + newHead.get.finalizedExecutionBlockHash, + payloadAttributes = + Opt.none attributes) + + let consensusFork = + consensusManager.dag.cfg.consensusForkAtEpoch( + newHead.get.blck.bid.slot.epoch) + + withConsensusFork(consensusFork): + when consensusFork >= ConsensusFork.Bellatrix: + callForkchoiceUpdated(consensusFork.PayloadAttributes) + else: + let headExecutionBlockHash = + consensusManager.dag.loadExecutionBlockHash( + newHead.get.blck).get(ZERO_HASH) + + if headExecutionBlockHash.isZero or + NewPayloadStatus.noResponse == payloadStatus: + # Blocks without execution payloads can't be optimistic, and don't try + # to fcU to a block the EL hasn't seen + consensusManager[].updateHead(newHead.get.blck) + elif newHead.get.blck.executionValid: + # `forkchoiceUpdated` necessary for EL client only. + consensusManager[].updateHead(newHead.get.blck) + + template callForkChoiceUpdated: untyped = + withConsensusFork(consensusManager.dag.cfg.consensusForkAtEpoch( + newHead.get.blck.bid.slot.epoch)): + when consensusFork >= ConsensusFork.Bellatrix: + await elManager.expectValidForkchoiceUpdated( + headBlockPayloadAttributesType = consensusFork.PayloadAttributes, + headBlockHash = headExecutionBlockHash, + safeBlockHash = newHead.get.safeExecutionBlockHash, + finalizedBlockHash = newHead.get.finalizedExecutionBlockHash, + receivedBlock = signedBlock) + + if consensusManager.checkNextProposer(wallSlot).isNone: + # No attached validator is next proposer, so use non-proposal fcU + callForkChoiceUpdated() + else: + # Some attached validator is next proposer, so prepare payload. As + # updateHead() updated the DAG head, runProposalForkchoiceUpdated, + # which needs the state corresponding to that head block, can run. 
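+          # If the proposal-specific fcU could not be issued, fall back to
+          # the regular non-proposal fcU below.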
+ if (await consensusManager.runProposalForkchoiceUpdated( + wallSlot)).isNone: + callForkChoiceUpdated() + else: + await consensusManager.updateHeadWithExecution( + newHead.get, getBeaconTimeFn) + ok() + else: + err("Head selection failed, using previous head") + proc storeBlock( self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index a35377a42c..cc1d267028 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -13,13 +13,13 @@ import metrics, metrics/chronos_httpserver, stew/[byteutils, io2], eth/p2p/discoveryv5/[enr, random2], - ./consensus_object_pools/blob_quarantine, + ./consensus_object_pools/[blob_quarantine, blockchain_list], ./consensus_object_pools/vanity_logs/vanity_logs, ./networking/[topic_params, network_metadata_downloads], ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity], - ./sync/[sync_protocol, light_client_protocol], + ./sync/[sync_protocol, light_client_protocol, sync_overseer], ./validators/[keystore_management, beacon_validators], "."/[ beacon_node, beacon_node_light_client, deposits, @@ -277,6 +277,7 @@ proc initFullNode( node: BeaconNode, rng: ref HmacDrbgContext, dag: ChainDAGRef, + clist: ChainListRef, taskpool: TaskPoolPtr, getBeaconTime: GetBeaconTimeFn) {.async.} = template config(): auto = node.config @@ -363,6 +364,12 @@ proc initFullNode( else: dag.tail.slot + func getUntrustedBackfillSlot(): Slot = + if clist.tail.isSome(): + clist.tail.get().blck.slot + else: + dag.tail.slot + func getFrontfillSlot(): Slot = max(dag.frontfill.get(BlockId()).slot, dag.horizon) @@ -399,10 +406,12 @@ proc initFullNode( ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets), node.dynamicFeeRecipientsStore, config.validatorsDir, config.defaultFeeRecipient, config.suggestedGasLimit) + batchVerifier = BatchVerifier.new(rng, taskpool) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - rng, taskpool, consensusManager, node.validatorMonitor, + batchVerifier, consensusManager, node.validatorMonitor, blobQuarantine, getBeaconTime) + blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = @@ -412,6 +421,11 @@ proc initFullNode( # that should probably be reimagined more holistically in the future. blockProcessor[].addBlock( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + untrustedBlockVerifier = + proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], + maybeFinalized: bool): Future[Result[void, VerifierError]] {. 
+ async: (raises: [CancelledError], raw: true).} = + clist.untrustedBackfillVerifier(signedBlock, blobs, maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = @@ -472,6 +486,20 @@ proc initFullNode( dag.backfill.slot, blockVerifier, maxHeadAge = 0, shutdownEvent = node.shutdownEvent, flags = syncManagerFlags) + clistPivotSlot = + if clist.tail.isSome(): + clist.tail.get().blck.slot() + else: + getLocalWallSlot() + untrustedManager = newSyncManager[Peer, PeerId]( + node.network.peerPool, + dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, + SyncQueueKind.Backward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getUntrustedBackfillSlot, + getFrontfillSlot, isWithinWeakSubjectivityPeriod, + clistPivotSlot, untrustedBlockVerifier, maxHeadAge = 0, + shutdownEvent = node.shutdownEvent, + flags = syncManagerFlags) router = (ref MessageRouter)( processor: processor, network: node.network) @@ -502,6 +530,7 @@ proc initFullNode( dag.setReorgCb(onChainReorg) node.dag = dag + node.list = clist node.blobQuarantine = blobQuarantine node.quarantine = quarantine node.attestationPool = attestationPool @@ -509,11 +538,24 @@ proc initFullNode( node.lightClientPool = lightClientPool node.validatorChangePool = validatorChangePool node.processor = processor + node.batchVerifier = batchVerifier node.blockProcessor = blockProcessor node.consensusManager = consensusManager node.requestManager = requestManager node.syncManager = syncManager node.backfiller = backfiller + node.untrustedManager = untrustedManager + node.syncOverseer = SyncOverseerRef.new(node.consensusManager, + node.validatorMonitor, + config, + getBeaconTime, + node.list, + node.beaconClock, + node.eventBus.optFinHeaderUpdateQueue, + node.network.peerPool, + node.batchVerifier, + syncManager, backfiller, + untrustedManager) node.router = router await node.addValidators() @@ -589,11 +631,20 @@ proc init*(T: type BeaconNode, checkpoint = Checkpoint( epoch: epoch(getStateField(genesisState[], slot)), root: getStateField(genesisState[], latest_block_header).state_root) + + notice "Genesis state information", + genesis_fork = genesisState.kind, + is_post_altair = (cfg.ALTAIR_FORK_EPOCH == GENESIS_EPOCH) + if config.longRangeSync == LongRangeSyncMode.Light: if not is_within_weak_subjectivity_period(metadata.cfg, currentSlot, genesisState[], checkpoint): - fatal WeakSubjectivityLogMessage, current_slot = currentSlot - quit 1 + # We do support any network which starts from Altair or later fork. 
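+ # That is, when the network's Altair fork is active from genesis the weak + # subjectivity failure is tolerated here, since light client sync can + # bootstrap the node; on other networks (e.g. mainnet, where + # ALTAIR_FORK_EPOCH != GENESIS_EPOCH) it remains fatal.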
+ let metadata = config.loadEth2Network() + if metadata.cfg.ALTAIR_FORK_EPOCH != GENESIS_EPOCH: + fatal WeakSubjectivityLogMessage, current_slot = currentSlot, + altair_fork_epoch = metadata.cfg.ALTAIR_FORK_EPOCH + quit 1 try: if config.numThreads < 0: @@ -631,7 +682,8 @@ proc init*(T: type BeaconNode, finUpdateQueue: newAsyncEventQueue[ RestVersioned[ForkedLightClientFinalityUpdate]](), optUpdateQueue: newAsyncEventQueue[ - RestVersioned[ForkedLightClientOptimisticUpdate]]()) + RestVersioned[ForkedLightClientOptimisticUpdate]](), + optFinHeaderUpdateQueue: newAsyncEventQueue[ForkedLightClientHeader]()) db = BeaconChainDB.new(config.databaseDir, cfg, inMemory = false) if config.externalBeaconApiUrl.isSome and ChainDAGRef.isInitialized(db).isErr: @@ -805,6 +857,16 @@ proc init*(T: type BeaconNode, getBeaconTime = beaconClock.getBeaconTimeFn() + let clist = + block: + # TODO (cheatfate): We should reset (delete) blockchain file if tail is + # not in weak subjectivity period. + let + res = ChainListRef.init(config.databaseDir()) + info "Backfill database has been loaded", path = config.databaseDir(), + head = shortLog(res.head), tail = shortLog(res.tail) + res + if config.weakSubjectivityCheckpoint.isSome: dag.checkWeakSubjectivityCheckpoint( config.weakSubjectivityCheckpoint.get, beaconClock) @@ -935,7 +997,7 @@ proc init*(T: type BeaconNode, node.initLightClient( rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root) - await node.initFullNode(rng, dag, taskpool, getBeaconTime) + await node.initFullNode(rng, dag, clist, taskpool, getBeaconTime) node.updateLightClientFromDag() @@ -1647,26 +1709,29 @@ func formatNextConsensusFork( $nextConsensusFork & ":" & $nextForkEpoch) func syncStatus(node: BeaconNode, wallSlot: Slot): string = - let optimisticHead = not node.dag.head.executionValid - if node.syncManager.inProgress: - let - optimisticSuffix = - if optimisticHead: - "/opt" - else: - "" - lightClientSuffix = - if node.consensusManager[].shouldSyncOptimistically(wallSlot): - " - lc: " & $shortLog(node.consensusManager[].optimisticHead) - else: - "" - node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix - elif node.backfiller.inProgress: - "backfill: " & node.backfiller.syncStatus - elif optimisticHead: - "synced/opt" - else: - "synced" + node.syncOverseer.statusMsg.valueOr: + let optimisticHead = not node.dag.head.executionValid + if node.syncManager.inProgress: + let + optimisticSuffix = + if optimisticHead: + "/opt" + else: + "" + lightClientSuffix = + if node.consensusManager[].shouldSyncOptimistically(wallSlot): + " - lc: " & $shortLog(node.consensusManager[].optimisticHead) + else: + "" + node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix + elif node.untrustedManager.inProgress: + "untrusted: " & node.untrustedManager.syncStatus + elif node.backfiller.inProgress: + "backfill: " & node.backfiller.syncStatus + elif optimisticHead: + "synced/opt" + else: + "synced" when defined(windows): from winservice import establishWindowsService, reportServiceStatusSuccess @@ -1950,18 +2015,6 @@ proc stop(node: BeaconNode) = node.db.close() notice "Databases closed" -proc startBackfillTask(node: BeaconNode) {.async.} = - while node.dag.needsBackfill: - if not node.syncManager.inProgress: - # Only start the backfiller if it's needed _and_ head sync has completed - - # if we lose sync after having synced head, we could stop the backfilller, - # but this should be a fringe case - might as well keep the logic simple for - # now - node.backfiller.start() - 
return - - await sleepAsync(chronos.seconds(2)) - proc run(node: BeaconNode) {.raises: [CatchableError].} = bnStatus = BeaconNodeStatus.Running @@ -1981,9 +2034,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = node.startLightClient() node.requestManager.start() - node.syncManager.start() - - if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask() + node.syncOverseer.start() waitFor node.updateGossipStatus(wallSlot) diff --git a/beacon_chain/rpc/rest_rewards_api.nim b/beacon_chain/rpc/rest_rewards_api.nim index 1e6aeb331b..44f5f9eb3e 100644 --- a/beacon_chain/rpc/rest_rewards_api.nim +++ b/beacon_chain/rpc/rest_rewards_api.nim @@ -82,7 +82,7 @@ proc installRewardsApiHandlers*(router: var RestRouter, node: BeaconNode) = tmpState = assignClone(node.dag.headState) if not updateState( - node.dag, tmpState[], targetBlock, false, cache): + node.dag, tmpState[], targetBlock, false, cache, node.dag.updateFlags): return RestApiResponse.jsonError(Http404, ParentBlockMissingStateError) func rollbackProc(state: var ForkedHashedBeaconState) {. @@ -164,7 +164,7 @@ proc installRewardsApiHandlers*(router: var RestRouter, node: BeaconNode) = tmpState = assignClone(node.dag.headState) if not updateState( - node.dag, tmpState[], targetBlock, false, cache): + node.dag, tmpState[], targetBlock, false, cache, node.dag.updateFlags): return RestApiResponse.jsonError(Http404, ParentBlockMissingStateError) let response = diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index 0095bd1b03..6885fc806d 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -192,7 +192,8 @@ template withStateForBlockSlotId*(nodeParam: BeaconNode, else: assignClone(node.dag.headState) - if node.dag.updateState(stateToAdvance[], blockSlotId, false, cache): + if node.dag.updateState(stateToAdvance[], blockSlotId, false, cache, + node.dag.updateFlags): if cachedState == nil and node.stateTtlCache != nil: # This was not a cached state, we can cache it now node.stateTtlCache.add(stateToAdvance) diff --git a/beacon_chain/spec/signatures_batch.nim b/beacon_chain/spec/signatures_batch.nim index d07dee8cdb..207c0d8267 100644 --- a/beacon_chain/spec/signatures_batch.nim +++ b/beacon_chain/spec/signatures_batch.nim @@ -229,6 +229,39 @@ func bls_to_execution_change_signature_set*( SignatureSet.init(pubkey, signing_root, signature) +proc collectProposerSignatureSet*( + sigs: var seq[SignatureSet], + blocks: openArray[ForkedSignedBeaconBlock], + validatorKeys: openArray[ImmutableValidatorData2], + state: ForkedHashedBeaconState +): Result[void, string] = + mixin load + + let + fork = getStateField(state, fork) + genesis_validators_root = getStateField(state, genesis_validators_root) + + for forkedBlock in blocks: + let item = + withBlck(forkedBlock): + let + proposerKey = + validatorKeys.load(forkyBlck.message.proposer_index).valueOr: + let msg = "collectSignatureSets: invalid proposer index (" & + $forkyBlck.message.proposer_index & ")" + return err(msg) + signature = + forkyBlck.signature.load().valueOr: + let msg = "collectSignatureSets: cannot load signature (" & + $ forkyBlck.signature & ")" + return err(msg) + block_signature_set( + fork, genesis_validators_root, + forkyBlck.message.slot, forkyBlck.root, + proposerKey, signature) + sigs.add(item) + ok() + proc collectSignatureSets*( sigs: var seq[SignatureSet], signed_block: ForkySignedBeaconBlock, diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 6e7c17764c..95fdaa74df 
100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -72,7 +72,7 @@ type rangeAge: uint64 chunkSize: uint64 queue: SyncQueue[A] - syncFut: Future[void] + syncFut: Future[void].Raising([CancelledError]) blockVerifier: BlockVerifier inProgress*: bool insSyncSpeed*: float @@ -88,7 +88,14 @@ type BeaconBlocksRes = NetRes[List[ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS]] - BlobSidecarsRes = NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]] + BlobSidecarsRes = + NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]] + + SyncBlockData* = object + blocks*: seq[ref ForkedSignedBeaconBlock] + blobs*: Opt[seq[BlobSidecars]] + + SyncBlockDataRes* = Result[SyncBlockData, string] proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} = SyncMoment(stamp: now(chronos.Moment), slots: slots) @@ -171,6 +178,12 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], res.initQueue() res +func combine(acc: seq[Slot], cur: Slot): seq[Slot] = + var copy = acc + if copy[^1] != cur: + copy.add(cur) + copy + proc getBlocks[A, B](man: SyncManager[A, B], peer: A, req: SyncRequest): Future[BeaconBlocksRes] {. async: (raises: [CancelledError], raw: true).} = @@ -269,8 +282,96 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) - {.async: (raises: [CancelledError]).} = +proc getSyncBlockData*[T]( + peer: T, + req: SyncRequest, + blobsPresent: bool +): Future[SyncBlockDataRes] {.async: (raises: [CancelledError]).} = + mixin getScore + + logScope: + peer_score = peer.getScore() + peer_speed = peer.netKbps() + topics = "syncman" + + doAssert(not(req.isEmpty()), "Request must not be empty!") + + debug "Requesting blocks from peer", request = req + let blocksRange = + block: + let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) + if res.isErr(): + peer.updateScore(PeerScoreNoValues) + debug "Failed to receive blocks on request", + request = req, err = res.error + return err("Failed to receive blocks on request [" & $res.error & "]") + res.get().asSeq + + debug "Received blocks on request", blocks_count = len(blocksRange), + blocks_map = getShortMap(req, blocksRange), request = req + + let slots = mapIt(blocksRange, it[].slot) + + if not(checkResponse(req, slots)): + peer.updateScore(PeerScoreBadResponse) + return err("Received blocks sequence is not in requested range") + + let shouldGetBlobs = + if not(blobsPresent): + false + else: + var hasBlobs = false + for blck in blocksRange: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Deneb: + if forkyBlck.message.body.blob_kzg_commitments.len > 0: + hasBlobs = true + break + hasBlobs + + let blobsRange = + if shouldGetBlobs: + let blobData = + block: + debug "Requesting blobs sidecars from peer", request = req + let res = await blobSidecarsByRange(peer, req.slot, req.count) + if res.isErr(): + peer.updateScore(PeerScoreNoValues) + return err( + "Failed to receive blobs on request [" & $res.error & "]") + res.get().asSeq() + + debug "Received blobs on request", + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), request = req + + if len(blobData) > 0: + let + slots = mapIt(blobData, it[].signed_block_header.message.slot) + uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + + if not(checkResponse(req, uniqueSlots)): + peer.updateScore(PeerScoreBadResponse) + return err("Received blobs sequence is 
not in requested range") + + let groupedBlobs = groupBlobs(req, blocksRange, blobData) + if groupedBlobs.isErr(): + peer.updateScore(PeerScoreNoValues) + return err("Received blobs sequence is inconsistent") + + if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): + peer.updateScore(PeerScoreBadResponse) + return err( + "Received blobs sequence is invalid [" & checkRes.error & "]") + Opt.some(groupedBlobs.get()) + else: + Opt.none(seq[BlobSidecars]) + + ok SyncBlockData(blocks: blocksRange, blobs: blobsRange) + +proc syncStep[A, B]( + man: SyncManager[A, B], index: int, peer: A +) {.async: (raises: [CancelledError]).} = logScope: peer_score = peer.getScore() peer_speed = peer.netKbps() @@ -409,17 +510,16 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) request = req, err = blocks.error return let blockData = blocks.get().asSeq() - let blockSmap = getShortMap(req, blockData) debug "Received blocks on request", blocks_count = len(blockData), - blocks_map = blockSmap, request = req + blocks_map = getShortMap(req, blockData), request = req let slots = mapIt(blockData, it[].slot) if not(checkResponse(req, slots)): peer.updateScore(PeerScoreBadResponse) man.queue.push(req) warn "Received blocks sequence is not in requested range", - blocks_count = len(blockData), blocks_map = blockSmap, - request = req + blocks_count = len(blockData), + blocks_map = getShortMap(req, blockData), request = req return let shouldGetBlobs = @@ -435,12 +535,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) break hasBlobs - func combine(acc: seq[Slot], cur: Slot): seq[Slot] = - var copy = acc - if copy[copy.len-1] != cur: - copy.add(cur) - copy - let blobData = if shouldGetBlobs: let blobs = await man.getBlobSidecars(peer, req) @@ -451,9 +545,9 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) request = req, err = blobs.error return let blobData = blobs.get().asSeq() - let blobSmap = getShortMap(req, blobData) - debug "Received blobs on request", blobs_count = len(blobData), - blobs_map = blobSmap, request = req + debug "Received blobs on request", + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), request = req if len(blobData) > 0: let slots = mapIt(blobData, it[].signed_block_header.message.slot) @@ -462,24 +556,26 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) peer.updateScore(PeerScoreBadResponse) man.queue.push(req) warn "Received blobs sequence is not in requested range", - blobs_count = len(blobData), blobs_map = getShortMap(req, blobData), - request = req + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), + request = req return let groupedBlobs = groupBlobs(req, blockData, blobData) if groupedBlobs.isErr(): peer.updateScore(PeerScoreNoValues) man.queue.push(req) info "Received blobs sequence is inconsistent", - blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error() + blobs_map = getShortMap(req, blobData), + request = req, msg = groupedBlobs.error() return if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): peer.updateScore(PeerScoreBadResponse) man.queue.push(req) warn "Received blobs sequence is invalid", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), - request = req, - msg = checkRes.error + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), + request = req, + msg = checkRes.error return Opt.some(groupedBlobs.get()) else: @@ -512,7 +608,9 @@ proc syncStep[A, B](man: SyncManager[A, B], 
index: int, peer: A) await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = man.workers[index].status = SyncWorkerStatus.Processing) -proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = +proc syncWorker[A, B]( + man: SyncManager[A, B], index: int +) {.async: (raises: [CancelledError]).} = mixin getKey, getScore, getHeadSlot logScope: @@ -610,8 +708,9 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res -proc syncClose[A, B](man: SyncManager[A, B], - speedTaskFut: Future[void]) {.async.} = +proc syncClose[A, B]( + man: SyncManager[A, B], speedTaskFut: Future[void] +) {.async: (raises: []).} = var pending: seq[FutureBase] if not(speedTaskFut.finished()): pending.add(speedTaskFut.cancelAndWait()) @@ -620,7 +719,10 @@ proc syncClose[A, B](man: SyncManager[A, B], pending.add(worker.future.cancelAndWait()) await noCancel allFutures(pending) -proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = +proc syncLoop[A, B]( + man: SyncManager[A, B] +) {.async: (raises: [CancelledError]).} = + logScope: sync_ident = man.ident direction = man.direction @@ -671,14 +773,27 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = let (map, sleeping, waiting, pending) = man.getWorkersStats() - debug "Current syncing state", workers_map = map, - sleeping_workers_count = sleeping, - waiting_workers_count = waiting, - pending_workers_count = pending, - wall_head_slot = wallSlot, local_head_slot = headSlot, - pause_time = $chronos.seconds(pauseTime), - avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed - + case man.queue.kind + of SyncQueueKind.Forward: + debug "Current syncing state", workers_map = map, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + wall_head_slot = wallSlot, + local_head_slot = headSlot, + pause_time = $chronos.seconds(pauseTime), + avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4), + ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4) + of SyncQueueKind.Backward: + debug "Current syncing state", workers_map = map, + sleeping_workers_count = sleeping, + waiting_workers_count = waiting, + pending_workers_count = pending, + wall_head_slot = wallSlot, + backfill_slot = man.getSafeSlot(), + pause_time = $chronos.seconds(pauseTime), + avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4), + ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4) let pivot = man.progressPivot progress = @@ -806,3 +921,18 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = proc start*[A, B](man: SyncManager[A, B]) = ## Starts SyncManager's main loop. man.syncFut = man.syncLoop() + +proc updatePivot*[A, B](man: SyncManager[A, B], pivot: Slot) = + ## Update progress pivot slot. 
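+ ## The pivot is used as the reference point when computing sync progress + ## and the estimated time left in the sync status output.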
+ man.progressPivot = pivot + +proc join*[A, B]( + man: SyncManager[A, B] +): Future[void] {.async: (raw: true, raises: [CancelledError]).} = + if man.syncFut.isNil(): + let retFuture = + Future[void].Raising([CancelledError]).init("nimbus-eth2.join()") + retFuture.complete() + retFuture + else: + man.syncFut.join() diff --git a/beacon_chain/sync/sync_overseer.nim b/beacon_chain/sync/sync_overseer.nim new file mode 100644 index 0000000000..527e6213a6 --- /dev/null +++ b/beacon_chain/sync/sync_overseer.nim @@ -0,0 +1,526 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import std/[strutils, sequtils] +import stew/base10, chronos, chronicles, results +import + ../consensus_object_pools/blockchain_list, + ../spec/datatypes/[phase0, altair], + ../spec/eth2_apis/rest_types, + ../spec/[helpers, forks, network, forks_light_client, weak_subjectivity], + ../networking/[peer_pool, peer_scores, eth2_network], + ../gossip_processing/block_processor, + ../[beacon_clock, beacon_node], + ./[sync_types, sync_manager, sync_queue] + +from ../consensus_object_pools/spec_cache import get_attesting_indices + +export sync_types + +logScope: + topics = "overseer" + +const + PARALLEL_REQUESTS* = 3 + ## Number of peers we are using to resolve our request. + BLOCKS_PROCESS_CHUNK_SIZE* = 2 + ## Number of blocks sent for processing (CPU-heavy task). + +type + BlockDataRes* = Result[BlockData, string] + +proc init*(t: typedesc[BlockDataChunk], + stateCallback: OnStateUpdated, + data: openArray[BlockData]): BlockDataChunk = + BlockDataChunk( + blocks: @data, + onStateUpdatedCb: stateCallback, + resfut: + Future[Result[void, string]].Raising([CancelledError]).init( + "blockdata.chunk") + ) + +proc shortLog*(c: BlockDataChunk): string = + let + map = + (c.blocks.mapIt(shortLog(it.blck.root) & ":" & $it.blck.slot)). + join(", ") + futureState = if c.resfut.finished(): "completed" else: "pending" + "[" & map & "]:" & futureState + +iterator chunks*(data: openArray[BlockData], + stateCallback: OnStateUpdated, + maxCount: Positive): BlockDataChunk = + for i in countup(0, len(data) - 1, maxCount): + yield BlockDataChunk.init(stateCallback, + data.toOpenArray(i, min(i + maxCount, len(data)) - 1)) + +proc getLatestBeaconHeader*( + overseer: SyncOverseerRef +): Future[BeaconBlockHeader] {.async: (raises: [CancelledError]).} = + let eventKey = overseer.eventQueue.register() + + defer: + overseer.eventQueue.unregister(eventKey) + + let events = + try: + await overseer.eventQueue.waitEvents(eventKey) + except CancelledError as exc: + raise exc + except AsyncEventQueueFullError: + raiseAssert "AsyncEventQueueFullError should not happen!"
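+ # Take the most recent header received on the event queue; the overseer + # only expects post-Altair (non-None) light client headers here.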
+ + withForkyHeader(events[^1]): + when lcDataFork > LightClientDataFork.None: + forkyHeader.beacon + else: + raiseAssert "Should not happen" + +proc getPeerBlock*( + overseer: SyncOverseerRef, + slot: Slot, +): Future[BlockDataRes] {.async: (raises: [CancelledError]).} = + let peer = await overseer.pool.acquire() + try: + let + request = SyncRequest[Peer](kind: SyncQueueKind.Forward, + slot: slot, count: 1'u64, item: peer) + res = (await getSyncBlockData(peer, request, true)).valueOr: + return err(error) + + if len(res.blocks) == 0: + return err("Empty sequence received") + + let + blob = + if res.blobs.isSome(): + Opt.some(res.blobs.get()[0]) + else: + Opt.none(BlobSidecars) + ok(BlockData(blck: res.blocks[0][], blob: blob)) + finally: + overseer.pool.release(peer) + +# proc `==`(a, b: BeaconBlockHeader): bool = +# (a.slot == b.slot) and (a.proposer_index == b.proposer_index) and +# (a.parent_root.data == b.parent_root.data) and +# (a.state_root.data == b.state_root.data) and +# (a.body_root.data == b.body_root.data) + +proc getBlock*( + overseer: SyncOverseerRef, + slot: Slot, + blockHeader: Opt[BeaconBlockHeader] +): Future[BlockData] {.async: (raises: [CancelledError]).} = + var workers: + array[PARALLEL_REQUESTS, Future[BlockDataRes].Raising([CancelledError])] + + while true: + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = overseer.getPeerBlock(slot) + + try: + await allFutures(workers) + except CancelledError as exc: + let pending = + workers.filterIt(not(it.finished())).mapIt(cancelAndWait(it)) + await noCancel allFutures(pending) + raise exc + + var results: seq[BlockData] + for i in 0 ..< PARALLEL_REQUESTS: + if workers[i].value.isOk: + results.add(workers[i].value.get()) + + if blockHeader.isSome: + if len(results) > 0: + for item in results: + withBlck(item.blck): + if forkyBlck.message.toBeaconBlockHeader() == blockHeader.get(): + return item + else: + # TODO (cheatfate): Compare received blocks + if len(results) > 0: + return results[0] + + # Wait for 2 seconds before trying one more time. 
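+ # (we get here when no peer returned a block, or none of the returned + # blocks matched the requested header)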
+ await sleepAsync(2.seconds) + +proc isWithinWeakSubjectivityPeriod( + overseer: SyncOverseerRef, slot: Slot): bool = + let + dag = overseer.consensusManager.dag + currentSlot = overseer.beaconClock.now().slotOrZero() + checkpoint = Checkpoint( + epoch: + getStateField(dag.headState, slot).epoch(), + root: + getStateField(dag.headState, latest_block_header).state_root) + + is_within_weak_subjectivity_period( + dag.cfg, currentSlot, dag.headState, checkpoint) + +proc isUntrustedBackfillEmpty(clist: ChainListRef): bool = + clist.tail.isNone() + +func speed(start, finish: Moment, entities: int): float = + if entities <= 0: + 0.0 + else: + float(entities) / toFloatSeconds(finish - start) + +proc updatePerformance(overseer: SyncOverseerRef, startTick: Moment, + entities: int) = + let dag = overseer.consensusManager.dag + doAssert(overseer.clist.head.isSome() and overseer.clist.tail.isSome()) + let + clistHeadSlot = overseer.clist.head.get().slot + clistTailSlot = overseer.clist.tail.get().slot + doAssert(clistHeadSlot >= dag.head.slot) + let slotsPerSec = speed(startTick, Moment.now(), entities) + + inc(overseer.avgSpeedCounter) + overseer.avgSpeed = overseer.avgSpeed + + (slotsPerSec - overseer.avgSpeed) / float(overseer.avgSpeedCounter) + + let + total = clistHeadSlot - clistTailSlot + progress = dag.head.slot - clistTailSlot + done = float(progress) / float(total) + remaining = total - progress + timeleft = + if overseer.avgSpeed >= 0.001: + Duration.fromFloatSeconds(remaining.float / overseer.avgSpeed) + else: + InfiniteDuration + + # Update status string + overseer.statusMsg = Opt.some( + timeleft.toTimeLeftString() & " (" & + (done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " & + overseer.avgSpeed.formatBiggestFloat(ffDecimal, 4) & + "slots/s (" & $dag.head.slot & ")") + +proc blockProcessingLoop(overseer: SyncOverseerRef): Future[void] {. + async: (raises: [CancelledError]).} = + let + consensusManager = overseer.consensusManager + dag = consensusManager.dag + attestationPool = consensusManager.attestationPool + validatorMonitor = overseer.validatorMonitor + + proc onBlockAdded( + blckRef: BlockRef, blck: ForkedTrustedSignedBeaconBlock, epochRef: EpochRef, + unrealized: FinalityCheckpoints) {.gcsafe, raises: [].} = + + let wallTime = overseer.getBeaconTimeFn() + withBlck(blck): + attestationPool[].addForkChoice( + epochRef, blckRef, unrealized, forkyBlck.message, wallTime) + + validatorMonitor[].registerBeaconBlock( + MsgSource.sync, wallTime, forkyBlck.message) + + for attestation in forkyBlck.message.body.attestations: + for validator_index in + dag.get_attesting_indices(attestation, true): + validatorMonitor[].registerAttestationInBlock( + attestation.data, validator_index, forkyBlck.message.slot) + + withState(dag[].clearanceState): + when (consensusFork >= ConsensusFork.Altair) and + (type(forkyBlck) isnot phase0.TrustedSignedBeaconBlock): + for i in forkyBlck.message.body.sync_aggregate. 
+ sync_committee_bits.oneIndices(): + validatorMonitor[].registerSyncAggregateInBlock( + forkyBlck.message.slot, forkyBlck.root, + forkyState.data.current_sync_committee.pubkeys.data[i]) + + block mainLoop: + while true: + let bchunk = await overseer.blocksQueue.popFirst() + + block innerLoop: + for bdata in bchunk.blocks: + block: + let res = addBackfillBlockData(dag, bdata, bchunk.onStateUpdatedCb, + onBlockAdded) + if res.isErr(): + let msg = "Unable to add block data to database [" & + $res.error & "]" + bchunk.resfut.complete(Result[void, string].err(msg)) + break innerLoop + + withBlck(bdata.blck): + let res = + try: + await updateHead(consensusManager, validatorMonitor, + overseer.getBeaconTimeFn, forkyBlck, + NewPayloadStatus.noResponse) + except CancelledError: + let msg = "Unable to update head [interrupted]" + bchunk.resfut.complete(Result[void, string].err(msg)) + break mainLoop + if res.isErr(): + let msg = "Unable to update head [" & res.error & "]" + bchunk.resfut.complete(Result[void, string].err(msg)) + break innerLoop + + bchunk.resfut.complete(Result[void, string].ok()) + +proc verifyBlockProposer( + dag: ChainDAGRef, + signedBlock: ForkedSignedBeaconBlock +): Result[void, cstring] = + let + fork = getStateField(dag.clearanceState, fork) + genesis_validators_root = + getStateField(dag.clearanceState, genesis_validators_root) + + withBlck(signedBlock): + let proposerKey = + dag.db.immutableValidators.load(forkyBlck.message.proposer_index).valueOr: + return err("Unable to find proposer key") + + if not(verify_block_signature(fork, genesis_validators_root, + forkyBlck.message.slot, forkyBlck.message, + proposerKey, forkyBlck.signature)): + return err("Signature verification failed") + + ok() + +proc rebuildState(overseer: SyncOverseerRef): Future[void] {. + async: (raises: [CancelledError]).} = + overseer.statusMsg = Opt.some("rebuilding state") + let + consensusManager = overseer.consensusManager + dag = consensusManager.dag + batchVerifier = overseer.batchVerifier + clist = + block: + overseer.clist.seekForSlot(dag.head.slot).isOkOr: + fatal "Unable to find slot in backfill data", reason = error, + path = overseer.clist.path + quit 1 + overseer.clist + + var + blocks: seq[BlockData] + processEpoch: Epoch = FAR_FUTURE_EPOCH + + let handle = clist.handle.get() + + overseer.avgSpeed = 0.0 + overseer.avgSpeedCounter = 0 + + # Set minimum slot number from which LC data is collected. + dag.lcDataStore.cache.tailSlot = clist.head.get().slot + + block mainLoop: + while true: + let res = getChainFileTail(handle.handle) + if res.isErr(): + fatal "Unable to read backfill data", reason = res.error + quit 1 + let bres = res.get() + if bres.isNone(): + return + + let + data = bres.get() + blockEpoch = data.blck.slot.epoch() + + if blockEpoch != processEpoch: + if len(blocks) != 0: + let + startTick = Moment.now() + blocksOnly = blocks.mapIt(it.blck) + + proc onStateUpdate(slot: Slot): Result[void, VerifierError] {. + gcsafe, raises: [].} = + + if slot != blocksOnly[0].slot: + # We are going to verify signatures only at the beginning of the + # chunk/epoch.
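+ # (proposer signatures for all blocks of the chunk are batch-verified when + # its first slot is processed, so later slots can be skipped)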
+ return ok() + + verifyBlockProposer(dag, batchVerifier[], blocksOnly).isOkOr: + for signedBlock in blocksOnly: + let res = verifyBlockProposer(dag, signedBlock) + if res.isErr(): + fatal "Unable to verify block proposer", + blck = shortLog(signedBlock), + reason = res.error + return err(VerifierError.Invalid) + ok() + + for bchunk in blocks.chunks(onStateUpdate, BLOCKS_PROCESS_CHUNK_SIZE): + try: + overseer.blocksQueue.addLastNoWait(bchunk) + except AsyncQueueFullError: + raiseAssert "Should not happen with unbounded AsyncQueue" + let res = await bchunk.resfut + if res.isErr(): + fatal "Unable to add block data to database", reason = res.error + quit 1 + + let updateTick = Moment.now() + debug "Number of blocks injected", + blocks_count = len(blocks), + head = shortLog(dag.head), + finalized = shortLog(getStateField( + dag.headState, finalized_checkpoint)), + store_update_time = updateTick - startTick + + overseer.updatePerformance(startTick, len(blocks)) + blocks.setLen(0) + + processEpoch = blockEpoch + + if data.blck.slot != GENESIS_SLOT: + blocks.add(data) + +proc initUntrustedSync(overseer: SyncOverseerRef): Future[void] {. + async: (raises: [CancelledError]).} = + + overseer.statusMsg = Opt.some("awaiting light client") + + let blockHeader = await overseer.getLatestBeaconHeader() + + notice "Received light client block header", + beacon_header = shortLog(blockHeader), + current_slot = overseer.beaconClock.now().slotOrZero() + + overseer.statusMsg = Opt.some("retrieving block") + + let + blck = await overseer.getBlock(blockHeader.slot, Opt.some(blockHeader)) + blobsCount = if blck.blob.isNone(): 0 else: len(blck.blob.get()) + + notice "Received beacon block", blck = shortLog(blck.blck), + blobs_count = blobsCount + + overseer.statusMsg = Opt.some("storing block") + + let res = overseer.clist.addBackfillBlockData(blck.blck, blck.blob) + if res.isErr(): + warn "Unable to store initial block", reason = res.error + return + + overseer.statusMsg = Opt.none(string) + + notice "Initial block stored", + blck = shortLog(blck.blck), blobs_count = blobsCount + +proc startBackfillTask(overseer: SyncOverseerRef): Future[void] {. + async: (raises: []).} = + # This procedure performs a delayed start of the backfilling process. + while overseer.consensusManager.dag.needsBackfill: + if not(overseer.forwardSync.inProgress): + # Only start the backfiller if it's needed _and_ head sync has completed - + # if we lose sync after having synced head, we could stop the backfiller, + # but this should be a fringe case - might as well keep the logic simple + # for now. + overseer.backwardSync.start() + return + try: + await sleepAsync(chronos.seconds(2)) + except CancelledError: + return + +proc mainLoop*( + overseer: SyncOverseerRef +): Future[void] {.async: (raises: []).} = + let + dag = overseer.consensusManager.dag + clist = overseer.clist + currentSlot = overseer.beaconClock.now().slotOrZero() + + if overseer.isWithinWeakSubjectivityPeriod(currentSlot): + # Starting forward sync manager/monitor. + overseer.forwardSync.start() + # Starting backfill/backward sync manager. + if dag.needsBackfill(): + asyncSpawn overseer.startBackfillTask() + return + else: + if dag.needsBackfill(): + # Checkpoint/Trusted state we have is too old. + error "Trusted node sync was started too long ago" + quit 1 + + if not(isUntrustedBackfillEmpty(clist)): + let headSlot = clist.head.get().slot + if not(overseer.isWithinWeakSubjectivityPeriod(headSlot)): + # Light forward sync file is too old.
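+ # The stored backfill data is still used, but it can no longer be checked + # against the weak subjectivity period, so only warn and continue.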
+ warn "Light client sync was started too long time ago", + current_slot = currentSlot, backfill_data_slot = headSlot + + if overseer.config.longRangeSync == LongRangeSyncMode.Lenient: + # Starting forward sync manager/monitor only. + overseer.forwardSync.start() + return + + if overseer.config.longRangeSync == LongRangeSyncMode.Light: + let dagHead = dag.finalizedHead + if dagHead.slot < dag.cfg.ALTAIR_FORK_EPOCH.start_slot: + fatal "Light forward syncing requires a post-Altair state", + head_slot = dagHead.slot, + altair_start_slot = dag.cfg.ALTAIR_FORK_EPOCH.start_slot + quit 1 + + if isUntrustedBackfillEmpty(clist): + overseer.untrustedInProgress = true + + try: + await overseer.initUntrustedSync() + except CancelledError: + return + # We need to update pivot slot to enable timeleft calculation. + overseer.untrustedSync.updatePivot(overseer.clist.tail.get().slot) + # Note: We should not start forward sync manager! + overseer.untrustedSync.start() + + # Waiting until untrusted backfilling will not be complete + try: + await overseer.untrustedSync.join() + except CancelledError: + return + + notice "Start state rebuilding process" + # We spawn block processing loop to keep async world happy, otherwise + # it could be single cpu heavy procedure call. + let blockProcessingFut = overseer.blockProcessingLoop() + + try: + await overseer.rebuildState() + except CancelledError: + await cancelAndWait(blockProcessingFut) + return + + clist.clear().isOkOr: + warn "Unable to remove backfill data file", + path = clist.path.chainFilePath(), reason = error + quit 1 + + overseer.untrustedInProgress = false + + # When we finished state rebuilding process - we could start forward + # SyncManager which could perform finish sync. + overseer.forwardSync.start() + +proc start*(overseer: SyncOverseerRef) = + overseer.loopFuture = overseer.mainLoop() + +proc stop*(overseer: SyncOverseerRef) {.async: (raises: []).} = + doAssert(not(isNil(overseer.loopFuture)), + "SyncOverseer was not started yet") + if not(overseer.loopFuture.finished()): + await cancelAndWait(overseer.loopFuture) diff --git a/beacon_chain/sync/sync_types.nim b/beacon_chain/sync/sync_types.nim new file mode 100644 index 0000000000..1de01d5b1b --- /dev/null +++ b/beacon_chain/sync/sync_types.nim @@ -0,0 +1,85 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+ +{.push raises: [].} + +import results, chronos, + ".."/spec/[forks_light_client, signatures_batch], + ".."/consensus_object_pools/[block_pools_types, blockchain_dag, + attestation_pool, blockchain_list, + consensus_manager], + ".."/validators/validator_monitor, + ".."/[beacon_clock, conf], + ".."/networking/eth2_network, + "."/sync_manager + +export results, chronos, block_pools_types, conf + +type + BlockDataChunk* = ref object + resfut*: Future[Result[void, string]].Raising([CancelledError]) + onStateUpdatedCb*: OnStateUpdated + blocks*: seq[BlockData] + + SyncOverseer* = object + statusMsg*: Opt[string] + consensusManager*: ref ConsensusManager + validatorMonitor*: ref ValidatorMonitor + config*: BeaconNodeConf + getBeaconTimeFn*: GetBeaconTimeFn + clist*: ChainListRef + beaconClock*: BeaconClock + eventQueue*: AsyncEventQueue[ForkedLightClientHeader] + loopFuture*: Future[void].Raising([]) + forwardSync*: SyncManager[Peer, PeerId] + backwardSync*: SyncManager[Peer, PeerId] + untrustedSync*: SyncManager[Peer, PeerId] + batchVerifier*: ref BatchVerifier + pool*: PeerPool[Peer, PeerId] + avgSpeedCounter*: int + avgSpeed*: float + blocksQueue*: AsyncQueue[BlockDataChunk] + untrustedInProgress*: bool + + SyncOverseerRef* = ref SyncOverseer + +proc new*( + t: typedesc[SyncOverseerRef], + cm: ref ConsensusManager, + vm: ref ValidatorMonitor, + configuration: BeaconNodeConf, + bt: GetBeaconTimeFn, + clist: ChainListRef, + clock: BeaconClock, + eq: AsyncEventQueue[ForkedLightClientHeader], + pool: PeerPool[Peer, PeerId], + batchVerifier: ref BatchVerifier, + forwardSync: SyncManager[Peer, PeerId], + backwardSync: SyncManager[Peer, PeerId], + untrustedSync: SyncManager[Peer, PeerId] +): SyncOverseerRef = + SyncOverseerRef( + consensusManager: cm, + validatorMonitor: vm, + config: configuration, + getBeaconTimeFn: bt, + clist: clist, + beaconClock: clock, + eventQueue: eq, + pool: pool, + batchVerifier: batchVerifier, + forwardSync: forwardSync, + backwardSync: backwardSync, + untrustedSync: untrustedSync, + untrustedInProgress: false, + blocksQueue: newAsyncQueue[BlockDataChunk]()) + +proc syncInProgress*(overseer: SyncOverseerRef): bool = + overseer.forwardSync.inProgress or + overseer.backwardSync.inProgress or + overseer.untrustedSync.inProgress or + overseer.untrustedInProgress diff --git a/beacon_chain/validators/validator_monitor.nim b/beacon_chain/validators/validator_monitor.nim index cfcb385839..23a5cea8fa 100644 --- a/beacon_chain/validators/validator_monitor.nim +++ b/beacon_chain/validators/validator_monitor.nim @@ -215,6 +215,7 @@ type # expanded in the future. gossip = "gossip" api = "api" + sync = "sync" template toGaugeValue(v: bool): int64 = if v: 1 else: 0 diff --git a/beacon_chain/winservice.nim b/beacon_chain/winservice.nim index acfcf042ad..e5a131f9ee 100644 --- a/beacon_chain/winservice.nim +++ b/beacon_chain/winservice.nim @@ -47,6 +47,7 @@ when defined(windows): SERVICE_CONTROL_INTERROGATE = 4 SERVICE_ACCEPT_STOP = 1 ERROR_INVALID_PARAMETER = 87 + ERROR_INVALID_ACCESS = 12 ERROR_BAD_CONFIGURATION = 1610 NO_ERROR = 0 @@ -155,11 +156,15 @@ when defined(windows): reportServiceStatus(SERVICE_STOPPED, ERROR_BAD_CONFIGURATION, 0) quit QuitFailure - argEntryPoint(config) - - info "Service thread stopped" - reportServiceStatus(SERVICE_STOPPED, NO_ERROR, 0) + try: + argEntryPoint(config) + info "Service thread stopped" + # we have to report back when we stopped! 
+ reportServiceStatus(SERVICE_STOPPED, NO_ERROR, 0) + except CatchableError: + info "Service thread crashed" # we have to report back when we stopped! + reportServiceStatus(SERVICE_STOPPED, ERROR_INVALID_ACCESS, 0) let serviceName = newWideCString(argServiceName) diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim index 3129e31e13..4ebf4eb881 100644 --- a/ncli/ncli_db.nim +++ b/ncli/ncli_db.nim @@ -295,7 +295,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) = doAssert dag.updateState( stateData[], dag.atSlot(blockRefs[^1], blockRefs[^1].slot - 1).expect("not nil"), - false, cache) + false, cache, dag.updateFlags) template processBlocks(blocks: auto) = for b in blocks.mitems(): @@ -612,7 +612,8 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) = withTimer(timers[tState]): var cache: StateCache - if not updateState(dag, tmpState[], eraBid, false, cache): + if not updateState(dag, tmpState[], eraBid, false, cache, + dag.updateFlags): notice "Skipping era, state history not available", era, name missingHistory = true continue @@ -753,7 +754,7 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) = doAssert dag.updateState( state[], dag.atSlot(blockRefs[^1], blockRefs[^1].slot - 1).expect("block found"), - false, cache) + false, cache, dag.updateFlags) proc processEpoch() = let @@ -1052,10 +1053,12 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) = let slot = if startEpochSlot > 0: startEpochSlot - 1 else: 0.Slot if blockRefs.len > 0: discard dag.updateState( - tmpState[], dag.atSlot(blockRefs[^1], slot).expect("block"), false, cache) + tmpState[], dag.atSlot(blockRefs[^1], slot).expect("block"), false, cache, + dag.updateFlags) else: discard dag.updateState( - tmpState[], dag.getBlockIdAtSlot(slot).expect("block"), false, cache) + tmpState[], dag.getBlockIdAtSlot(slot).expect("block"), false, cache, + dag.updateFlags) let savedValidatorsCount = outDb.getDbValidatorsCount var validatorsCount = getStateField(tmpState[], validators).len @@ -1214,4 +1217,4 @@ when isMainModule: of DbCmd.validatorPerf: cmdValidatorPerf(conf, cfg) of DbCmd.validatorDb: - cmdValidatorDb(conf, cfg) \ No newline at end of file + cmdValidatorDb(conf, cfg) diff --git a/research/block_sim.nim b/research/block_sim.nim index b7443f8d55..413c64f975 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -530,8 +530,8 @@ cli do(slots = SLOTS_PER_EPOCH * 7, var cache = StateCache() doAssert dag.updateState( replayState[], dag.getBlockIdAtSlot(Slot(slots)).expect("block"), - false, cache) + false, cache, dag.updateFlags) echo "Done!" - printTimers(dag.headState, attesters, true, timers) \ No newline at end of file + printTimers(dag.headState, attesters, true, timers) diff --git a/tests/consensus_spec/test_fixture_fork_choice.nim b/tests/consensus_spec/test_fixture_fork_choice.nim index a7aa97b94c..e4a523f56d 100644 --- a/tests/consensus_spec/test_fixture_fork_choice.nim +++ b/tests/consensus_spec/test_fixture_fork_choice.nim @@ -200,7 +200,8 @@ proc stepOnBlock( state, dag.getBlockIdAtSlot(time.slotOrZero).expect("block exists"), save = false, - stateCache + stateCache, + dag.updateFlags ) # 3. 
Add block to DAG diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 9f699ffe17..67a392c666 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -55,8 +55,9 @@ suite "Block processor" & preset(): b1 = addTestBlock(state[], cache).phase0Data b2 = addTestBlock(state[], cache).phase0Data getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time() + batchVerifier = BatchVerifier.new(rng, taskpool) processor = BlockProcessor.new( - false, "", "", rng, taskpool, consensusManager, + false, "", "", batchVerifier, consensusManager, validatorMonitor, blobQuarantine, getTimeFn) processorFut = processor.runQueueProcessingLoop() diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index 33485f3654..70e6baadc1 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -256,39 +256,41 @@ suite "Block pool processing" & preset(): # move to specific block var cache = StateCache() check: - dag.updateState(tmpState[], bs1, false, cache) + dag.updateState(tmpState[], bs1, false, cache, dag.updateFlags) tmpState[].latest_block_root == b1Add[].root getStateField(tmpState[], slot) == bs1.slot # Skip slots check: - dag.updateState(tmpState[], bs1_3, false, cache) # skip slots + dag.updateState(tmpState[], bs1_3, false, cache, dag.updateFlags) # skip slots tmpState[].latest_block_root == b1Add[].root getStateField(tmpState[], slot) == bs1_3.slot # Move back slots, but not blocks check: dag.updateState( - tmpState[], dag.parent(bs1_3.bid).expect("block").atSlot(), false, cache) + tmpState[], dag.parent(bs1_3.bid).expect("block").atSlot(), false, + cache, dag.updateFlags) tmpState[].latest_block_root == b1Add[].parent.root getStateField(tmpState[], slot) == b1Add[].parent.slot # Move to different block and slot check: - dag.updateState(tmpState[], bs2_3, false, cache) + dag.updateState(tmpState[], bs2_3, false, cache, dag.updateFlags) tmpState[].latest_block_root == b2Add[].root getStateField(tmpState[], slot) == bs2_3.slot # Move back slot and block check: - dag.updateState(tmpState[], bs1, false, cache) + dag.updateState(tmpState[], bs1, false, cache, dag.updateFlags) tmpState[].latest_block_root == b1Add[].root getStateField(tmpState[], slot) == bs1.slot # Move back to genesis check: dag.updateState( - tmpState[], dag.parent(bs1.bid).expect("block").atSlot(), false, cache) + tmpState[], dag.parent(bs1.bid).expect("block").atSlot(), false, cache, + dag.updateFlags) tmpState[].latest_block_root == b1Add[].parent.root getStateField(tmpState[], slot) == b1Add[].parent.slot @@ -500,7 +502,7 @@ suite "chain DAG finalization tests" & preset(): check: updateState( dag, tmpStateData[], dag.head.atSlot(dag.head.slot).toBlockSlotId().expect("not nil"), - false, cache) + false, cache, dag.updateFlags) check: dag.head.slot.epoch in cache.shuffled_active_validator_indices @@ -623,7 +625,8 @@ suite "chain DAG finalization tests" & preset(): while cur != nil: # Go all the way to dag.finalizedHead assign(tmpStateData[], dag.headState) check: - dag.updateState(tmpStateData[], cur.bid.atSlot(), false, cache) + dag.updateState(tmpStateData[], cur.bid.atSlot(), false, cache, + dag.updateFlags) dag.getForkedBlock(cur.bid).get().phase0Data.message.state_root == getStateRoot(tmpStateData[]) getStateRoot(tmpStateData[]) == hash_tree_root(