本文章记录以太坊源码blockchain.go的阅读笔记。该文件包含了与区块链相关的各种功能和操作,用于实现区块链核心功能。
源码开头部分定义了一个变量块,用于同时声明多个变量。
var (
// 表示头部区块数量的计量表,用于记录当前区块链的头部区块数量
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
// 表示头部头信息数量的计量表,用于记录当前区块链的头部信息数量
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
// 表示头部快速块数量的计量表,用于记录当前区块链的头部快速块的数量
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
// 表示已最终确定的块数量的计量表,用于记录当前区块链已经最终确定的块数量
headFinalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
// 表示安全块数量的计量表,用于记录当前区块链的安全块数量
headSafeBlockGauge = metrics.NewRegisteredGauge("chain/head/safe", nil)
// 表示区块链信息的计量表,用于记录与区块链相关的一般信息
chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil)
// 表示账户读取时间的重置计时器,用于记录区块链中账户读取操作的时间
accountReadTimer = metrics.NewRegisteredResettingTimer("chain/account/reads", nil)
// 表示账户哈希计算时间的重置计时器,用于记录区块链中账户哈希操作的时间
accountHashTimer = metrics.NewRegisteredResettingTimer("chain/account/hashes", nil)
// 表示账户更新时间的重置计时器,用于记录区块链中账户更新操作的时间
accountUpdateTimer = metrics.NewRegisteredResettingTimer("chain/account/updates", nil)
// 表示账户提交时间的重置计时器,用于记录区块链中账户提交操作的时间
accountCommitTimer = metrics.NewRegisteredResettingTimer("chain/account/commits", nil)
// 读取操作时间计时器
storageReadTimer = metrics.NewRegisteredResettingTimer("chain/storage/reads", nil)
// 哈希计算操作时间计时器
storageHashTimer = metrics.NewRegisteredResettingTimer("chain/storage/hashes", nil)
// 更新操作时间计时器
storageUpdateTimer = metrics.NewRegisteredResettingTimer("chain/storage/updates", nil)
// 提交操作时间计时器
storageCommitTimer = metrics.NewRegisteredResettingTimer("chain/storage/commits", nil)
// 快照中账户读取操作时间计时器
snapshotAccountReadTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/account/reads", nil)
// 快照中存储读取操作时间计时器
snapshotStorageReadTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/storage/reads", nil)
// 快照提交操作时间计时器
snapshotCommitTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil)
// Trie数据库提交操作计时器
triedbCommitTimer = metrics.NewRegisteredResettingTimer("chain/triedb/commits", nil)
// 区块插入操作计时器
blockInsertTimer = metrics.NewRegisteredResettingTimer("chain/inserts", nil)
// 区块验证操作计时器
blockValidationTimer = metrics.NewRegisteredResettingTimer("chain/validation", nil)
// 区块执行操作计时器
blockExecutionTimer = metrics.NewRegisteredResettingTimer("chain/execution", nil)
// 区块写入操作计时器
blockWriteTimer = metrics.NewRegisteredResettingTimer("chain/write", nil)
// 记录操作的频率
blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
// 新增区块到重新组织的频率
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
// 丢弃的区块重新组织的频率
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
// 区块预取执行操作的计时器
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
// 区块预取中断的频率
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
// 插入操作中断的错误信息
errInsertionInterrupted = errors.New("insertion is interrupted")
// 区块链停止的错误信息
errChainStopped = errors.New("blockchain is stopped")
// 旧区块链无效的错误信息
errInvalidOldChain = errors.New("invalid old chain")
// 新区块无效的错误信息
errInvalidNewChain = errors.New("invalid new chain")
)
上面提到了快速块和安全块,安全块和快速块是区块链中的两种不同类型的块,它们在区块链协议中具有不同的含义和作用。
安全块(Finalized Block):
安全块是指已经被网络中大多数节点(或者特定的一组节点)确认的块,这些节点已经通过一定的共识机制达成一致,认可该块的有效性,并将其视为区块链的一部分。
一般来说,安全块是区块链的最终确认版本,即它们被认为是永久的,不可逆转的。
安全块的确认通常是通过一些共识算法(如工作量证明、权益证明等)来实现的,确保了块的有效性和整个网络的安全性。
快速块(Fast Block):
快速块是指在某些区块链协议中,通过一定的机制使得特定的块可以更快地被确认和接受,从而提高了交易的处理速度和网络的吞吐量。
快速块可能是临时性的,它们在一定条件下可以更快地被确认,但在后续的安全确认中可能会发生变化。
快速块的引入可以带来更快的交易确认和更高的网络效率,但也可能会牺牲一定程度的安全性和去中心化特性。
总的来说,快速块可以更快的被确认,从而提高交易处理的速度,而安全块则具备更高的安全性和可靠性。
下面是常量块,用于同时声明多个常量:
const (
bodyCacheLimit = 256 // 区块体缓存的大小限制,通常包含交易等数据
blockCacheLimit = 256 // 区块缓存的大小限制,用于存储最近处理过的区块,提高性能
receiptsCacheLimit = 32 // 收据缓存的大小限制,用于存储交易执行的结果
txLookupCacheLimit = 1024 // 交易查询缓存的大小,用于存储交易哈希与区块的映射,加快查询
TriesInMemory = 128 // 内存中存储的Trie的数量
BlockChainVersion uint64 = 8 // 区块链版本号
)
上文提到的Trie结构可以参考下面的文章:以太坊Geth Trie源码解析_以太坊设计与源码之美-CSDN博客
下面是对Trie结构的配置:
type CacheConfig struct {
TrieCleanLimit int // 用于缓存内存中trie节点的内存限制(以MB为单位)
TrieCleanNoPrefetch bool // 是否禁用启发式状态预取以供后续块使用
TrieDirtyLimit int // 当脏trie节点内存占用达到多少MB时开始将其刷新到磁盘
TrieDirtyDisabled bool // 是否完全禁用trie写缓存和垃圾回收
TrieTimeLimit time.Duration // 在何时将当前内存中的trie刷新到磁盘的时间限制
SnapshotLimit int // 用于在内存中缓存快照条目的内存限制
Preimages bool // 是否将trie键的预映像存储到磁盘上
StateHistory uint64 // 从头开始保留状态历史记录的块数
StateScheme string // 用于在顶部存储以太坊状态和默克尔树节点的方案
SnapshotNoBuild bool // 是否允许后台生成快照
SnapshotWait bool // 是否在启动时等待快照构建
}
对于配置来说,以太坊提供了加载配置和默认配置:
func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
config := &triedb.Config{
Preimages: c.Preimages,
IsVerkle: isVerkle,
}
// 根据不同的方案配置不同的存储引擎
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
}
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
return config
}
// 默认的缓存配置
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256, // Trie节点的内存限制为256MB
TrieDirtyLimit: 256, // 脏Trie节点达到256MB被刷新到磁盘
TrieTimeLimit: 5 * time.Minute, // 5分钟后将内存的Trie刷新到磁盘
SnapshotLimit: 256, // 缓存快照条目设置为256MB
SnapshotWait: true, // 在启动时等待快照构建
StateScheme: rawdb.HashScheme, // 使用哈希方案存储以太坊状态和merkle树
}
// 指定以哈希方式存储以太坊状态和 Merkle 树节点的方式
func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
config := *defaultCacheConfig
config.StateScheme = scheme
return &config
}
上面的代码涉及到Verkle Trie树,Verkle Trie 是一种改进的 Merkle Trie 数据结构,用于存储键值对。Merkle Trie节点存储的是哈希值,而Verkle Trie存储的是哈希值以及对这个哈希值存在的证明。它是以太坊 2.0 中引入的一项技术,旨在提供更高效的存储和验证方法。Verkle Trie 相比传统的 Merkle Trie 有几个优点:
更少的存储需求:Verkle Trie 使用路径压缩和共享子树的方式来减少存储需求,特别是对于包含大量相似键的情况,可以显著减少存储空间的占用。
更高的效率:由于路径压缩和共享子树的特性,Verkle Trie 在执行插入、删除和更新操作时,通常比传统的 Merkle Trie 更高效。
更好的隐私性:Verkle Trie 可以隐藏非叶子节点的具体键值对,从而提高了对存储数据的隐私性。
下面是对交易查找的结构体封装,用于将交易对象和交易查找结果封装在一起,以便于在程序中处理和传递交易及其相关信息。通过将这两个信息结合在一起,可以更方便地在程序中使用交易对象,并且可以轻松地从交易查找结果中获取所需的信息。
type txLookup struct {
lookup *rawdb.LegacyTxLookupEntry
transaction *types.Transaction
}
下面是对区块链的结构体封装,它包含了各种与区块链操作相关的字段和方法。
type BlockChain struct {
// chainConfig 指向区块链中链和网络配置的指针。
chainConfig *params.ChainConfig
// cacheConfig 指向缓存配置的指针。
cacheConfig *CacheConfig
// db 是最终持久化数据的数据库。
db ethdb.Database
// snaps 是一个快速访问 Trie 的快照树。
snaps *snapshot.Tree
// triegc 是一个优先队列,将区块号映射到需要进行垃圾回收的 Trie。
triegc *prque.Prque[int64, common.Hash]
// gcproc 是用于累积规范块处理以进行 Trie 转储的时间间隔。
gcproc time.Duration
// lastWrite 是上一次状态被刷新时的区块号。
lastWrite uint64
// flushInterval 是用于定时刷新状态的时间间隔。
flushInterval atomic.Int64
// triedb 是用于维护 Trie 节点的数据库处理程序。
triedb *triedb.Database
// stateCache 是状态数据库,用于在导入之间重用状态。
stateCache state.Database
// txIndexer 是事务索引器,如果未启用,则可能为 nil。
txIndexer *txIndexer
// hc 是区块头链管理器。
hc *HeaderChain
// rmLogsFeed 是用于订阅链上移除日志的事件 Feed。
rmLogsFeed event.Feed
// chainFeed 是链事件的事件 Feed。
chainFeed event.Feed
// chainSideFeed 是侧链事件的事件 Feed。
chainSideFeed event.Feed
// chainHeadFeed 是链头事件的事件 Feed。
chainHeadFeed event.Feed
// logsFeed 是用于订阅链上新日志的事件 Feed。
logsFeed event.Feed
// blockProcFeed 是用于订阅块处理事件的事件 Feed。
blockProcFeed event.Feed
// scope 是事件订阅范围。
scope event.SubscriptionScope
// genesisBlock 是创世块。
genesisBlock *types.Block
// chainmu 是用于同步链写操作的互斥锁。
chainmu *syncx.ClosableMutex
// currentBlock 是当前链的头部,原子指针指向 types.Header 对象。
currentBlock atomic.Pointer[types.Header]
// currentSnapBlock 是当前快照同步的头部,原子指针指向 types.Header 对象。
currentSnapBlock atomic.Pointer[types.Header]
// currentFinalBlock 是最终确定的块,原子指针指向 types.Header 对象。
currentFinalBlock atomic.Pointer[types.Header]
// currentSafeBlock 是最近的安全块,原子指针指向 types.Header 对象。
currentSafeBlock atomic.Pointer[types.Header]
// bodyCache 是用于缓存区块体的 LRU 缓存。
bodyCache *lru.Cache[common.Hash, *types.Body]
// bodyRLPCache 是用于缓存区块体的 RLP 编码的 LRU 缓存。
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
// receiptsCache 是用于缓存区块的交易收据的 LRU 缓存。
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
// blockCache 是用于缓存区块的 LRU 缓存。
blockCache *lru.Cache[common.Hash, *types.Block]
// txLookupCache 是用于缓存事务查找结果的 LRU 缓存。
txLookupCache *lru.Cache[common.Hash, txLookup]
// wg 是一个 WaitGroup,用于等待所有区块链协程完成。
wg sync.WaitGroup
// quit 是一个用于发送关闭信号的通道。
quit chan struct{}
// stopping 是一个原子布尔类型,表示区块链是否正在停止。
stopping atomic.Bool
// procInterrupt 是一个原子布尔类型,用于中断块处理。
procInterrupt atomic.Bool
// engine 是共识引擎,用于验证和执行共识规则。
engine consensus.Engine
// validator 是验证器接口,用于验证区块和状态。
validator Validator
// prefetcher 是预取器接口,用于预取区块数据。
prefetcher Prefetcher
// processor 是块事务处理器接口,用于处理块中的事务。
processor Processor
// forker 是分叉选择对象,用于选择主链。
forker *ForkChoice
// vmConfig 是虚拟机配置,用于配置以太坊虚拟机。
vmConfig vm.Config
// logger 是日志记录钩子,用于记录区块链操作的日志。
logger *tracing.Hooks
}
下面函数是用来创建一个全面初始化的区块链对象的。函数接受了一系列参数,包括数据库、缓存配置、创世块信息、链覆盖、共识引擎、虚拟机配置等。
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis, overrides *ChainOverrides, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64) (*BlockChain, error) {
if cacheConfig == nil {
// 使用默认的缓存配置
cacheConfig = defaultCacheConfig
}
// 使用提供的数据库配置打开trie数据库
triedb := triedb.NewDatabase(db, cacheConfig.triedbConfig(genesis != nil && genesis.IsVerkle()))
// 接收SetupGenesisBlockWithOverride的三个返回值
chainConfig, genesisHash, genesisErr := SetupGenesisBlockWithOverride(db, triedb, genesis, overrides)
// 如果在创世块设置过程中发生了其他类型的错误会直接返回错误,否则如果genesisErr!=nil且是一个 // params.ConfigCompatError错误,说明创世块设置过程发生了配置不兼容的错误,但这种错误被允许
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
// 打印空行-输出153个--分行打印配置的信息-答应153--打印空行
log.Info("")
log.Info(strings.Repeat("-", 153))
for _, line := range strings.Split(chainConfig.Description(), "\n") {
log.Info(line)
}
log.Info(strings.Repeat("-", 153))
log.Info("")
// 创建区块链
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
engine: engine,
vmConfig: vmConfig,
logger: vmConfig.Tracer,
}
// 将缓存配置中的 trie 时间限制存储到 BlockChain 对象的 flushInterval 字段中。
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
// 使用 NewForkChoice 函数创建一个分叉选择对象,并赋值给 BlockChain 对象的 forker 字段,用于处 // 理分叉选择。
bc.forker = NewForkChoice(bc, shouldPreserve)
// 使用 state.NewDatabaseWithNodeDB 函数创建一个状态数据库,并赋值给 BlockChain 对象的 // stateCache 字段。
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
// 使用 NewBlockValidator 函数创建一个区块验证器,并赋值给 BlockChain 对象的 validator 字段。
bc.validator = NewBlockValidator(chainConfig, bc, engine)
// 使用 newStatePrefetcher 函数创建一个状态预取器,并赋值给 BlockChain 对象的 prefetcher 字 // 段。
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
// 使用 NewStateProcessor 函数创建一个状态处理器,并赋值给 BlockChain 对象的 processor 字段。
bc.processor = NewStateProcessor(chainConfig, bc, engine)
var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
}
// 获得创世区块
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
// 表示当前链上没有任何区块
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
bc.currentFinalBlock.Store(nil)
bc.currentSafeBlock.Store(nil)
// 将当前区块链的 ID 更新到度量指标中,以便监控该区块链的状态
chainInfoGauge.Update(metrics.GaugeInfoValue{"chain_id": bc.chainConfig.ChainID.String()})
// 如果区块链是否是空,为空则从快照加载数据来初始化
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
}
// 从磁盘导入区块链状态
if err := bc.loadLastState(); err != nil {
return nil, err
}
// 获取区块链当前区块
head := bc.CurrentBlock()
if !bc.HasState(head.Root) {
// 链头状态不可用,可能是状态丢失或者本地没有同步
if head.Number.Uint64() == 0 {
// 头部区块丢失状态,需要等待同步
log.Info("Genesis state is missing, wait state sync")
} else {
var diskRoot common.Hash
// 如果启用了快照,则从快照提取快照中的根哈希值
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
// 如果哈希值不存在,尝试从快照中恢复哈希值
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)
snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
if err != nil {
return nil, err
}
// 成功恢复了状态,则将恢复的快照编号写入数据库,以便表示恢复过程
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
// 快照根哈希值不存在,快照未启用或不存在,调用setHeadBeyondRoot函数设置链头
// 传入一个空的哈希值,尝试从头开始恢复状态
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash())
if _, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, common.Hash{}, true); err != nil {
return nil, err
}
}
}
}
// 是否有祖先区块,声明是否需要回滚区块以及需要回滚区块的编号
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
low uint64
)
// 获取最新区块
fullBlock := bc.CurrentBlock()
// 当前区块不为空,不是创世区块且区块变化小于祖先区块-1,需要回滚
if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.Number.Uint64() < frozen-1 {
needRewind = true
low = fullBlock.Number.Uint64()
}
// 获取当前区链的快照区块
snapBlock := bc.CurrentSnapBlock()
if snapBlock != nil && snapBlock.Number.Uint64() < frozen-1 {
needRewind = true
if snapBlock.Number.Uint64() < low || low == 0 {
low = snapBlock.Number.Uint64()
}
}
// 回滚操作,需要指定回滚的起始区块编号和终止区块编号,设置新的区块链头部,回滚到low区块
if needRewind {
log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
if err := bc.SetHead(low); err != nil {
return nil, err
}
}
}
// 区块链引擎确定当前区块是否合法
bc.engine.VerifyHeader(bc, bc.CurrentHeader())
// 遍历有问题的哈希集合,如果区块头的哈希值和里面的某个值相等,表示该区块哈希值有问题需要回滚
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
return nil, err
}
log.Error("Chain rewind was successful, resuming normal operation")
}
}
}
// 记录区块链初始化事件
if bc.logger != nil && bc.logger.OnBlockchainInit != nil {
bc.logger.OnBlockchainInit(chainConfig)
}
// 检查日志器是否有OnGenesisBlock函数,如果是创世区块则试图访问区块信息,然后执行回调函数
if bc.logger != nil && bc.logger.OnGenesisBlock != nil {
if block := bc.CurrentBlock(); block.Number.Uint64() == 0 {
alloc, err := getGenesisState(bc.db, block.Hash())
if err != nil {
return nil, fmt.Errorf("failed to get genesis state: %w", err)
}
if alloc == nil {
return nil, fmt.Errorf("live blockchain tracer requires genesis alloc to be set")
}
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
// 启用了快照
if bc.cacheConfig.SnapshotLimit > 0 {
// 是否启用了快照恢复
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
// 创建快照对象
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}
// 检查是否存在配置兼容性问题并进行相应处理
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
// 将链回滚到指定时间
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
// 将链回滚到指定区块id
bc.SetHead(compat.RewindToBlock)
}
// 将更新后的链配置写入数据库
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// 检查是否传入了交易索引限制参数,并相应的配置区块链的交易索引器
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
下面是从数据库中加载最新的链状态,
// loadLastState 从数据库中加载最新的已知链状态。该方法假设链管理器的互斥锁已被持有。
func (bc *BlockChain) loadLastState() error {
// 恢复最新的已知头区块
head := rawdb.ReadHeadBlockHash(bc.db) // 从数据库中读取最新的区块哈希
if head == (common.Hash{}) { // 如果数据库中最新的区块哈希为空,表示数据库可能损坏或为空
// 数据库损坏或为空,重新初始化区块链
log.Warn("数据库为空,正在重置链") // 发出警告日志,数据库为空,重置区块链
return bc.Reset() // 调用重置方法进行初始化
}
// 确保完整的头区块可用
headBlock := bc.GetBlockByHash(head) // 获取最新的区块
if headBlock == nil { // 如果获取的区块为空,表示数据库可能损坏或为空
// 数据库损坏或为空,重新初始化区块链
log.Warn("头区块丢失,正在重置链", "哈希", head) // 发出警告日志,头区块丢失,重置区块链
return bc.Reset() // 调用重置方法进行初始化
}
// 一切正常,将其设置为头区块
bc.currentBlock.Store(headBlock.Header()) // 存储头区块
headBlockGauge.Update(int64(headBlock.NumberU64())) // 更新头区块计数器
// 恢复最新已知的头标头
headHeader := headBlock.Header() // 获取头标头
if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) { // 如果数据库中存在头标头哈希
if header := bc.GetHeaderByHash(head); header != nil { // 获取头标头
headHeader = header // 设置头标头
}
}
bc.hc.SetCurrentHeader(headHeader) // 设置当前标头
// 恢复最新已知的头快照块
bc.currentSnapBlock.Store(headBlock.Header()) // 存储快照区块头
headFastBlockGauge.Update(int64(headBlock.NumberU64())) // 更新快照区块计数器
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) { // 如果数据库中存在快照区块哈希
if block := bc.GetBlockByHash(head); block != nil { // 获取快照区块
bc.currentSnapBlock.Store(block.Header()) // 存储快照区块头
headFastBlockGauge.Update(int64(block.NumberU64())) // 更新快照区块计数器
}
}
// 恢复最新已知的最终化块和安全块
// 注意:安全块未存储在磁盘上,并在启动时设置为最后已知的最终化块
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) { // 如果数据库中存在最终化块哈希
if block := bc.GetBlockByHash(head); block != nil { // 获取最终化块
bc.currentFinalBlock.Store(block.Header()) // 存储最终化块头
headFinalizedBlockGauge.Update(int64(block.NumberU64())) // 更新最终化块计数器
bc.currentSafeBlock.Store(block.Header()) // 存储安全块头
headSafeBlockGauge.Update(int64(block.NumberU64())) // 更新安全块计数器
}
}
// 向用户发出状态日志
var (
currentSnapBlock = bc.CurrentSnapBlock() // 获取当前快照区块
currentFinalBlock = bc.CurrentFinalBlock() // 获取当前最终化块
headerTd = bc.GetTd(headHeader.Hash(), headHeader.Number.Uint64()) // 获取头标头的总难度
blockTd = bc.GetTd(headBlock.Hash(), headBlock.NumberU64()) // 获取头区块的总难度
)
if headHeader.Hash() != headBlock.Hash() { // 如果头标头哈希与头区块哈希不相等
log.Info("加载最新的本地标头", "高度", headHeader.Number, "哈希", headHeader.Hash(), "总难度", headerTd, "时间", common.PrettyAge(time.Unix(int64(headHeader.Time), 0))) // 输出日志,加载最新的本地标头
}
log.Info("加载最新的本地区块", "高度", headBlock.Number(), "哈希", headBlock.Hash(), "总难度", blockTd, "时间", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0))) // 输出日志,加载最新的本地区块
if headBlock.Hash() != currentSnapBlock.Hash() { // 如果头区块哈希与当前快照区块哈希不相等
snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64()) // 获取快照区块的总难度
log.Info("加载最新的本地快照区块", "高度", currentSnapBlock.Number, "哈希", currentSnapBlock.Hash(), "总难度", snapTd, "时间", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0))) // 输出日志,加载最新的本地快照区块
}
if currentFinalBlock != nil { // 如果存在当前最终化块
finalTd := bc.GetTd(currentFinalBlock.Hash(), currentFinalBlock.Number.Uint64()) // 获取最终化块的总难度
log.Info("加载最新的本地最终化块", "高度", currentFinalBlock.Number, "哈希", currentFinalBlock.Hash(), "总难度", finalTd, "时间", common.PrettyAge(time.Unix(int64(currentFinalBlock.Time), 0))) // 输出日志,加载最新的本地最终化块
}
if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil { // 如果存在最后的快照同步枢纽标记
log.Info("加载最后的快照同步枢纽标记", "高度", *pivot) // 输出日志,加载最后的快照同步枢纽标记
}
return nil // 返回空错误
}
下面这个函数的主要功能是设置链的头部,负责回滚区块链到请求的头部,并根据需要进行相关的清理工作,包括:
获取链的锁,确保只有一个线程正在进行链头设置的操作;
根据修复标志和数据库状态,决定是否执行头部的更新操作;
根据新的链头更新函数和删除函数对链更新;
清理缓存中的陈旧内容;
需要的情况下,清除已经失效的安全块和已经失效的最终化的块;
加载最新的链状态;
返回相关信息
func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Hash, repair bool) (uint64, error) {
// 如果无法获取锁,返回错误信息
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
var (
// 跟踪请求根哈希值的块编号
rootNumber uint64 // (如果没有根,则始终为0)
// 检索最后一个枢轴块以在其之前短路回滚,并检索当前冻结器限制以开始清理下溢的数据。
pivot = rawdb.ReadLastPivotNumber(bc.db)
)
updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (*types.Header, bool) {
// 回滚区块链,确保我们不会得到一个无状态的链头块。注意,深度相等性被允许,以允许将 SetHead 用作链修复机制而不删除任何数据!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.Number.Uint64() {
var newHeadBlock *types.Header
newHeadBlock, rootNumber = bc.rewindHead(header, root)
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
// 如果显式地恢复了链标记,则降级链标记。在理论上,我们应该在最后一步更新所有内存中的标记,然而 SetHead 的方向是从高到低,所以可以直接更新内存中的标记。
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.Number.Uint64()))
// 头状态丢失,这只在基于路径的方案中可能发生。当链头被回滚到枢轴点以下时会出现这种情况。在这种情况下,除了重新运行快照同步器之外,没有可能的恢复方法。在此之前不执行任何操作。
if !bc.HasState(newHeadBlock.Root) {
if newHeadBlock.Number.Uint64() != 0 {
log.Crit("链在非创世块处处于无状态")
}
log.Info("链处于无状态,请等待状态同步", "number", newHeadBlock.Number, "hash", newHeadBlock.Hash())
}
}
// 简单地回滚快照块到目标头
if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() {
newHeadSnapBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
// 如果任一块达到 nil,则重置到创世状态
if newHeadSnapBlock == nil {
newHeadSnapBlock = bc.genesisBlock
}
rawdb.WriteHeadFastBlockHash(db, newHeadSnapBlock.Hash())
// 如果显式地恢复了链标记,则降级链标记。在理论上,我们应该在最后一步更新所有内存中的标记,然而 SetHead 的方向是从高到低,所以可以直接更新内存中的标记。
bc.currentSnapBlock.Store(newHeadSnapBlock.Header())
headFastBlockGauge.Update(int64(newHeadSnapBlock.NumberU64()))
}
var (
headHeader = bc.CurrentBlock()
headNumber = headHeader.Number.Uint64()
)
// 如果 SetHead 下溢冻结器阈值,并且之后的区块处理意图是完整的区块导入,则删除状态块之间的链段和设置头目标之间的链段。
var wipe bool
frozen, _ := bc.db.Ancients()
if headNumber+1 < frozen {
wipe = pivot == nil || headNumber >= *pivot
}
return headHeader, wipe // 如果是完整同步,只有在强制清理时才清除
}
// 回滚头链,删除直到所有区块体的块
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// 忽略此处的错误,因为轻客户端不会走到这条路径上
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// 从古代存储中截断所有相关数据(头部、总难度、主体、收据和规范哈希)。
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("截断古代数据失败", "number", num, "err", err)
}
// 从活动存储中删除哈希 <-> 编号映射。
rawdb.DeleteHeaderNumber(db, hash)
} else {
// 从活动存储中删除相关体和收据。
// 头、总难度和规范哈希将在 hc.SetHead 函数中删除。
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
// 待办事项(rjl493456442) 事务查找、布隆位图等
}
// 如果 SetHead 仅作为链修复方法调用,则尝试跳过头链
// 如果修复冻结器已损坏,则强制进行清理。
if repair {
if target, force := updateFn(bc.db, bc.CurrentBlock()); force {
bc.hc.SetHead(target.Number.Uint64(), nil, delFn)
}
} else {
// 回滚链到请求的头部,然后向后继续,直到找到一个具有状态的块或通过快照同步枢轴。
if time > 0 {
log.Warn("回滚区块链到时间戳", "目标", time)
bc.hc.SetHeadWithTimestamp(time, updateFn, delFn)
} else {
log.Warn("回滚区块链到区块", "目标", head)
bc.hc.SetHead(head, updateFn, delFn)
}
}
// 清除缓存中的任何陈旧内容
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
// 如果需要,清除安全块、已最终化的块
if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.Number.Uint64() {
log.Warn("SetHead 使安全块无效")
bc.SetSafe(nil)
}
if finalized := bc.CurrentFinalBlock(); finalized != nil && head < finalized.Number.Uint64() {
log.Error("SetHead 使已最终化的块无效")
bc.SetFinalized(nil)
}
return rootNumber, bc.loadLastState()
}
在区块链停止前,需要执行一系列的操作来确保数据的一致性和持久性,主要逻辑如下:
stopWithoutSaving 被调用,停止区块链服务但不保存数据。
如果存在快照管理器 snaps,则将当前区块的状态快照记录到磁盘日志中,并释放快照管理器。
如果区块链数据库的方案为路径方案(PathScheme),则确保将内存中的Trie节点记录到磁盘日志中。
否则,对于最近的一些区块(当前区块、当前区块的前一个区块、当前区块之前的一些区块),将其状态Trie写入磁盘以确保数据持久性。这是为了应对不同的重启场景,比如:
HEAD:为了避免在一般情况下不需要重新处理任何区块。
HEAD-1:为了避免如果HEAD成为叔块时出现大的重组。
HEAD-127:为了对重新执行的区块数量设置硬性限制。
如果存在快照基础 snapBase,则将其状态Trie写入磁盘。
对于缓存中的任何尚未写入磁盘的Trie节点,从缓存中删除。
检查数据库是否已经清空,如果没有,则记录错误。
关闭Trie数据库,释放所有资源。
打印区块链停止的信息。
func (bc *BlockChain) Stop() {
// 停止区块链服务
// 停止区块链服务但不保存状态快照
bc.stopWithoutSaving()
// 确保将完整的状态快照记录到磁盘中
var snapBase common.Hash
if bc.snaps != nil {
var err error
// 如果存在快照,则记录快照
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root); err != nil {
log.Error("记录状态快照失败", "err", err)
}
bc.snaps.Release() // 释放快照资源
}
// 如果 trie 数据库是基于路径的,则确保将内存中的 trie 节点正确记录到磁盘中
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil {
log.Info("记录内存中 trie 节点失败", "err", err)
}
} else {
// 否则,确保最近一个区块的状态也被存储到磁盘中
// 我们写入三种不同的状态以捕获不同的重启场景:
// - HEAD: 这样我们在一般情况下不需要重新处理任何区块
// - HEAD-1: 这样如果我们的 HEAD 成为叔块,我们不会做出大幅度的重组
// - HEAD-127: 这样我们对重新执行的区块数量有一个硬性限制
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.triedb
for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := bc.CurrentBlock().Number.Uint64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
log.Info("将缓存的状态写入磁盘", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true); err != nil {
log.Error("提交最近状态 trie 失败", "err", err)
}
}
}
if snapBase != (common.Hash{}) {
log.Info("将快照状态写入磁盘", "root", snapBase)
if err := triedb.Commit(snapBase, true); err != nil {
log.Error("提交快照状态 trie 失败", "err", err)
}
}
// 清理 triegc 中的节点
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
// 如果 trie 数据库中仍有节点,则输出错误信息
if _, nodes, _ := triedb.Size(); nodes != 0 { // 所有内存都包含在节点返回的 hashdb 中
log.Error("清理后仍存在悬空 trie 节点")
}
}
}
// 关闭 trie 数据库,并在最后一步释放所有资源
if err := bc.triedb.Close(); err != nil {
log.Error("关闭 trie 数据库失败", "err", err)
}
log.Info("区块链服务已停止")
}
下面函数的作用是向区块链中插入收据链(Receipt Chain)。在以太坊区块链中,每个区块都包含一个交易列表,每个交易执行完成后都会生成一个收据(Receipt),收据中包含了交易执行的结果信息,比如合约调用的执行结果、日志等。而收据链则是指一系列区块中的收据的集合。
具体来说,这个函数的主要作用包括:
将收据链插入到区块链中:
该函数接受三个参数:blockChain(区块链中的区块列表)、receiptChain(收据链,与区块链中的区块一一对应)、ancientLimit(古老区块的限制值)。
首先,函数会对提供的区块链进行一些合理性检查,确保区块的顺序和连续性。
然后,根据古老区块的限制值,将区块链分为古老区块和活跃区块。
接着,函数会将古老区块的数据写入古老存储(Ancient Store),活跃区块的数据写入活跃存储(Active Store)。
在写入数据之前,还会进行一些额外的检查和处理,比如验证区块的连续性、更新头部快照等。
最后,函数会输出一些统计信息,比如处理的区块数量、执行时间等。
在插入过程中,还会进行一些额外的处理:
在写入古老存储之前,会检查古老存储中是否已包含 Genesis 区块,如果没有则将 Genesis 区块写入古老存储。
在写入活跃存储之前,会检查活跃存储中是否已存在相同的区块,如果存在则会忽略该区块。
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
// 这里我们不需要 chainMu,因为我们希望最大化头部插入和收据插入的并发性。
bc.wg.Add(1) // 增加等待组计数器
defer bc.wg.Done() // 在函数退出时减少等待组计数器
var (
ancientBlocks, liveBlocks types.Blocks // 用于存放古老区块和活跃区块
ancientReceipts, liveReceipts []types.Receipts // 用于存放古老区块和活跃区块对应的收据
)
// 执行一些合理性检查,确保提供的链是有序且连续链接的
for i, block := range blockChain {
if i != 0 {
prev := blockChain[i-1]
// 检查区块号和父哈希是否连续
if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
log.Error("非连续的收据插入",
"number", block.Number(), "hash", block.Hash(), "parent", block.ParentHash(),
"prevnumber", prev.Number(), "prevhash", prev.Hash())
return 0, fmt.Errorf("非连续的插入:项目 %d 是 #%d [%x..],项目 %d 是 #%d [%x..](父级 [%x..])",
i-1, prev.NumberU64(), prev.Hash().Bytes()[:4],
i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
}
}
// 将区块分为古老区块和活跃区块
if block.NumberU64() <= ancientLimit {
ancientBlocks, ancientReceipts = append(ancientBlocks, block), append(ancientReceipts, receiptChain[i])
} else {
liveBlocks, liveReceipts = append(liveBlocks, block), append(liveReceipts, receiptChain[i])
}
// 在这里还验证区块中的 blob 交易是否包含附属信息。
// 虽然附属信息不影响区块哈希 / 交易哈希,但在一个区块内发送 blob 是不允许的。
for txIndex, tx := range block.Transactions() {
if tx.Type() == types.BlobTxType && tx.BlobTxSidecar() != nil {
return 0, fmt.Errorf("区块 #%d 中的交易索引 %d 包含意外的 blob 附属信息", block.NumberU64(), txIndex)
}
}
}
var (
stats = struct{ processed, ignored int32 }{} // 统计处理的区块数和被忽略的区块数
start = time.Now() // 记录插入开始时间
size = int64(0) // 记录数据大小
)
// 更新头部快照同步区块,如果插入的区块更好,并返回插入的区块是否是主链的指示器。
updateHead := func(head *types.Block) bool {
if !bc.chainmu.TryLock() {
return false
}
defer bc.chainmu.Unlock()
// 如果当前头部快照的区块号大于等于插入的区块号,则跳过
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
// 如果发生重组,则跳过
reorg, err := bc.forker.ReorgNeeded(bc.CurrentSnapBlock(), head.Header())
if err != nil {
log.Warn("重组失败", "err", err)
return false
} else if !reorg {
return false
}
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentSnapBlock.Store(head.Header())
headFastBlockGauge.Update(int64(head.NumberU64()))
return true
}
return false
}
// 将区块链和对应的收据链写入古老存储
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
first := blockChain[0] // 古老区块链的第一个区块
last := blockChain[len(blockChain)-1] // 古老区块链的最后一个区块
// 确保 genesis 区块在古老存储中
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
if err != nil {
log.Error("将 genesis 写入古老存储失败", "err", err)
return 0, err
}
size += writeSize
log.Info("已将 genesis 写入古老存储")
}
}
// 在将区块写入古老存储之前,我们需要确保它们与头部链的预期一致。
// 我们只检查最后一个区块/头部,因为它是一个连续的链。
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("包含头部 #%d [%x..] 未知", last.Number(), last.Hash().Bytes()[:4])
}
// 将所有链数据写入古老存储
td := bc.GetTd(first.Hash(), first.NumberU64()) // 获取古老区块链的总难度
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
if err != nil {
log.Error("将链数据导入古老存储失败", "err", err)
return 0, err
}
size += writeSize
// 显式同步古老存储,以确保所有数据已刷新到磁盘。
if err := bc.db.Sync(); err != nil {
return 0, err
}
// 更新当前快照区块,因为所有区块数据现在都存在于 DB 中。
previousSnapBlock := bc.CurrentSnapBlock().Number.Uint64()
if !updateHead(blockChain[len(blockChain)-1]) {
// 如果头部链发生了重组,并且区块/收据与主链不匹配,则进入此处。
if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil {
log.Error("无法在插入失败后截断古老存储", "err", err)
}
return 0, errSideChainReceipts
}
// 从主数据库中删除区块数据
var (
batch = bc.db.NewBatch()
canonHashes = make(map[common.Hash]struct{}) // 存放主链区块哈希
)
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
// 删除侧链哈希到编号的映射
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
return 0, err
}
stats.processed += int32(len(blockChain))
return 0, nil
}
// 将下载的链数据和对应的收据链数据写入古老存储和活跃存储
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
if err == errInsertionInterrupted {
return 0, nil
}
return n, err
}
}
if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if err == errInsertionInterrupted {
return 0, nil
}
return n, err
}
}
var (
head = blockChain[len(blockChain)-1]
context = []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size),
}
)
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Debug("导入新区块收据", context...)
return 0, nil
}
下面函数是将带有状态的区块写入区块链中,并将相关状态信息存储到数据库中:
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) error {
// 计算区块的总难度
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return consensus.ErrUnknownAncestor // 如果父区块的总难度为空,则返回错误
}
// 确保在插入期间不会泄露不一致的状态
externTd := new(big.Int).Add(block.Difficulty(), ptd)
// 不考虑规范状态,将区块本身写入数据库。
// 注意:所有区块组件(总难度、哈希->编号映射、头部、主体、收据)都应原子化地写入。
// BlockBatch 用于包含所有组件。
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) // 写入总难度
rawdb.WriteBlock(blockBatch, block) // 写入区块
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) // 写入收据
rawdb.WritePreimages(blockBatch, state.Preimages()) // 写入预映像
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err) // 如果写入失败,记录关键错误
}
// 将所有缓存的状态更改提交到底层内存数据库
root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number())) // 提交状态更改
if err != nil {
return err // 如果提交失败,返回错误
}
// 如果节点运行在路径模式下,跳过显式的垃圾回收操作,因为在此模式下是不必要的
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}
// 如果节点运行在存档模式下,始终执行刷新
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false) // 将根哈希提交到数据库
}
// 如果节点不是运行在存档模式下,进行适当的垃圾回收操作
// 引用根哈希以保持状态树存活
bc.triedb.Reference(root, common.Hash{})
// 将根哈希和区块号压入垃圾回收器
bc.triegc.Push(root, -int64(block.NumberU64()))
// 不考虑前TriesInMemory块的刷新限制
current := block.NumberU64()
if current <= TriesInMemory {
return nil
}
// 如果超出了内存限制,将成熟的单例节点刷新到磁盘
var (
_, nodes, imgs = bc.triedb.Size() // 所有内存都包含在节点返回的大小中
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize) // 限制内存使用
}
// 查找下一个需要提交的状态树
chosen := current - TriesInMemory
flushInterval := time.Duration(bc.flushInterval.Load())
// 如果超过时间允许的范围,刷新整个状态树到磁盘
if bc.gcproc > flushInterval {
// 如果缺少头部(规范链后面的标头),则正在重新组织低难度侧链。暂停提交直到此操作完成。
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// 如果超过限制但尚未达到足够大的内存差距,警告用户系统正在变得不稳定
if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
}
// 刷新整个状态树并重置计数器
bc.triedb.Commit(header.Root, true)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
// 回收低于所需写入保留量的任何内容
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
bc.triedb.Dereference(root)
}
return nil
}
下面是将一组区块插入到区块链中。它负责验证、处理和导入给定链的区块,并根据需要设置链的头部:
func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
// 如果链已经被终止,则不执行导入操作
if bc.insertStopped() {
return 0, nil
}
// 启动并行签名恢复(在分叉转换时签名者可能会失败,性能损失最小)
SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)
var (
stats = insertStats{startTime: mclock.Now()} // 插入统计信息
lastCanon *types.Block // 最后一个确定的区块
)
// 在返回之前,发送一个链头事件,如果我们推进了链
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
}
}()
// 启动并行头部验证
headers := make([]*types.Header, len(chain))
for i, block := range chain {
headers[i] = block.Header()
}
abort, results := bc.engine.VerifyHeaders(bc, headers)
defer close(abort)
// 获取第一个区块并检查错误,以决定导入逻辑的方向
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()
// 如果导入链不需要构建快照,则跳过已知区块
if bc.skipBlock(err, it) {
// 第一个区块(和状态)已知
// 1. 我们进行了回滚,并且现在应该重新导入
// 2. 区块被存储为侧链,并且它在其状态根上说谎,并且通过了状态根
// 来自尚未验证的规范链,这是不一致的。
// 跳过我们后面已知的区块。
var (
reorg bool
current = bc.CurrentBlock()
)
for block != nil && bc.skipBlock(err, it) {
reorg, err = bc.forker.ReorgNeeded(current, block.Header())
if err != nil {
return it.index, err
}
if reorg {
// 如果分叉器表示重新组织是必要的,则切换到导入模式
// 并且区块不在规范链上。
// 在 eth2 中,分叉器总是返回 true 以进行重新组织决策
// (盲目信任外部共识引擎),但为了避免导入已知块时不必要的重新组织,
// 这里处理特殊情况。
if block.NumberU64() > current.Number.Uint64() || bc.GetCanonicalHash(block.NumberU64()) != block.Hash() {
break
}
}
log.Debug("忽略已知区块", "number", block.Number(), "hash", block.Hash())
stats.ignored++
block, err = it.next()
}
// 跳转到区块导入
}
switch {
case errors.Is(err, consensus.ErrPrunedAncestor):
// 第一个块已被修剪
if setHead {
// 第一个块已修剪,将其插入为侧链,只有在 TD 增长足够时才重新组织
log.Debug("修剪的祖先,插入为侧链", "number", block.Number(), "hash", block.Hash())
return bc.insertSideChain(block, it)
} else {
// 我们已经合并并且父块已被修剪,尝试恢复父状态
log.Debug("修剪的祖先", "number", block.Number(), "hash", block.Hash())
_, err := bc.recoverAncestors(block)
return it.index, err
}
case err != nil && !errors.Is(err, ErrKnownBlock):
// 发生了其他错误(除了 ErrKnownBlock),中止
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, err
}
// 没有验证错误的第一个块(或跳过了链前缀)
var activeState *state.StateDB
defer func() {
// 区块导入过程中,该函数启动和停止了预取器。
// 如果发生了坏块或其他错误,可能会导致早期返回不正确地终止后台线程。
// 此延迟确保我们清理悬挂的预取器,而不需要每次延迟一个都持有活动引用。
if activeState != nil {
activeState.StopPrefetcher()
}
}()
for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
// 如果链已终止,则停止处理区块
if bc.insertStopped() {
log.Debug("区块处理过程中中止")
break
}
// 如果头部是被禁止的,则直接中止
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBannedHash)
return it.index, ErrBannedHash
}
// 如果区块是已知的(在链中间),这是 Clique 区块的特殊情况,
// 它们可以共享彼此之间的状态,因此导入旧区块可能完成后续区块的状态。
// 在这种情况下,只需跳过区块(我们已经完全验证过了它们一次,因为它们的头部和主体已经在数据库中)。
// 但是,如果缺少对应的快照层,可能需要强制重新执行以构建它。
if bc.skipBlock(err, it) {
logger := log.Debug
if bc.chainConfig.Clique == nil {
logger = log.Warn
}
logger("插入已知区块", "number", block.Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"root", block.Root())
// 特殊情况。如果我们在链中间遇到已知块,那么就提交空的 receipt 切片。
// 它只能在 Clique 链中发生。每当我们通过 `insertSideChain` 插入块时,
// 我们只提交 `td`、`header` 和 `body`(如果不存在的话)。由于我们没有
// 事务而没有收据,所以没有什么可提交的。但是如果侧链最终会被采纳为规范链,
// 那么它需要重新执行以获得丢失的状态,但如果是这种特殊情况(跳过重新执行),
// 我们将丢失空收据条目。
if len(block.Transactions()) == 0 {
rawdb.WriteReceipts(bc.db, block.Hash(), block.NumberU64(), nil)
} else {
log.Error("请提交问题,跳过已知块执行而没有收据",
"hash", block.Hash(), "number", block.NumberU64())
}
if err := bc.writeKnownBlock(block); err != nil {
return it.index, err
}
stats.processed++
if bc.logger != nil && bc.logger.OnSkippedBlock != nil {
bc.logger.OnSkippedBlock(tracing.BlockEvent{
Block: block,
TD: bc.GetTd(block.ParentHash(), block.NumberU64()-1),
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}
// 在这里我们可以假定日志为空,因为连续的 Clique 区块共享状态的唯一方式是没有交易。
lastCanon = block
continue
}
// 获取父区块及其状态以在其上执行
start := time.Now()
parent := it.previous()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
statedb.SetLogger(bc.logger)
// 启用预取以在处理事务时拉取 Trie 节点路径
statedb.StartPrefetcher("chain")
activeState = statedb
// 如果有后续区块,则根据当前状态运行以预缓存事务和概率性地一些账户/存储 Trie 节点。
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
go func(start time.Time, followup *types.Block, throwaway *state.StateDB) {
// 对预取执行禁用跟踪。
vmCfg := bc.vmConfig
vmCfg.Tracer = nil
bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt)
blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), followup, throwaway)
}
}
// 区块导入的跟踪部分
res, err := bc.processBlock(block, statedb, start, setHead)
followupInterrupt.Store(true)
if err != nil {
return it.index, err
}
// 在返回各种结果之前报告导入统计信息
stats.processed++
stats.usedGas += res.usedGas
var snapDiffItems, snapBufItems common.StorageSize
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
if !setHead {
// 在合并后,我们预期很少的侧链。只计数
// CL 提供给我们的所有块,以用于 GC 处理时间
bc.gcproc += res.procTime
return it.index, nil // 直接插入单个块
}
switch res.status {
case CanonStatTy:
log.Debug("插入新块", "number", block.Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root())
lastCanon = block
// 仅对规范块计数以进行 GC 处理时间
bc.gcproc += res.procTime
case SideStatTy:
log.Debug("插入分叉块", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
default:
// 理论上不可能,但是为了我们将来的自己,还是留下一条日志,
// 而不是尝试追踪不发出日志的块导入。
log.Warn("插入具有未知状态的块", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
}
}
stats.ignored += it.remaining()
return it.index, err
}
结构体 blockProcessingResult 用于表示处理区块的结果,它包含了以下字段:
usedGas uint64:表示处理该区块时消耗的燃气数量(Gas)。在以太坊区块链中,Gas 是执行智能合约和交易时所需支付的手续费。这个字段记录了执行区块中的交易所消耗的 Gas 数量。
procTime time.Duration:表示处理该区块所花费的时间。这个字段记录了处理区块的时间长度,以 time.Duration 类型表示。
status WriteStatus:表示处理该区块的结果状态。WriteStatus 是一个枚举类型,可能的取值有 CanonStatTy(表示这个区块是规范的,是区块链上的主要分支)、SideStatTy(表示这个区块是分叉的,不是区块链的主要分支)、以及其他可能的状态。
得到区块后通过下面的函数来处理,包括下面的操作:
调用处理器(bc.processor)处理区块,执行区块中的交易,并返回区块的收据(receipts)、日志(logs)、消耗的 Gas 数量(usedGas)以及可能的错误(err)。
调用验证器(bc.validator)验证区块的状态,确保区块中的交易执行正确,并根据验证结果更新相应的指标。
将处理后的区块写入到区块链中,并设置新的区块头(如果 setHead 参数为 true)。
更新与区块处理相关的度量指标,如账户读取、存储读取、账户更新、存储更新、区块执行时间、区块验证时间等。
返回处理结果,包括使用的 Gas 数量、处理时间以及区块写入状态。
func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) {
// 如果日志记录器存在且存在块启动事件回调函数
if bc.logger != nil && bc.logger.OnBlockStart != nil {
// 获取区块的父区块的难度
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
// 触发块启动事件回调
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block, // 当前区块
TD: td, // 父区块的难度
Finalized: bc.CurrentFinalBlock(), // 当前已经最终化的区块
Safe: bc.CurrentSafeBlock(), // 当前已经安全的区块
})
}
// 如果日志记录器存在且存在块结束事件回调函数
if bc.logger != nil && bc.logger.OnBlockEnd != nil {
// 在函数返回时触发块结束事件回调
defer func() {
bc.logger.OnBlockEnd(blockEndErr)
}()
}
// 使用父状态为参考点处理区块
pstart := time.Now() // 记录处理开始时间
// 调用处理器处理区块,执行区块中的交易,获取收据、日志、使用的 Gas 数量和可能的错误
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
// 如果处理过程中发生错误,报告区块处理错误并返回错误
bc.reportBlock(block, receipts, err)
return nil, err
}
ptime := time.Since(pstart) // 记录处理时间
vstart := time.Now() // 记录验证开始时间
// 验证区块状态,确保区块中的交易执行正确
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
// 如果验证过程中发生错误,报告区块验证错误并返回错误
bc.reportBlock(block, receipts, err)
return nil, err
}
vtime := time.Since(vstart) // 记录验证时间
proctime := time.Since(start) // 计算总的处理时间(处理时间 + 验证时间)
// 更新区块处理和验证期间涉及的度量指标
accountReadTimer.Update(statedb.AccountReads) // 账户读取已完成(在处理中)
storageReadTimer.Update(statedb.StorageReads) // 存储读取已完成(在处理中)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // 账户读取已完成(在处理中)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // 存储读取已完成(在处理中)
accountUpdateTimer.Update(statedb.AccountUpdates) // 账户更新已完成(在验证中)
storageUpdateTimer.Update(statedb.StorageUpdates) // 存储更新已完成(在验证中)
accountHashTimer.Update(statedb.AccountHashes) // 账户哈希已完成(在验证中)
storageHashTimer.Update(statedb.StorageHashes) // 存储哈希已完成(在验证中)
triehash := statedb.AccountHashes + statedb.StorageHashes // 用于哈希的 tries 的时间
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // 用于更新的 tries 的时间
trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // 用于账户读取的 tries 的时间
trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // 用于存储读取的 tries 的时间
blockExecutionTimer.Update(ptime - trieRead) // 区块执行的时间(除去 trie 读取时间)
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // 区块验证的时间(除去哈希和更新时间)
// 将区块写入链中并获取状态
var (
wstart = time.Now() // 记录写入开始时间
status WriteStatus // 写入状态
)
if !setHead {
// 不设置头,仅插入区块
err = bc.writeBlockWithState(block, receipts, statedb)
} else {
// 设置头并插入区块
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
}
if err != nil {
return nil, err
}
// 更新写入期间涉及的度量指标
accountCommitTimer.Update(statedb.AccountCommits) // 账户提交已完成,可以标记
storageCommitTimer.Update(statedb.StorageCommits) // 存储提交已完成,可以标记
snapshotCommitTimer.Update(statedb.SnapshotCommits) // 快照提交已完成,可以标记
triedbCommitTimer.Update(statedb.TrieDBCommits) // tries 数据库提交已完成,可以标记
// 更新区块写入时间指标
blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
// 更新区块插入时间指标
blockInsertTimer.UpdateSince(start)
// 返回区块处理结果,包括使用的 Gas 数量、处理时间和写入状态
return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil
}
侧链区块是指与主链区块无关的区块,可能因为某些原因被裁减或分叉,以太坊需要对这些区块进行处理,
验证侧链区块的状态和难度,并根据情况更新外部总难度。
将侧链区块写入磁盘,同时记录区块相关信息,如交易数量、Gas消耗等。
如果外部总难度大于本地总难度,可能需要重新导入之前被裁剪的区块以重新生成所需的状态。
最后,该函数将导入侧链区块的信息记录下来,以便后续参考。
func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) {
var (
externTd *big.Int // 外部总难度,用于存储外部链上当前区块的总难度
lastBlock = block // 最后一个处理的区块,默认为当前区块
current = bc.CurrentBlock() // 当前主链上的最新区块
)
// 第一个侧链区块的错误已经被验证为 ErrPrunedAncestor。
// 由于我们在这里不导入它们,因此我们期望对于剩余的侧链区块会出现 ErrUnknownAncestor 错误。
// 任何其他错误意味着该区块无效,不应写入磁盘。
err := consensus.ErrPrunedAncestor
for ; block != nil && errors.Is(err, consensus.ErrPrunedAncestor); block, err = it.next() {
// 检查该编号的规范状态根
if number := block.NumberU64(); current.Number.Uint64() >= number {
canonical := bc.GetBlockByNumber(number)
if canonical != nil && canonical.Hash() == block.Hash() {
// 不是侧链区块,这是一个重新导入的正常区块,其状态已被裁剪
// 收集该区块的难度。由于我们知道它是一个正常区块,
// 我们可以直接获取它,而不是像下面的方式那样使用父区块,然后在其上添加该区块
externTd = bc.GetTd(block.Hash(), block.NumberU64())
continue
}
if canonical != nil && canonical.Root() == block.Root() {
// 这很可能是一个影子状态攻击。当一个分叉被导入到数据库中,并且最终达到一个未裁剪的区块高度时,
// 我们刚刚发现状态已经存在!这意味着侧链区块引用的状态已经存在于我们的主链中。
//
// 如果不检查,我们现在将继续导入区块,而没有实际验证以前区块的状态。
log.Warn("侧链影子状态攻击检测到", "编号", block.NumberU64(), "侧链根", block.Root(), "主链根", canonical.Root())
// 如果有人合法地侧矿块,它们仍然会像往常一样被导入。然而,
// 当我们明显的目标是裁剪机制时,我们不能冒险写入未经验证的区块到磁盘上。
return it.index, errors.New("侧链影子状态攻击")
}
}
if externTd == nil {
externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1)
}
externTd = new(big.Int).Add(externTd, block.Difficulty())
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.writeBlockWithoutState(block, externTd); err != nil {
return it.index, err
}
log.Debug("注入侧链区块", "编号", block.Number(), "哈希", block.Hash(),
"难度", block.Difficulty(), "经过时间", common.PrettyDuration(time.Since(start)),
"交易数", len(block.Transactions()), "gas消耗", block.GasUsed(), "叔块数", len(block.Uncles()),
"根", block.Root())
}
lastBlock = block
}
// 此时,我们已将所有侧链区块写入数据库。循环要么在其他错误上结束,要么全部被处理。
// 如果出现其他错误,我们可以忽略剩余的区块。
//
// 如果 externTd 大于我们的本地总难度,则需要重新导入之前的区块以重新生成所需的状态
reorg, err := bc.forker.ReorgNeeded(current, lastBlock.Header())
if err != nil {
return it.index, err
}
if !reorg {
localTd := bc.GetTd(current.Hash(), current.Number.Uint64())
log.Info("侧链已写入磁盘", "开始", it.first().NumberU64(), "结束", it.previous().Number, "侧链总难度", externTd, "本地总难度", localTd)
return it.index, err
}
// 收集所有侧链哈希(完整区块可能会占用大量内存)
var (
hashes []common.Hash
numbers []uint64
)
parent := it.previous()
for parent != nil && !bc.HasState(parent.Root) {
if bc.stateRecoverable(parent.Root) {
if err := bc.triedb.Recover(parent.Root); err != nil {
return 0, err
}
break
}
hashes = append(hashes, parent.Hash())
numbers = append(numbers, parent.Number.Uint64())
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
}
if parent == nil {
return it.index, errors.New("缺少父区块")
}
// 导入所有被裁剪的区块以使状态可用
var (
blocks []*types.Block
memory uint64
)
for i := len(hashes) - 1; i >= 0; i-- {
// 将下一个区块追加到我们的批处理中
block := bc.GetBlock(hashes[i], numbers[i])
blocks = append(blocks, block)
memory += block.Size()
// 如果内存使用量增长太大,则导入并继续。遗憾的是,我们需要丢弃
// 由于内存使用过重,此处的所有事件和日志通知。
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("导入重量级侧链片段", "区块数", len(blocks), "开始", blocks[0].NumberU64(), "结束", block.NumberU64())
if _, err := bc.insertChain(blocks, true); err != nil {
return 0, err
}
blocks, memory = blocks[:0], 0
// 如果链正在终止,停止处理区块
if bc.insertStopped() {
log.Debug("在处理区块时中止")
return 0, nil
}
}
}
if len(blocks) > 0 {
log.Info("导入侧链片段", "开始", blocks[0].NumberU64(), "结束", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, true)
}
return 0, nil
}
当发生分叉时,需要重新组织区块链,需要执行的操作如下:
确定两条链的共同祖先块(commonBlock)。
根据共同祖先块,将两条链分为旧链(oldChain)和新链(newChain)。
将旧链上的所有块标记为已删除,并收集旧链上的所有交易。
将新链上的所有块标记为待添加,并收集新链上的所有交易。
发送链事件,通知有关旧链和新链的变化。
更新交易查找缓存,以清除过时的交易查找。
将新链(除了头块)按正确的增量顺序插入到链中。
删除无用的索引,包括非规范交易索引和超过头部的规范链索引。
发送事件以通知删除的日志和新增的日志。
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
var (
newChain types.Blocks // 新链
oldChain types.Blocks // 旧链
commonBlock *types.Block // 共同祖先块
deletedTxs []common.Hash // 已删除的交易哈希列表
addedTxs []common.Hash // 待添加的交易哈希列表
)
// 获取旧链和新链的头块
oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
if oldBlock == nil {
return errors.New("current head block missing") // 当前头块缺失
}
newBlock := newHead
// 将较长的链缩短为与较短链相同数量的块
if oldBlock.NumberU64() > newBlock.NumberU64() {
// 旧链更长,收集所有交易和日志作为已删除项
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
}
} else {
// 新链更长,保存所有块以供后续插入
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
}
}
// 确保旧链块不为空
if oldBlock == nil {
return errInvalidOldChain // 无效的旧链
}
if newBlock == nil {
return errInvalidNewChain // 无效的新链
}
// 两边的链达到相同数量后,继续缩短直到找到共同祖先块
for {
// 如果找到了共同祖先块,则退出循环
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
break
}
// 将旧块移除,并将新块存储起来
oldChain = append(oldChain, oldBlock)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
newChain = append(newChain, newBlock)
// 同时将两边的链向前移动一步
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
if oldBlock == nil {
return errInvalidOldChain // 无效的旧链
}
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if newBlock == nil {
return errInvalidNewChain // 无效的新链
}
}
// 确保用户看到大的链重组
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info
msg := "Chain reorg detected" // 检测到链重组
if len(oldChain) > 63 {
msg = "Large chain reorg detected" // 检测到大型链重组
logFn = log.Warn
}
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
blockReorgDropMeter.Mark(int64(len(oldChain)))
blockReorgMeter.Mark(1)
} else if len(newChain) > 0 {
// 特殊情况发生在后续合并阶段,当前头块是新头块的祖先,但这两个块不是连续的
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
// len(newChain) == 0 && len(oldChain) > 0
// 将规范链倒回到较低的位置。
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain))
}
// 在写入新链数据之前,重置交易查找缓存,以清除过时的交易查找。
bc.txLookupCache.Purge()
// 将新链(除了头块)按适当的增量顺序插入链中。
for i := len(newChain) - 1; i >= 1; i-- {
// 按照规范的方式插入块,重写历史记录
bc.writeHeadBlock(newChain[i])
// 收集新添加的交易
for _, tx := range newChain[i].Transactions() {
addedTxs = append(addedTxs, tx.Hash())
}
}
// 删除无用的索引,包括非规范交易索引和超过头部的规范链索引。
var (
indexesBatch = bc.db.NewBatch()
diffs = types.HashDifference(deletedTxs, addedTxs)
)
for _, tx := range diffs {
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
}
// 删除所有不属于新规范链的哈希标记。
number := commonBlock.NumberU64()
if len(newChain) > 1 {
number = newChain[1].NumberU64()
}
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(indexesBatch, i)
}
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}
// 发送事件以通知删除的日志和新增的日志。
// 已删除的日志 + 块:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// 也为从规范链中删除的块发送事件。
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
// 收集已删除日志以进行通知
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
}
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
// 新日志:
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
return nil
}
最后给出其他函数的功能说明:
| 函数签名 | 说明 |
|---|---|
| func (bc *BlockChain) empty() bool | 区块链是不是空的 |
| func (bc *BlockChain) SetHead(head uint64) error | 修改区块的id |
| func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error | 根据时间戳设置区块头 |
| func (bc *BlockChain) SetFinalized(header *types.Header) | 更新区块链的最终化区块,并将相关信息写入数据库 |
| func (bc *BlockChain) SetSafe(header *types.Header) | 设置安全区块(经过指定数目区块确认) |
| func (bc *BlockChain) rewindHashHead(head types.Header, root common.Hash) (types.Header, uint64) | 在区块链中找到一个新的链头,以便在区块链的状态发生变化或重新组织时进行更新(哈希方案) |
| func (bc *BlockChain) rewindPathHead(head types.Header, root common.Hash) (types.Header, uint64) | 在区块链中找到一个新的链头,以便在区块链的状态发生变化或重新组织时进行更新(路径方案) |
| func (bc *BlockChain) rewindHead(head types.Header, root common.Hash) (types.Header, uint64) | 根据方案(路径方案或哈希方案)调用相应的链头回溯函数 |
| func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error | 在进行快照同步时提交链的头部 |
| func (bc *BlockChain) Reset() error | 函数的作用是清除整个区块链,将其恢复到创世状态 |
| func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error | 将活动链导出到给定的写入器 |
| func (bc *BlockChain) Export(w io.Writer) error | 将活动链导出到给定的写入器 |
| func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error | 将活动链的子集导出到给定的写入器,指定区间 |
| func (bc *BlockChain) writeHeadBlock(block *types.Block) | 更新链的索引和标记,将指定的区块设置为链的头部块 |
| func (bc *BlockChain) stopWithoutSaving() | 停止区块链服务,但不执行保存数据的后续工作 |
| func (bc *BlockChain) StopInsert() | 停止区块链的插入过程 |
| func (bc *BlockChain) insertStopped() bool | 检查插入过程是否已经停止 |
| func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) | 将一个区块写入到数据库中,但不包含状态数据(即账户状态)的写入 |
| func (bc *BlockChain) writeKnownBlock(block *types.Block) error | 写入已知的区块到区块链中 |
| func (bc *BlockChain) writeBlockAndSetHead(block types.Block, receipts []types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) | 将区块写入区块链数据库,并根据区块的规范性与否,更新区块链的头区块,在需要情况下发送事件通知 |
| func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) | 将一组区块导入到区块链 |
| func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) | 从给定的区块开始,向上追溯并恢复所有被裁剪或丢失的祖先区块的状态 |
| func (bc *BlockChain) collectLogs(b types.Block, removed bool) []types.Log | 收集指定区块中的日志信息,并返回一个包含所有日志的切片 |
| func (bc *BlockChain) InsertBlockWithoutSetHead(block *types.Block) error | 向区块链中插入新的区块,但不设置成头块 |
| func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) | 将指定区块设置为区块链头块,并在需要时运行重新组织过程 |
| func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool | 是否可以跳过处理导入的区块 |
| func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) | 记录和报告区块处理过程中遇到的错误 |
| func summarizeBadBlock(block types.Block, receipts []types.Receipt, config *params.ChainConfig, err error) string | 生成一个包含了坏区块相关信息的字符串 |
| func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Processor) | 设置用于验证和处理区块的验证器(Validator)和处理器(Processor) |
| func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) | 配置内存中 trie 刷新到磁盘的间隔时间 |
| func (bc *BlockChain) GetTrieFlushInterval() time.Duration | 获取内存中 trie 刷新到磁盘的间隔时间 |