ゴール: Gethの動作を理解すること。
想定読者: EthereumのELとCLの仕組みはなんとなく知っていて、より深く理解したい。
GethはEthereumの実行クライアントの一つで、トランザクションの処理やブロックの作成・検証などの重要な役割を担います。
今回はそのGethの処理に直接関係あるコードを全行読み、内容を追っていきます。
コード:
Gethの開発状況について
ほぼ毎日mergeされてる
https://github.com/ethereum/go-ethereum/commits/master/
117件もPR上がってる
https://github.com/ethereum/go-ethereum/pulls
issueは191件
https://github.com/ethereum/go-ethereum/issues
Fusaka関連のissueももう上がっており2025/06には実装が完了してdevnet以降すべきという記述がある。5月頭にPectraがメインネットマージされたばかりなのに早い。
https://github.com/ethereum/go-ethereum/issues/31701
デバッグ関連のメモ
- EVM Tracing: EVMによって実行された内容を正確に調査できる
- Dashboards: Grafanaで可視化ダッシュボードを作成できる
- Ethstats: Gethノードのパフォーマンスを監視できる
- ちゃんとテスト書いてあるから、それを全部さらってくのもよさそう
コントリビュートについて
https://geth.ethereum.org/docs/developers/geth-developer/contributing
フォークしてPR上げてね
複雑な変更を提出する場合はまずDiscordサーバーでコア開発者に連絡を取り、変更がプロジェクトの全体的な理念に沿っているか確認し、早期のフィードバックを得るようにしてね
修正や機能が完了していなくてもできるだけ早くPRを作成してね
処理が発生する場面
- Userからのtxリクエストを受け取った時
- 他のELからtxを受け取った時
- EngineAPIでコンセンサスクライアントからリクエストを受け取った時 (ブロック構築やブロック検証など)
ディレクトリ構成
Directory | Description |
---|---|
accounts/ |
Implementation of wallet and account management features |
• keystore/
: Keystore implementation for managing accounts
• abi/
: Application Binary Interface for contract interaction
• usbwallet/
: Hardware wallet support
• scwallet/
: Smart contract wallet support |
| beacon/
| Implementation of Proof-of-Stake related functionality for the Beacon Chain |
| build/
| Scripts and tools for building Geth |
| cmd/
| Contains executable entry points for various tools (geth client, abigen, clef, etc.)
• geth/
: The main Ethereum client
• abigen/
: Generates Go bindings for Solidity contracts
• clef/
: External signer for secure key management
• evm/
: Standalone Ethereum Virtual Machine runner
• rlpdump/
: Tool for debugging RLP data
• devp2p/
: P2P network utilities
• ethkey/
: Ethereum key management tools |
| common/
| Common utilities, helper functions, and data structures |
| consensus/
| Different consensus algorithms (ethash, clique, beacon)
• ethash/
: Proof-of-Work consensus engine
• clique/
: Proof-of-Authority consensus engine
• beacon/
: Proof-of-Stake consensus interface for The Merge
• misc/
: Miscellaneous consensus-related utilities |
| console/
| JavaScript console for interacting with Geth |
| core/
| Core blockchain implementation (state, transactions, EVM, etc.)
• blockchain.go
: Main blockchain code for managing the chain
• genesis.go
: Genesis block handling
• state/
: State management
• types/
: Core blockchain types (blocks, transactions)
• vm/
: Ethereum Virtual Machine implementation
• txpool/
: Transaction pool for pending transactions
• rawdb/
: Low-level database operations |
| crypto/
| Cryptographic primitives and utilities |
| eth/
| Ethereum protocol implementation (networking, RPC, blockchain operations)
• backend.go
: Backend service interface
• handler.go
: Protocol message handling
• downloader/
: Blockchain synchronization
• filters/
: Event filtering
• gasprice/
: Gas price oracle
• tracers/
: Transaction tracing tools |
| ethclient/
| Go client library for Ethereum |
| ethdb/
| Database interfaces and implementations
• leveldb/
: LevelDB adapter
• memorydb/
: In-memory database for testing
• pebble/
: Pebble database adapter |
| event/
| Event subscription and notification system |
| graphql/
| GraphQL API for Ethereum |
| internal/
| Internal packages and utilities |
| log/
| Logging framework |
| metrics/
| Performance and monitoring metrics |
| miner/
| Block mining implementation
• miner.go
: Block creation and mining
• worker.go
: Mining worker that assembles blocks |
| node/
| Framework for setting up an Ethereum node |
| p2p/
| Peer-to-peer networking stack
• server.go
: P2P server implementation
• peer.go
: Peer connection handling
• discover/
: Node discovery protocol
• nat/
: NAT traversal functions
• dnsdisc/
: DNS-based discovery |
| params/
| Network parameters, constants, and configurations |
| rlp/
| Recursive Length Prefix encoding/decoding |
| rpc/
| Remote procedure call (RPC) implementation
• server.go
: RPC server implementation
• client.go
: RPC client implementation
• http.go
: HTTP transport
• websocket.go
: WebSocket transport |
| signer/
| Transaction signing utilities |
| tests/
| Integration and regression tests |
| trie/
| Merkle Patricia Trie implementation
• trie.go
: Core trie implementation
• secure_trie.go
: Secure trie implementation
• sync.go
: Trie synchronization |
| triedb/
| Database implementation for Merkle Patricia Tries |
オレンジの部分がコアなクライアント実装に関わるので、そこから読んでいくとする
上から読むよりもユーザーフローに沿って順番にコードを読んでいくのがよさそう
1. Userからtxを受け取ったとき
UserがRPCエンドポイントを介してsendTransaction/sendRawTransactionを送信すると、Gethはそれを処理します
全体のフロー:
internal/ethapi/api.go::SendTransaction
まずtxnのfromをもとにaccountとwalletを発見する
// Look up the wallet containing the requested signer
account := accounts.Account{Address: args.from()}
wallet, err := api.b.AccountManager().Find(account)
if err != nil {
return common.Hash{}, err
}
nonceがなければnonceをロックする
この関数が終了するまでロックが継続し、同じnonceが使われるのを防止する
if args.Nonce == nil {
// Hold the mutex around signing to prevent concurrent assignment of
// the same nonce to multiple accounts.
api.nonceLock.LockAddr(args.from())
defer api.nonceLock.UnlockAddr(args.from())
}
EIP4844 (Blob Tx)にサポートされていない場合はエラーを返す
if args.IsEIP4844() {
return common.Hash{}, errBlobTxNotSupported
}
gas fee, gas limit, nonceなどのパラメータにデフォルト値を設定する
// Set some sanity defaults and terminate on failure
if err := args.setDefaults(ctx, api.b, false); err != nil {
return common.Hash{}, err
}
トランザクションオブジェクトを作成する
必ずlegacy typeが指定されている
// Assemble the transaction and sign with the wallet
tx := args.ToTransaction(types.LegacyTxType)
txへ署名を行う
chain idを含めることでリプレイ攻撃を防止する
別チェーンで署名が使い回せないのは大事だけど、同じ人へのトランザクションを何回も遅れたりしない?それは別で二重支払い対策がされているんだろうか。
signed, err := wallet.SignTx(account, tx, api.b.ChainConfig().ChainID)
if err != nil {
return common.Hash{}, err
}
submit transactionを実行してtx poolに送信し、tx hashを返却する
return SubmitTransaction(ctx, api.b, signed)
SendRawTransaction
すでに署名されたtxを受け取り、ハッシュをチェックしたらtx poolに送信する
// SendRawTransaction will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce.
func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil.Bytes) (common.Hash, error) {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(input); err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, api.b, tx)
}
SubmitTransaction
- gasPrice * gasLimitがRPCTxFeeCapを超えていないことをチェック
- txがEIP-155保護(リプレイアタック防止)されているかチェック
- ここでリプレイ攻撃は一度作成した署名を別のチェーンで使いまわす攻撃と定義
- 署名のV値は
V = 27 + (2 * chain_id) + (recovery_id % 2)
で計算される - recovery_idは1 or 2
- 署名のV値が8bit以下の場合は27,28,1,0以外の値であることをチェック
- Vはメインネット(chain_id=1)なら29,30を返し、テストネット(chain_id=5)なら37,38を返す
- なのでこれをチェックすれば署名にchain_idが含まれているか検証できる
- SendTx: tx poolへ送信
// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if !b.UnprotectedAllowed() && !tx.Protected() {
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
}
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
head := b.CurrentBlock()
signer := types.MakeSigner(b.ChainConfig(), head.Number, head.Time)
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
} else {
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
}
return tx.Hash(), nil
}
eth/api_backend.go::SendTx
tx poolに追加
false
パラメータは、トランザクションが非同期で処理されることを示す
テスト環境下や決定論的な結果が必要な場合のみtrue
にすべきで、その場合、処理が完了するまでブロックされるためパフォーマンスは低下する
err := b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0]
local track
// If the local transaction tracker is not configured, returns whatever
// returned from the txpool.
if b.eth.localTxTracker == nil {
return err
}
// If the transaction fails with an error indicating it is invalid, or if there is
// very little chance it will be accepted later (e.g., the gas price is below the
// configured minimum, or the sender has insufficient funds to cover the cost),
// propagate the error to the user.
if err != nil && !locals.IsTemporaryReject(err) {
return err
}
// No error will be returned to user if the transaction fails with a temporary
// error and might be accepted later (e.g., the transaction pool is full).
// Locally submitted transactions will be resubmitted later via the local tracker.
b.eth.localTxTracker.Track(signedTx)
return nil
core/txpool/txpool.go::TxPool.Add
配列の初期化
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
各トランザクションに対して
- 初期値として-1を設定
- 各サブプールのfilterメソッドでトランザクションの種類をチェック
- 適切なサブプールが見つかったらそのサブプールの配列に追加
- legacy or blobの2種類
for i, tx := range txs {
// 初期値として-1を設定(どのサブプールにも属さない)
splits[i] = -1
// 各サブプールでトランザクションを受け入れるかチェック
for j, subpool := range p.subpools {
if subpool.Filter(tx) {
txsets[j] = append(txsets[j], tx)
splits[i] = j
break
}
}
}
サブプールへトランザクションを追加する
errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ {
errsets[i] = p.subpools[i].Add(txsets[i], sync)
}
どのサブプールでも受け入れられなかった場合はエラー
errs := make([]error, len(txs))
for i, split := range splits {
// どのサブプールでも受け入れられなかった場合
if split == -1 {
errs[i] = fmt.Errorf("%w: received type %d", core.ErrTxTypeNotSupported, txs[i].Type())
continue
}
// 対応するサブプールのエラーを取得
errs[i] = errsets[split][0]
errsets[split] = errsets[split][1:]
}
core/txpool/blobpool.go::BlobPool.add
トランザクション追加の待機時間を計測
プールのロック
waitStart := time.Now()
p.lock.Lock()
addwaitHist.Update(time.Since(waitStart).Nanoseconds())
defer p.lock.Unlock()
defer func(start time.Time) {
addtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())
トランザクションの検証 (validateTx
を呼び出す)
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
switch {
case errors.Is(err, txpool.ErrUnderpriced):
addUnderpricedMeter.Mark(1)
// ... 他のエラーケース
}
return err
}
txの送信者を特定し、新しいアカウントの場合は予約を取得
from, _ := types.Sender(p.signer, tx)
if _, ok := p.index[from]; !ok {
if err := p.reserver.Hold(from); err != nil {
addNonExclusiveMeter.Mark(1)
return err
}
defer func() {
if err != nil {
p.reserver.Release(from)
}
}()
}
トランザクションをRLPエンコードしてストレージに保存
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
}
id, err := p.store.Put(blob)
if err != nil {
return err
}
blobメタデータを作成
現在のnonceとtx nonceの差を計算
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
var (
next = p.state.GetNonce(from)
offset = int(tx.Nonce() - next)
newacc = false
)
nonceに基づいて既存のトランザクションを置換あるいは追加
if len(p.index[from]) > offset {
// 既存のトランザクションを置換
prev := p.index[from][offset]
// ... 置換処理
} else {
// 新しいトランザクションを追加
p.index[from] = append(p.index[from], meta)
// ... 追加処理
}
追放
for i := offset; i < len(txs); i++ {
// 追放フィールドの計算
// ... 詳細な計算処理
}
switch {
case newacc:
heap.Push(p.evict, from)
case len(txs) == 1:
heap.Fix(p.evict, p.evict.index[from])
default:
// ... ヒープの更新条件
}
ストレージ制限をチェックし、超えた場合はトランザクションをdropする
- ヒープから最も優先度の低いアカウントを選択
→ 優先順位付けの仕組みは?gasと回数だっけ? - そのアカウントの最後のトランザクションを追放対象に
- そのアカウントのトランザクションがもうない場合はインデックス、支出記録から削除し、アカウントの予約を開放
- ストレージから削除
for p.stored > p.config.Datacap {
p.drop()
}
core/txpool/blobpool.go::BlobPool.validateTx
基本的な検証 (core/txpool/validation.go::ValidateTransaction()
)
- txのtypeがサポートされてることをチェック
- txのsizeが制限内かチェック
- ルールチェック
- Berlin以前の場合: LegacyTxしか対応してない
- London以前場合: DynamicFeeTypeに対応してない
- Cancun以前の場合: BlobTxに対応してない
- Prague以前場合: setCodeTxTypeに対応してない
- Shanghai以降: contract callのinitコードサイズ制限チェック
- txの値が負でないかチェック
- ガス代が制限内かチェック
- txの送信者と署名者が一致していることをチェック
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
ステートの計算
- nonce gapがある場合は次のnonceを計算
- アカウントの残りのtx数制限を計算
- アカウントの既存支出を計算
- 既存コストの計算
stateOpts := &txpool.ValidationOptionsWithState{
State: p.state,
// ... 各種検証オプション
}
ステートの検証 (core/txpool/validation.go::ValidateTransactionWithState()
)
if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil {
return err
}
委任制限のチェック
tx senderが委任されているなら委任が適切かチェック
if err := p.checkDelegationLimit(tx); err != nil {
return err
}
既存のトランザクションの置換を検証
同じnonceを持つ新しいトランザクションで既存のトランザクションを置き換える際の検証で、価格上昇や、同じnonceを持つトランザクションの存在などの場合に発動する
これにより二重支払い防止の機能も果たす
var (
from, _ = types.Sender(p.signer, tx)
next = p.state.GetNonce(from)
)
if uint64(len(p.index[from])) > tx.Nonce()-next {
prev := p.index[from][int(tx.Nonce()-next)]
// ... 置換の検証
}
価格上昇の検証
switch {
case tx.GasFeeCapIntCmp(prev.execFeeCap.ToBig()) <= 0:
return fmt.Errorf("%w: new tx gas fee cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap)
// ... 他の価格チェック
}
最小価格の検証
switch {
case tx.GasFeeCapIntCmp(minGasFeeCap.ToBig()) < 0:
return fmt.Errorf("%w: new tx gas fee cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap, p.config.PriceBump)
// ... 他の最小価格チェック
}
ちなみにルール管理によってtxがあるアップデートよりも後に作成されたtxかどうか識別する。これはフォークのたびにルールが増えていく。
もうOsaka, Verkleの型も準備されている。
func (c *ChainConfig) Rules(num *big.Int, isMerge bool, timestamp uint64) Rules {
chainID := c.ChainID
if chainID == nil {
chainID = new(big.Int)
}
// disallow setting Merge out of order
isMerge = isMerge && c.IsLondon(num)
isVerkle := isMerge && c.IsVerkle(num, timestamp)
return Rules{
ChainID: new(big.Int).Set(chainID),
IsHomestead: c.IsHomestead(num),
IsEIP150: c.IsEIP150(num),
IsEIP155: c.IsEIP155(num),
IsEIP158: c.IsEIP158(num),
IsByzantium: c.IsByzantium(num),
IsConstantinople: c.IsConstantinople(num),
IsPetersburg: c.IsPetersburg(num),
IsIstanbul: c.IsIstanbul(num),
IsBerlin: c.IsBerlin(num),
IsEIP2929: c.IsBerlin(num) && !isVerkle,
IsLondon: c.IsLondon(num),
IsMerge: isMerge,
IsShanghai: isMerge && c.IsShanghai(num, timestamp),
IsCancun: isMerge && c.IsCancun(num, timestamp),
IsPrague: isMerge && c.IsPrague(num, timestamp),
IsOsaka: isMerge && c.IsOsaka(num, timestamp),
IsVerkle: isVerkle,
IsEIP4762: isVerkle,
}
}
core/txpool/legacypool.go::LegacyPool.Add
既知のトランザクションのフィルタリング
プールに既にあるトランザクションはスキップする
for i, tx := range txs {
if pool.all.Get(tx.Hash()) != nil {
errs[i] = txpool.ErrAlreadyKnown
knownTxMeter.Mark(1)
continue
}
validateBasics (BlobPoolと一緒)
if err := pool.ValidateTxBasics(tx); err != nil {
errs[i] = err
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
continue
}
トランザクションの追加処理
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news)
pool.mu.Unlock()
プールの再編成
done := pool.requestPromoteExecutables(dirtyAddrs)
if sync {
<-done
}
eth/handler.go
ノードの開始時にSubscribeTransactionsでtx poolにトランザクションが追加されたのを検知するようにする
func (h *handler) Start(maxPeers int) {
h.maxPeers = maxPeers
// broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()
// start sync handlers
h.txFetcher.Start()
// start peer handler tracker
h.wg.Add(1)
go h.protoTracker()
}
イベントを受け取ったら接続しているピアにトランザクションをブロードキャストする
ある程度まとまった数ブロードキャストしている
// txBroadcastLoop announces new transactions to connected peers.
func (h *handler) txBroadcastLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs)
case <-h.txsSub.Err():
return
}
}
}
eth/protocols/eth/peer.go::AsyncSendTransactions
knownTxsに追加したらrequest queueに追加
非同期でp2p.Send(p.rw, TransactionsMsg, txs)
で送信される
// AsyncSendTransactions queues a list of transactions (by hash) to eventually
// propagate to a remote peer. The number of pending sends are capped (new ones
// will force old sends to be dropped)
func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
select {
case p.txBroadcast <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
case <-p.term:
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
}
}
2. 他のELからtxを受け取った時
p2pメッセージ経由で他のクライアントからtxを受け取った時にもUserから受け取ったときと似たような処理が実行されます
eth/protocols/eth/handers.go::handleTransactions
txをチェックしてknownTxsに追加する
backend.Handeを呼び出す
func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
}
return backend.Handle(peer, &txs)
}
eth/handler_eth.go::Handle
このメソッドでパケットを処理する
transactionsPacketの場合、Fetcherに追加して終わり
blobはp2pでは受け付けてないってことは、blobは一体どうしてるんだ???
case *eth.TransactionsPacket:
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
return errors.New("disallowed broadcast blob transaction")
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
eth/fetcher/tx_fetcher.go::Enqueue
メトリクス選択
ブロードキャストか直接かによって違う
var (
inMeter = txReplyInMeter
knownMeter = txReplyKnownMeter
underpricedMeter = txReplyUnderpricedMeter
otherRejectMeter = txReplyOtherRejectMeter
)
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
}
バッチ処理ループ
for i := 0; i < len(txs); i += addTxsBatchSize {
end := i + addTxsBatchSize
if end > len(txs) {
end = len(txs)
}
トランザクションの追加 (h.txpool.Add(txs, false)
)
Addで同じくevent発行されて他のpeerに伝播してるっぽいね
ちょっと伝播のタイミング遅い気がする。先に軽いチェックだけして伝播すると複雑になるけど速くなる?
priceが小さすぎる場合にトラックしておく
これはpeerからのDDoS攻撃を防ぐためかな?
for j, err := range f.addTxs(batch) {
if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow) {
f.underpriced.Add(batch[j].Hash(), batch[j].Time())
}
クリーンアップ要求
届いたよーって伝えるってことかな?
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
return nil
case <-f.quit:
return errTerminated
}
EngineAPIによる処理は次回に続きます