前回の続きです。こちらでは主にEngine APIによる処理を扱います。
Engine APIはコンセンサスクライアントと連携するAPIです。どのような連携があるのかはこちらに詳しく書いてあります。
https://hackmd.io/@danielrachi/engine_api
Engine APIでメッセージを受け取った時の全体的な流れはこんな感じです
eth/catalyst/api.go
EngineAPIのメソッドにどういうリクエストがあるのか書いてあります。ざっくり以下のように分類できます。
forkchoiceUpdated | ブロック構築を開始してくれ |
---|---|
getPayload | 構築したブロックデータをくれ |
newPayload | 新しいブロックを検証してくれ |
1. EngineAPI.forkchoiceUpdated
ブロックチェーンのヘッドブロックが更新されたことを通知し、ブロックの検証を行います。
さらに、ペイロード属性が提供されている場合、新しいブロックの構築を開始します。
V1はParisフォークまで、V2はShanghaiフォークまで、V3はそれ以降をサポート。
進化の流れ
- V1 → V2: Withdrawals機能の追加
- V2 → V3: Beacon Rootの追加
eth/catalyst/api.go::ForkchoiceUpdatedV3()
V3はWithdrawalsとBeaconRootを必須としています。
本来はパラメータが間違っていても適用されるべきだがエラーを返してしまっているところが仕様に準拠していないと書かれています。
また関数を2つに分割する必要がある:
- ヘッドブロックの更新のみを行う関数
- ブロック構築を開始する関数
(取り組むチャンス?)
// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
// in the payload attributes. It supports only PayloadAttributesV3.
func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
if params.Withdrawals == nil {
return engine.STATUS_INVALID, engine.InvalidPayloadAttributes.With(errors.New("missing withdrawals"))
}
if params.BeaconRoot == nil {
return engine.STATUS_INVALID, engine.InvalidPayloadAttributes.With(errors.New("missing beacon root"))
}
if api.eth.BlockChain().Config().LatestFork(params.Timestamp) != forks.Cancun && api.eth.BlockChain().Config().LatestFork(params.Timestamp) != forks.Prague {
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV3 must only be called for cancun payloads"))
}
}
// TODO(matt): the spec requires that fcu is applied when called on a valid
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
}
eth/catalyst/api.go::forkchoiceUpdated()
フォークチェーン更新の排他制御
他のフォークチェーン更新情報が来ても無視しなければいけない
このあたりRustだとどうなってるのか気になる
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()
log.Trace("Engine API request received", "method", "ForkchoiceUpdated", "head", update.HeadBlockHash, "finalized", update.FinalizedBlockHash, "safe", update.SafeBlockHash)
zero hashじゃないかチェック
if update.HeadBlockHash == (common.Hash{}) {
log.Warn("Forkchoice requested update to zero hash")
return engine.STATUS_INVALID, nil
}
最終更新時刻の記録
api.lastForkchoiceLock.Lock()
api.lastForkchoiceUpdate = time.Now()
api.lastForkchoiceLock.Unlock()
ブロックの存在確認
存在しなかった場合
- ブロックの祖先が無効でないことをチェック
- リモートブロックの存在確認し、存在しない場合は同期中の状態を返す
→ remoteBlocksってどこにあるの? - ビーコン同期を開始し、必要なブロックをダウンロードする
block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash)
if block == nil {
// If this block was previously invalidated, keep rejecting it here too
if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil {
return engine.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil
}
// If the head hash is unknown (was not given to us in a newPayload request),
// we cannot resolve the header, so not much to do. This could be extended in
// the future to resolve from the `eth` network, but it's an unexpected case
// that should be fixed, not papered over.
header := api.remoteBlocks.get(update.HeadBlockHash)
if header == nil {
log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash)
return engine.STATUS_SYNCING, nil
}
...
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
return engine.STATUS_SYNCING, err
}
return engine.STATUS_SYNCING, nil
}
マージ前のブロックでないことをチェックし、難易度が正であるかを確認する
// Block is known locally, just sanity check that the beacon client does not
// attempt to push us back to before the merge.
if block.Difficulty().BitLen() > 0 && block.NumberU64() > 0 {
ph := api.eth.BlockChain().GetHeader(block.ParentHash(), block.NumberU64()-1)
if ph == nil {
return engine.STATUS_INVALID, errors.New("parent unavailable for difficulty check")
}
if ph.Difficulty.Sign() == 0 && block.Difficulty().Sign() > 0 {
log.Error("Parent block is already post-ttd", "number", block.NumberU64(), "hash", update.HeadBlockHash, "diff", block.Difficulty(), "age", common.PrettyAge(time.Unix(int64(block.Time()), 0)))
return engine.ForkChoiceResponse{PayloadStatus: engine.INVALID_TERMINAL_BLOCK, PayloadID: nil}, nil
}
}
ブロックが正規チェーンに属しているか確認し、属していない場合はinvalidを返す
if rawdb.ReadCanonicalHash(api.eth.ChainDb(), block.NumberU64()) != update.HeadBlockHash {
// Block is not canonical, set head.
if latestValid, err := api.eth.BlockChain().SetCanonical(block); err != nil {
return engine.ForkChoiceResponse{PayloadStatus: engine.PayloadStatusV1{Status: engine.INVALID, LatestValidHash: &latestValid}}, err
}
}
...
api.eth.SetSynced()
もしファイナライズされたブロックも来ている場合は、正規チェーンへの所属を確認し、ファイナライズ状態を設定
// If the beacon client also advertised a finalized block, mark the local
// chain final and completely in PoS mode.
if update.FinalizedBlockHash != (common.Hash{}) {
// If the finalized block is not in our canonical tree, something is wrong
finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash)
...
// Set the finalized block
api.eth.BlockChain().SetFinalized(finalBlock.Header())
}
セーフブロックハッシュが含まれていることを確認し、セーフ状態を設定
// Check if the safe block hash is in our canonical tree, if not something is wrong
if update.SafeBlockHash != (common.Hash{}) {
safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash)
if safeBlock == nil {
log.Warn("Safe block not available in database")
return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not available in database"))
}
if rawdb.ReadCanonicalHash(api.eth.ChainDb(), safeBlock.NumberU64()) != update.SafeBlockHash {
log.Warn("Safe block not in canonical chain")
return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not in canonical chain"))
}
// Set the safe block
api.eth.BlockChain().SetSafe(safeBlock.Header())
}
ペイロード属性がある場合は、ブロック構築を開始する
// If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we
// will replace it arbitrarily many times in between.
if payloadAttributes != nil {
args := &miner.BuildPayloadArgs{
Parent: update.HeadBlockHash,
Timestamp: payloadAttributes.Timestamp,
FeeRecipient: payloadAttributes.SuggestedFeeRecipient,
Random: payloadAttributes.Random,
Withdrawals: payloadAttributes.Withdrawals,
BeaconRoot: payloadAttributes.BeaconRoot,
Version: payloadVersion,
}
id := args.Id()
// If we already are busy generating this work, then we do not need
// to start a second process.
if api.localBlocks.has(id) {
return valid(&id), nil
}
payload, err := api.eth.Miner().BuildPayload(args, payloadWitness)
if err != nil {
log.Error("Failed to build payload", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
api.localBlocks.put(id, payload)
return valid(&id), nil
}
eth/downloader/beaconsync.go::beaconSync()
信頼できるヘッドから後方にブロックの同期を開始する
スケルトンチェーンの同期を開始
スケルトンチェーンはブロックのヘッダだけがあるブロックチェーンのこと
if err := d.skeleton.Sync(head, final, force); err != nil {
return err
}
miner/payload_building.go::buildPayload()
空のペイロードを作成
emptyParams := &generateParams{
timestamp: args.Timestamp,
forceTime: true,
parentHash: args.Parent,
coinbase: args.FeeRecipient,
random: args.Random,
withdrawals: args.Withdrawals,
beaconRoot: args.BeaconRoot,
noTxs: true,
}
仕事を開始 (miner/worker.go
へ)
この時点では空を返す
empty := miner.generateWork(emptyParams, witness)
ペイロードオブジェクトの作成
payload := newPayload(empty.block, empty.requests, empty.witness, args.Id())
バックグラウンドでペイロードの更新を開始
go func() {
// バックグラウンドでのペイロード更新処理
12秒で終了するようにタイマーを設定
timer := time.NewTimer(0)
defer timer.Stop()
endTimer := time.NewTimer(time.Second * 12)
メインループ
for {
select {
case <-timer.C:
// ペイロードの再構築
case <-payload.stop:
// 配信による停止
case <-endTimer.C:
// タイムアウトによる停止
}
}
ペイロードの更新処理
もう一度generateWorkを呼び出してブロックを取得
start := time.Now()
r := miner.generateWork(fullParams, witness)
if r.err == nil {
payload.update(r, time.Since(start))
} else {
log.Info("Error while generating work", "id", payload.id, "err", r.err)
}
timer.Reset(miner.config.Recommit)
miner/worker.go::generateWork
まず準備を開始
- 親ブロックの特定
- タイムスタンプの検証と設定
- ブロックヘッダの構築
- 各種EIPの適用
- EIP-1559: base feeとgas limit
- EIP-4844: blob gas
- EIP-4788: beacon block rootの処理
- Prague: 親ブロックハッシュの処理
- ブロック生成に必要な環境を構築
- ステート管理やEVMの準備
work, err := miner.prepareWork(params, witness)
if err != nil {
return &newPayloadResult{err: err}
}
トランザクションを埋めていく (fillTransactions()
)
if !params.noTxs {
interrupt := new(atomic.Int32)
timer := time.AfterFunc(miner.config.Recommit, func() {
interrupt.Store(commitInterruptTimeout)
})
defer timer.Stop()
err := miner.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByTimeout) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(miner.config.Recommit))
}
}
ブロックボディの作成
body := types.Body{Transactions: work.txs, Withdrawals: params.withdrawals}
Pragueフォークの処理
- Depositログの解析
- 出金キューの処理
- 統合キューの処理
var requests [][]byte
if miner.chainConfig.IsPrague(work.header.Number, work.header.Time) {
requests = [][]byte{}
// EIP-6110 deposits
if err := core.ParseDepositLogs(&requests, allLogs, miner.chainConfig); err != nil {
return &newPayloadResult{err: err}
}
// EIP-7002
if err := core.ProcessWithdrawalQueue(&requests, work.evm); err != nil {
return &newPayloadResult{err: err}
}
// EIP-7251 consolidations
if err := core.ProcessConsolidationQueue(&requests, work.evm); err != nil {
return &newPayloadResult{err: err}
}
}
リクエストハッシュの設定
if requests != nil {
reqHash := types.CalcRequestsHash(requests)
work.header.RequestsHash = &reqHash
}
ブロックの確定と組み立て
block, err := miner.engine.FinalizeAndAssemble(miner.chain, work.header, work.state, &body, work.receipts)
if err != nil {
return &newPayloadResult{err: err}
}
miner/worker.go::fillTransactions()
トランザクションプールからトランザクションを取得し、ブロックに追加する最重要な機能
まずは設定の取得
miner.confMu.RLock()
tip := miner.config.GasPrice
prio := miner.prio
miner.confMu.RUnlock()
トランザクションをpendingから選択するようにフィルタ
base feeとblob feeを設定
filter := txpool.PendingFilter{
MinTip: uint256.MustFromBig(tip),
}
if env.header.BaseFee != nil {
filter.BaseFee = uint256.MustFromBig(env.header.BaseFee)
}
if env.header.ExcessBlobGas != nil {
filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(miner.chainConfig, env.header))
}
tx poolのpendingに含まれているすべてのトランザクションを取得
filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false
pendingPlainTxs := miner.txpool.Pending(filter)
Blobトランザクションを取得
filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true
pendingBlobTxs := miner.txpool.Pending(filter)
優先トランザクションと通常トランザクションのマップを作成
prioPlainTxs, normalPlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
prioBlobTxs, normalBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs
優先トランザクションの分離
優先アカウントからのトランザクションを通常トランザクションから優先トランザクションへ移動
for _, account := range prio {
if txs := normalPlainTxs[account]; len(txs) > 0 {
delete(normalPlainTxs, account)
prioPlainTxs[account] = txs
}
if txs := normalBlobTxs[account]; len(txs) > 0 {
delete(normalBlobTxs, account)
prioBlobTxs[account] = txs
}
}
優先トランザクションについて価格とnonceによるソートを行ないブロックへ追加
if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
通常トランザクションについて価格とnonceによるソートを行ないブロックへ追加
if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
miner/worker.go/commitTransactions
ガス制限チェックを行い、もうガス入らないよってときは処理を終了
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
Blobスペースチェックを行い、もうBlob入らないよってときは処理を終了
if !blobTxs.Empty() && env.blobs >= eip4844.MaxBlobsPerBlock(miner.chainConfig, env.header.Time) {
log.Trace("Not enough blob space for further blob transactions")
blobTxs.Clear()
}
blob txと通常txのうちチップの高い方を選択
pltx, ptip := plainTxs.Peek()
bltx, btip := blobTxs.Peek()
switch {
case pltx == nil:
txs, ltx = blobTxs, bltx
case bltx == nil:
txs, ltx = plainTxs, pltx
default:
if ptip.Lt(btip) {
txs, ltx = blobTxs, bltx
} else {
txs, ltx = plainTxs, pltx
}
}
ガスチェックして必要なガスが不足している場合スキップ
if env.gasPool.Gas() < ltx.Gas {
log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas)
txs.Pop()
continue
}
Blobガスチェックしてスペースが不足している場合スキップ
if miner.chainConfig.IsCancun(env.header.Number, env.header.Time) {
left := eip4844.MaxBlobsPerBlock(miner.chainConfig, env.header.Time) - env.blobs
if left < int(ltx.BlobGas/params.BlobTxBlobGasPerBlob) {
log.Trace("Not enough blob space left for transaction", "hash", ltx.Hash, "left", left, "needed", ltx.BlobGas/params.BlobTxBlobGasPerBlob)
txs.Pop()
continue
}
}
トランザクションの解決
tx := ltx.Resolve()
if tx == nil {
log.Trace("Ignoring evicted transaction", "hash", ltx.Hash)
txs.Pop()
continue
}
リプレイ攻撃保護チェック
if tx.Protected() && !miner.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", miner.chainConfig.EIP155Block)
txs.Pop()
continue
}
トランザクションの実行
ApplyTransactionWithEVMが呼ばれて実際に実行される
env.state.SetTxContext(tx.Hash(), env.tcount)
err := miner.commitTransaction(env, tx)
2. EngineAPI.getPayload
ペイロード(ブロック)を取得するAPI
前のforkchoiceUpdatedを受けてから作成をしていたブロックをここで返す
V2はShanghaiから、V3はCancunからでBlobサポート、V4はPragueからでEIP-6110, EIP-7002, EIP-7251をサポート
eth/catalyst/api.go::getPayload()
func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) {
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.localBlocks.get(payloadID, full)
if data == nil {
return nil, engine.UnknownPayload
}
return data, nil
}
3. EngineAPI.newPayload
コンセンサスレイヤー(CL)から新しいブロックを受け取り、チェーンに挿入するAPI
V2からWithdrawalsをサポート、V3からBlobをサポート、V4からExecutionRequestsとリクエスト検証をサポート
// NewPayloadV4 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV4(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash, executionRequests []hexutil.Bytes) (engine.PayloadStatusV1, error) {
...
requests := convertRequests(executionRequests)
if err := validateRequests(requests); err != nil {
return engine.PayloadStatusV1{Status: engine.INVALID}, engine.InvalidParams.With(err)
}
return api.newPayload(params, versionedHashes, beaconRoot, requests, false)
}
eth/catalyst/api.go::validateRequests()
- リクエストが空じゃないことをチェック
- リクエストがタイプ順になっていることをチェック
// validateRequests checks that requests are ordered by their type and are not empty.
func validateRequests(requests [][]byte) error {
for i, req := range requests {
// No empty requests.
if len(req) < 2 {
return fmt.Errorf("empty request: %v", req)
}
// Check that requests are ordered by their type.
// Each type must appear only once.
if i > 0 && req[0] <= requests[i-1][0] {
return fmt.Errorf("invalid request order: %v", req)
}
}
return nil
}
eth/catalyst/api.go::newPayload()
ペイロードをもとにブロックを構築
block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot, requests)
if err != nil {
// エラー時の詳細なログ出力
return api.invalid(err, nil), nil
}
すでにブロックが存在している場合はスキップ
ブロックが存在している場合、すでに検証したことがあるため
if block := api.eth.BlockChain().GetBlockByHash(params.BlockHash); block != nil {
log.Warn("Ignoring already known beacon payload", "number", params.Number, "hash", params.BlockHash)
hash := block.Hash()
return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil
}
以前に拒否されたブロックじゃないことをチェック
if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil {
return *res, nil
}
タイムスタンプの検証
if block.Time() <= parent.Time() {
log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time())
return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil
}
フルシンクされてるか確認
if api.eth.SyncMode() != ethconfig.FullSync {
return api.delayPayloadImport(block), nil
}
ステート可用性チェック
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())
log.Warn("State not available, ignoring new payload")
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
ブロック挿入 (core/blockchain.go::insertChain()
へ)
- 署名を回復
- ブロックヘッダ検証
- 既知ブロックをスキップ
- 祖先がいるかチェック
- 各ブロックに対して親ブロック取得とブロックの処理 (
processBlock
) - ブロックを挿入する
proofs, err := api.eth.BlockChain().InsertBlockWithoutSetHead(block, witness)
if err != nil {
// エラー処理
return api.invalid(err, parent.Header()), nil
}
core/blockchain.go::processBlock()
ブロックの実行と検証を行う処理
- ステートレス検証用のウィットネス生成 (Byzantium以降)
- トランザクションの実行とステートの更新
- ステートが正しいか検証
- レシートの検証
- ブロックの書き込み
- チェーンヘッドの更新
4. Heartbeat
Engine APIで正しく接続できていることをチェックする仕組みです。一定時間リクエストがなかった場合、警告ログを出すようになっています。
eth/catalyst/api.go::heartbeat()
最後に受け取ったトランザクションアップデート、フォークチョイス、ブロック検証が一定時間を過ぎていたらWarningを出す
func (api *ConsensusAPI) heartbeat() {
...
for {
// Sleep a bit and retrieve the last known consensus updates
time.Sleep(5 * time.Second)
api.lastTransitionLock.Lock()
lastTransitionUpdate := api.lastTransitionUpdate
api.lastTransitionLock.Unlock()
api.lastForkchoiceLock.Lock()
lastForkchoiceUpdate := api.lastForkchoiceUpdate
api.lastForkchoiceLock.Unlock()
api.lastNewPayloadLock.Lock()
lastNewPayloadUpdate := api.lastNewPayloadUpdate
api.lastNewPayloadLock.Unlock()
// If there have been no updates for the past while, warn the user
// that the beacon client is probably offline
if time.Since(lastForkchoiceUpdate) <= beaconUpdateConsensusTimeout || time.Since(lastNewPayloadUpdate) <= beaconUpdateConsensusTimeout {
offlineLogged = time.Time{}
continue
}
if time.Since(offlineLogged) > beaconUpdateWarnFrequency {
if lastForkchoiceUpdate.IsZero() && lastNewPayloadUpdate.IsZero() {
if lastTransitionUpdate.IsZero() {
log.Warn("Post-merge network, but no beacon client seen. Please launch one to follow the chain!")
} else {
log.Warn("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
}
} else {
log.Warn("Beacon client online, but no consensus updates received in a while. Please fix your beacon client to follow the chain!")
}
offlineLogged = time.Now()
}
continue
}
}
5. Beacon Light Client (Optional)
実はELから自発的にCLにリクエストを送信する仕組みも提供しています。これはEngine APIリクエストをCLから受け取ってから処理を開始するのではなく、イベントを検知したら処理を開始するようになっています。
- chainHeadEventを検知したら自発的にCLにリクエストを送信する
- beacon/blsyncパッケージに実装されている
- このモードを有効にすると、GethはBeacon Chainからブロックヘッダー情報を取得し、それを使用して自発的にConsensus Clientに対して以下のAPIコールを行う:
- engine_newPayloadVx: 新しいブロックデータを送信
- engine_forkchoiceUpdatedVx: 新しいチェーンヘッドを通知