前几篇分别介绍了以太坊的基本概念,基本环节-交易,区块、区块链的存储方式等,这篇打算介绍一下“挖矿“得到新区块的整个过程,然后下一篇讲下不同共识算法的实现细节。
总览
在Ethereum 代码中,名为miner的包(package)负责向外提供一个“挖矿”得到的新区块,其主要结构体的UML关系图如下图所示:
处于入口的类是Miner,它作为公共类型,向外暴露mine功能;它有一个worker类型的成员变量,负责管理mine过程;worker内部有一组Agent接口类型对象,这个接口有两个实现:CpuAgent和RemoteAgent。这里使用的是CpuAgent,该Agent会完成一个块的出块工作,同级的多个Agent是竞争关系,最终通过共识算法完成出一个块的工作。Work结构体主要用以携带数据,被视为挖掘一个区块时所需的数据环境。
主要的数据传输发生在worker和它的Agent(们)之间:在合适的时候,worker把一个Work对象发送给每个Agent,然后任何一个Agent完成mine时,将一个经过授权确认的Block加上那个更新过的Work,组成一个Result对象发送回worker。
调用方worker内部声明了一个Agent数组,但目前只有一个实现类CpuAgent的对象会被加到该数组。CpuAgent通过全局的<>对象,借助共识算法完成最终的区块授权。
另外,unconfirmedBlocks 也挺特别,它会以unconfirmedBlock的形式存储最近一些本地挖掘出的区块。在一段时间之后,根据区块的Number和Hash,再确定这些区块是否已经被收纳进主干链(canonical chain)里,以输出Log的方式来告知用户。
对于一个新区块被挖掘出的过程,代码实现上基本分为两个环节:一是组装出一个新区块,这个区块的数据基本完整,包括成员Header的部分属性,以及交易列表txs,和叔区块组uncles[],并且所有交易已经执行完毕,所有收据(Receipt)也已收集完毕,这部分主要由worker完成;二是填补该区块剩余的成员属性,比如Header.Difficulty等,并完成授权,这些工作是由Agent调用接口实现体,利用共识算法来完成的。
新区块的组装流程
挖掘新区块的流程入口在Miner里,具体入口在Miner结构体的创建函数(避免称之为‘构造函数’)里。
Miner的函数
####New()
1 2 3 4 5 6 7 8 9 10 11 12 13
| func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, worker: newWorker(config, engine, common.Address{}, eth, mux), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) go miner.update() return miner }
|
在New()里,针对新对象miner的各个成员变量初始化完成后,会紧跟着创建worker对象,然后将Agent对象登记给worker,最后用一个单独线程去运行miner.Update()函数;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { worker := &worker{ config: config, engine: engine, eth: eth, mux: mux, txCh: make(chan core.TxPreEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) go worker.update() go worker.wait() worker.commitNewWork() return worker }
|
worker的创建函数里也如法炮制,分别用单独线程去启动worker.updater()和wait();最后worker.CommitNewWork()会开始准备一个新区块所需的基本数据,如Header,Txs, Uncles等。注意此时Agent尚未启动。
####Miner.Update()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (self *Miner) update() { events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) out: for ev := range events.Chan() { switch ev.Data.(type) { case downloader.StartEvent: atomic.StoreInt32(&self.canStart, 0) if self.Mining() { self.Stop() atomic.StoreInt32(&self.shouldStart, 1) log.Info("Mining aborted due to sync") } case downloader.DoneEvent, downloader.FailedEvent: shouldStart := atomic.LoadInt32(&self.shouldStart) == 1 atomic.StoreInt32(&self.canStart, 1) atomic.StoreInt32(&self.shouldStart, 0) if shouldStart { self.Start(self.coinbase) } events.Unsubscribe() break out } } }
|
####Miner.Start()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (self *Miner) Start(coinbase common.Address) { atomic.StoreInt32(&self.shouldStart, 1) self.SetEtherbase(coinbase) if atomic.LoadInt32(&self.canStart) == 0 { log.Info("Network syncing, will start miner afterwards") return } atomic.StoreInt32(&self.mining, 1) log.Info("Starting mining operation") self.worker.start() self.worker.commitNewWork() }
|
####worker.start()
1 2 3 4 5 6 7 8 9 10 11
| func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() atomic.StoreInt32(&self.mining, 1) for agent := range self.agents { agent.Start() } }
|
这个update()会订阅(监听)几种事件,均跟Downloader相关。当收到Downloader的StartEvent时,意味者此时本节点正在从其他节点下载新区块,这时miner会立即停止进行中的挖掘工作,并继续监听;如果收到DoneEvent或FailEvent时,意味本节点的下载任务已结束-无论下载成功或失败-此时都可以开始挖掘新区块,并且此时会退出Downloader事件的监听。
从miner.Update()的逻辑可以看出,对于任何一个Ethereum网络中的节点来说,挖掘一个新区块和从其他节点下载、同步一个新区块,根本是相互冲突的。这样的规定,保证了在某个节点上,一个新区块只可能有一种来源,这可以大大降低可能出现的区块冲突,并避免全网中计算资源的浪费。
worker的函数
worker的属性非常多而具体了,都是挖矿具体操作相关的,其中包括链本身的属性以及区块数据结构的属性。首先来看ChainConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| type ChainConfig struct { ChainId *big.Int `json:"chainId"` HomesteadBlock *big.Int `json:"homesteadBlock,omitempty"` DAOForkBlock *big.Int `json:"daoForkBlock,omitempty"` DAOForkSupport bool `json:"daoForkSupport,omitempty"` EIP150Block *big.Int `json:"eip150Block,omitempty"` EIP150Hash common.Hash `json:"eip150Hash,omitempty"` EIP155Block *big.Int `json:"eip155Block,omitempty"` EIP158Block *big.Int `json:"eip158Block,omitempty"` ByzantiumBlock *big.Int `json:"byzantiumBlock,omitempty"` Ethash *EthashConfig `json:"ethash,omitempty"` Clique *CliqueConfig `json:"clique,omitempty"` }
|
ChainConfig顾名思义就是链的配置属性。ChainConfig中包含了ChainID等属性,其中有很多都是针对以太坊历史发生的问题进行的专门配置。
- ChainId可以预防replay攻击。
- Homestead是以太坊发展蓝图中的一个阶段。第一阶段是以太坊区块链面世,代号为frontier,第二个阶段即为当前阶段,代号为Homestead(家园),第三阶段为Metropolis(大都会),大都会又细分为两个小阶段,第一个是Byzantium(拜占庭)硬分叉(引入新型零知识证明算法以及pos权益证明共识算法),第二个是Constantinople(君士坦丁堡)硬分叉(以太坊正式应用pow和pos混合链,解决拜占庭引发的问题)。最后一个阶段代号Serenity(宁静),最终版本的以太坊稳定运行。
- 2017年6月18日,以太坊上DAO(去中心自治组织)的一次大危机做出的相应调整。感兴趣的可以自行谷百。
- 2017年10月16日,以太坊的一次Byzantium拜占庭硬分叉。
- EIPs(Ethereum Improvement Proposals),是以太坊更新改善的一些方案,对应后面的数字就是以太坊github源码issue的编号。
再来看下 Backend对象,Backend是一个自定义接口封装了所有挖矿所需方法。
1 2 3 4 5 6
| type Backend interface { AccountManager() *accounts.Manager BlockChain() *core.BlockChain TxPool() *core.TxPool ChainDb() ethdb.Database }
|
这里我们主要关注worker.updater()和wait()
####worker.update()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| func (self *worker) update() { defer self.txSub.Unsubscribe() defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() for { select { case <-self.chainHeadCh: self.commitNewWork() case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case ev := <-self.txCh: if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.updateSnapshot() self.currentMu.Unlock() } else { if self.config.Clique != nil && self.config.Clique.Period == 0 { self.commitNewWork() } } case <-self.txSub.Err(): return case <-self.chainHeadSub.Err(): return case <-self.chainSideSub.Err(): return } } }
|
worker.update()分别监听ChainHeadEvent,ChainSideEvent,TxPreEvent几个事件,每个事件会触发worker不同的反应。ChainHeadEvent是指区块链中已经加入了一个新的区块作为整个链的链头,这时worker的回应是立即开始准备挖掘下一个新区块(也是够忙的);ChainSideEvent指区块链中加入了一个新区块作为当前链头的旁支,worker会把这个区块收纳进possibleUncles[]数组,作为下一个挖掘新区块可能的Uncle之一;TxPreEvent是TxPool对象发出的,指的是一个新的交易tx被加入了TxPool,这时如果worker没有处于挖掘中,那么就去执行这个tx,并把它收纳进Work.txs数组,为下次挖掘新区块备用。
需要稍稍注意的是,ChainHeadEvent 并不一定是外部源发出。由于 worker 对象有个成员变量chain(eth.BlockChain),所以当worker自己完成挖掘一个新区块,并把它写入数据库,加进区块链里成为新的链头时,worker自己也可以调用chain发出一个ChainHeadEvent,从而被worker.update()函数监听到,进入下一次区块挖掘。
####worker.wait()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| func (self *worker) wait() { for { mustCommitNewWork := true for result := range self.recv { atomic.AddInt32(&self.atWork, -1) if result == nil { continue } block := result.Block work := result.Work for _, r := range work.receipts { for _, l := range r.Logs { l.BlockHash = block.Hash() } } for _, log := range work.state.Logs() { log.BlockHash = block.Hash() } stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } if stat == core.CanonStatTy { mustCommitNewWork = false } self.mux.Post(core.NewMinedBlockEvent{Block: block}) var ( events []interface{} logs = work.state.Logs() ) events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { events = append(events, core.ChainHeadEvent{Block: block}) } self.chain.PostChainEvents(events, logs) self.unconfirmed.Insert(block.NumberU64(), block.Hash()) if mustCommitNewWork { self.commitNewWork() } } } }
|
worker.wait()会在一个channel处一直等待Agent完成挖掘发送回来的新Block和Work对象。这个Block会被写入数据库,加入本地的区块链试图成为最新的链头。注意,此时区块中的所有交易,假设都已经被执行过了,所以这里的操作,不会再去执行这些交易对象。
当这一切都完成,worker就会发送一条事件(NewMinedBlockEvent{}),等于通告天下:我挖出了一个新区块!这样监听到该事件的其他节点,就会根据自身的状况,来决定是否接受这个新区块成为全网中公认的区块链新的链头。至于这个公认过程如何实现,就属于共识算法的范畴了。
####worker.commitNewWork()
commitNewWork方法源码比较长,这里就不粘贴出来了,这个方法主要的工作是为新块准备基本数据,包括header,txs,uncles等。commitNewWork()会在worker内部多处被调用,注意它每次都是被直接调用,并没有以goroutine的方式启动。commitNewWork()内部使用sync.Mutex对全部操作做了隔离。这个函数的基本逻辑如下:
- 准备新区块的时间属性Header.Time,一般均等于系统当前时间,不过要确保父区块的时间(parentBlock.Time())要早于新区块的时间,父区块当然来自当前区块链的链头了。
- 创建新区块的Header对象,其各属性中:Num可确定(父区块Num +1);Time可确定;ParentHash可确定;其余诸如Difficulty,GasLimit等,均留待之后共识算法中确定。
- 调用Engine.Prepare()函数,完成Header对象的准备。
- 根据新区块的位置(Number),查看它是否处于DAO硬分叉的影响范围内,如果是,则赋值予header.Extra。
- 根据已有的Header对象,创建一个新的Work对象,并用其更新worker.current成员变量。
- 如果配置信息中支持硬分叉,在Work对象的StateDB里应用硬分叉。
- 准备新区块的交易列表,来源是TxPool中那些最近加入的tx,并执行这些交易。
- 准备新区块的叔区块uncles[],来源是worker.possibleUncles[],而possibleUncles[]中的每个区块都从事件ChainSideEvent中搜集得到。注意叔区块最多有两个。
- 调用Engine.Finalize()函数,对新区块“定型”,填充上Header.Root, TxHash, ReceiptHash, UncleHash等几个属性。
- 如果上一个区块(即旧的链头区块)处于unconfirmedBlocks中,意味着它也是由本节点挖掘出来的,尝试去验证它已经被吸纳进主干链中。
- 把创建的Work对象,通过channel发送给每一个登记过的Agent,进行后续的挖掘。
以上步骤中,4和6都是仅仅在该区块配置中支持DAO硬分叉,并且该区块的位置正好处于DAO硬分叉影响范围内时才会发生;其他步骤是普遍性的。commitNewWork()完成了待挖掘区块的组装,block.Header创建完毕,交易数组txs,叔区块Uncles[]都已取得,并且由于所有交易被执行完毕,相应的Receipt[]也已获得。万事俱备,可以交给Agent进行‘挖掘’了。
CpuAgent的函数
CpuAgent中与mine相关的函数,主要是update()和mine():
####CpuAgent.update()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (self *CpuAgent) update() { out: for { select { case work := <-self.workCh: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) } self.quitCurrentOp = make(chan struct{}) go self.mine(work, self.quitCurrentOp) self.mu.Unlock() case <-self.stop: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) self.quitCurrentOp = nil } self.mu.Unlock() break out } } }
|
CpuAgent.update()就是worker.commitNewWork()结束后发出Work对象的会一直监听相关channel,如果收到Work对象(显然由worker.commitNewWork()结束后发出),就启动mine()函数;如果收到停止(mine)的消息,就退出一切相关操作。
####CpuAgent.mine()
1 2 3 4 5 6 7 8 9 10 11
| func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) self.returnCh <- &Result{work, result} } else { if err != nil { log.Warn("Block sealing failed", "err", err) } self.returnCh <- nil } }
|
CpuAgent.mine()会直接调用Engine.Seal()函数,利用Engine实现体的共识算法对传入的Block进行最终的授权,如果成功,就将Block同Work一起通过channel发还给worker,那边worker.wait()会接收并处理。
显然,这两个函数都没做什么实质性工作,它们只是负责调用接口实现体,待授权完成后将区块数据发送回worker。挖掘出一个区块的真正奥妙全在Engine实现体所代表的共识算法里。