diff --git a/go.mod b/go.mod index 25f06c6..a003b92 100644 --- a/go.mod +++ b/go.mod @@ -115,4 +115,4 @@ require ( replace github.com/ethereum/go-ethereum => github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852 -replace github.com/ethereum-optimism/optimism => github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5 +replace github.com/ethereum-optimism/optimism => github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86 diff --git a/go.sum b/go.sum index 6d73d7c..e6d1339 100644 --- a/go.sum +++ b/go.sum @@ -222,8 +222,8 @@ github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWV github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852 h1:sxczPjBB5xXnQWhdSrLdQLLIi/ZVv22pVOmd6xrHq8A= github.com/mdehoog/op-geth v0.0.0-20241003075401-d8f4cde5a852/go.mod h1:7S4pp8KHBmEmKkRjL1BPOc6jY9hW+64YeMUjR3RVLw4= -github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5 h1:9Lm/4Jq+YEBWgEw9dX8QUCB8CvyZ8fDSi62Al7ebZns= -github.com/mdehoog/optimism v0.0.0-20241004232359-14da93be19e5/go.mod h1:cg1f0VUXn/9jfek2sWOrVRxxlxH7ia4UEZvEMa1X5/4= +github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86 h1:FXEWfWyCxCrtUhJmlqljN7nwF6SLl2CdvhxOJYsQ5Lc= +github.com/mdehoog/optimism v0.0.0-20241005031606-445adf498c86/go.mod h1:cg1f0VUXn/9jfek2sWOrVRxxlxH7ia4UEZvEMa1X5/4= github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= github.com/mdlayher/vsock v1.2.1 h1:pC1mTJTvjo1r9n9fbm7S1j04rCgCzhCOS5DY0zqHlnQ= diff --git a/op-batcher/batcher/batch_submitter.go b/op-batcher/batcher/batch_submitter.go index 646f289..0c220c2 100644 --- a/op-batcher/batcher/batch_submitter.go +++ b/op-batcher/batcher/batch_submitter.go @@ -9,6 +9,8 @@ import ( opservice "github.com/ethereum-optimism/optimism/op-service" "github.com/ethereum-optimism/optimism/op-service/cliapp" oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum/go-ethereum/rpc" + thisflags "github.com/mdehoog/op-enclave/op-batcher/flags" "github.com/urfave/cli/v2" ) @@ -26,11 +28,18 @@ func Main(version string) cliapp.LifecycleAction { l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig) oplog.SetGlobalLogHandler(l.Handler()) - opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l) + opservice.ValidateEnvVars(flags.EnvVarPrefix, thisflags.Flags, l) + + proposerClient, err := rpc.DialContext(cliCtx.Context, cliCtx.String(thisflags.ProposerRpcFlag.Name)) + if err != nil { + return nil, fmt.Errorf("failed to connect to Proposer: %w", err) + } l.Info("Initializing Batch Submitter") - channelFactoryOpt := func(service *batcher.BatcherService, cfg *batcher.CLIConfig) { - service.ChannelFactory = NewChannel + channelFactoryOpt := func(setup *batcher.DriverSetup) { + metricer := NewMetricer(setup.Metr, setup.Log, proposerClient) + setup.Metr = metricer + setup.ChannelOutFactory = ChannelOutFactory(metricer) } return batcher.BatcherServiceFromCLIConfig(cliCtx.Context, version, cfg, l, channelFactoryOpt) } diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go deleted file mode 100644 index f97810e..0000000 --- a/op-batcher/batcher/channel.go +++ /dev/null @@ -1,44 +0,0 @@ -package batcher - -import ( - "errors" - - "github.com/ethereum-optimism/optimism/op-batcher/batcher" - "github.com/ethereum-optimism/optimism/op-batcher/metrics" - "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - "github.com/ethereum-optimism/optimism/op-service/predeploys" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -var ErrWithdrawalDetected = errors.New("withdrawal detected") - -func NewChannel(log log.Logger, metr metrics.Metricer, cfg batcher.ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (batcher.Channel, error) { - co, err := batcher.NewChannel(log, metr, cfg, rollupCfg, latestL1OriginBlockNum) - if err != nil { - return nil, err - } - return &channel{ - Channel: co, - }, nil -} - -type channel struct { - batcher.Channel - fullErr error -} - -func (c *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) { - if block.Bloom().Test(predeploys.L2ToL1MessagePasserAddr.Bytes()) { - c.fullErr = ErrWithdrawalDetected - } - return c.Channel.AddBlock(block) -} - -func (c *channel) FullErr() error { - if c.fullErr != nil { - return c.fullErr - } - return c.Channel.FullErr() -} diff --git a/op-batcher/batcher/channel_out.go b/op-batcher/batcher/channel_out.go new file mode 100644 index 0000000..513d0ee --- /dev/null +++ b/op-batcher/batcher/channel_out.go @@ -0,0 +1,57 @@ +package batcher + +import ( + "errors" + + "github.com/ethereum-optimism/optimism/op-batcher/batcher" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-service/predeploys" + "github.com/ethereum/go-ethereum/core/types" +) + +var ErrWithdrawalDetected = errors.New("withdrawal detected") + +type ChannelOut interface { + derive.ChannelOut + Blocks() []*types.Block +} + +func ChannelOutFactory(metricer Metricer) batcher.ChannelOutFactory { + return func(cfg batcher.ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) { + co, err := batcher.NewChannelOut(cfg, rollupCfg) + if err != nil { + return nil, err + } + wrapped := &channelOut{ + ChannelOut: co, + } + metricer.RegisterChannel(wrapped) + return wrapped, nil + } +} + +type channelOut struct { + derive.ChannelOut + fullErr error + blocks []*types.Block +} + +func (c *channelOut) Blocks() []*types.Block { + return c.blocks +} + +func (c *channelOut) AddBlock(config *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error) { + c.blocks = append(c.blocks, block) + if block.Bloom().Test(predeploys.L2ToL1MessagePasserAddr.Bytes()) { + c.fullErr = ErrWithdrawalDetected + } + return c.ChannelOut.AddBlock(config, block) +} + +func (c *channelOut) FullErr() error { + if c.fullErr != nil { + return c.fullErr + } + return c.ChannelOut.FullErr() +} diff --git a/op-batcher/batcher/metricer.go b/op-batcher/batcher/metricer.go new file mode 100644 index 0000000..d31d134 --- /dev/null +++ b/op-batcher/batcher/metricer.go @@ -0,0 +1,55 @@ +package batcher + +import ( + "github.com/ethereum-optimism/optimism/op-batcher/metrics" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +type Metricer interface { + metrics.Metricer + RegisterChannel(out ChannelOut) +} + +type metricer struct { + metrics.Metricer + log log.Logger + proposerClient *rpc.Client + channels map[derive.ChannelID]ChannelOut +} + +func NewMetricer(m metrics.Metricer, log log.Logger, proposerClient *rpc.Client) Metricer { + return &metricer{ + Metricer: m, + log: log, + proposerClient: proposerClient, + channels: make(map[derive.ChannelID]ChannelOut), + } +} + +func (m *metricer) RegisterChannel(out ChannelOut) { + m.channels[out.ID()] = out +} + +func (m *metricer) RecordChannelFullySubmitted(id derive.ChannelID) { + m.Metricer.RecordChannelFullySubmitted(id) + + channel, ok := m.channels[id] + if !ok { + return + } + delete(m.channels, id) + var numbers []uint64 + for _, b := range channel.Blocks() { + numbers = append(numbers, b.NumberU64()) + } + if err := m.proposerClient.Call(nil, "admin_blocksBatched", numbers); err != nil { + m.log.Error("failed to notify Proposer of batched blocks", "err", err) + } +} + +func (m *metricer) RecordChannelTimedOut(id derive.ChannelID) { + m.Metricer.RecordChannelTimedOut(id) + delete(m.channels, id) +} diff --git a/op-batcher/cmd/main.go b/op-batcher/cmd/main.go index 9354077..f44ae99 100644 --- a/op-batcher/cmd/main.go +++ b/op-batcher/cmd/main.go @@ -4,8 +4,8 @@ import ( "context" "os" - "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/mdehoog/op-enclave/op-batcher/batcher" + "github.com/mdehoog/op-enclave/op-batcher/flags" "github.com/urfave/cli/v2" opservice "github.com/ethereum-optimism/optimism/op-service" diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go new file mode 100644 index 0000000..8068ddf --- /dev/null +++ b/op-batcher/flags/flags.go @@ -0,0 +1,30 @@ +package flags + +import ( + "github.com/ethereum-optimism/optimism/op-batcher/flags" + opservice "github.com/ethereum-optimism/optimism/op-service" + "github.com/urfave/cli/v2" +) + +func prefixEnvVar(name string) []string { + return opservice.PrefixEnvVar(flags.EnvVarPrefix, name) +} + +var ( + ProposerRpcFlag = &cli.StringFlag{ + Name: "proposer-rpc", + Usage: "HTTP provider URL for the Proposer", + EnvVars: prefixEnvVar("PROPOSER_RPC"), + Required: true, + } +) + +var requiredFlags = []cli.Flag{ + ProposerRpcFlag, +} + +func init() { + Flags = append(requiredFlags, flags.Flags...) +} + +var Flags []cli.Flag diff --git a/op-proposer/proposer/driver.go b/op-proposer/proposer/driver.go index 896ea26..f4ccaa3 100644 --- a/op-proposer/proposer/driver.go +++ b/op-proposer/proposer/driver.go @@ -55,6 +55,9 @@ type L2OutputSubmitter struct { ooABI *abi.ABI prover *Prover + + blocksBatched map[uint64]struct{} + blocksBatchedMutex sync.Mutex } // NewL2OutputSubmitter creates a new L2 Output Submitter @@ -109,6 +112,8 @@ func newL2OOSubmitter(ctx context.Context, cancel context.CancelFunc, setup Driv ooContract: ooContract, ooABI: parsed, prover: prover, + + blocksBatched: make(map[uint64]struct{}), }, nil } @@ -157,6 +162,43 @@ func (l *L2OutputSubmitter) StopL2OutputSubmitting() error { return nil } +func (l *L2OutputSubmitter) BlocksBatched(numbers []uint64) error { + l.blocksBatchedMutex.Lock() + defer l.blocksBatchedMutex.Unlock() + for _, number := range numbers { + l.blocksBatched[number] = struct{}{} + } + return nil +} + +func (l *L2OutputSubmitter) LatestBlockBatched(ctx context.Context) (uint64, error) { + syncStatus, err := l.RollupClient.SyncStatus(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get sync status from Rollup: %w", err) + } + batched := syncStatus.FinalizedL2.Number + if l.Cfg.AllowNonFinalized { + batched = syncStatus.PendingSafeL2.Number + + l.blocksBatchedMutex.Lock() + defer l.blocksBatchedMutex.Unlock() + + for number := range l.blocksBatched { + if number <= batched { + delete(l.blocksBatched, number) + } + } + + // iterate through the batched blocks to find the last contiguous batched block number + for i := batched + 1; ; i++ { + if _, ok := l.blocksBatched[i]; !ok { + return i - 1, nil + } + } + } + return batched, nil +} + // loop is responsible for creating & submitting the next outputs // The loop regularly polls the L2 chain to infer whether to make the next proposal. func (l *L2OutputSubmitter) loop() { @@ -224,31 +266,28 @@ func (l *L2OutputSubmitter) generateNextProposal(ctx context.Context, lastPropos } // generate new proposals up to the latest block - syncStatus, err := l.RollupClient.SyncStatus(ctx) + batchedBlockNumber, err := l.LatestBlockBatched(ctx) if err != nil { - return nil, false, fmt.Errorf("failed to get sync status from Rollup: %w", err) - } - latestBlockNumber := syncStatus.FinalizedL2.Number - if l.Cfg.AllowNonFinalized { - latestBlockNumber = syncStatus.PendingSafeL2.Number + return nil, false, err } // TODO implement proposal array limit (aggregate in chunks) // TODO implement a pool of go-routines for parallel proof generation + // TODO generate proofs for unsafe blocks ahead of time var proposals []*Proposal if lastProposal != nil { proposals = append(proposals, lastProposal) } - shouldPropose := lastProposalBlockNumber < latestBlockNumber && - l.Cfg.MinProposalInterval > 0 && latestBlockNumber-proposedBlockNumber > l.Cfg.MinProposalInterval - for i := lastProposalBlockNumber + 1; i <= latestBlockNumber; i++ { + shouldPropose := lastProposalBlockNumber < batchedBlockNumber && + l.Cfg.MinProposalInterval > 0 && batchedBlockNumber-proposedBlockNumber > l.Cfg.MinProposalInterval + for i := lastProposalBlockNumber + 1; i <= batchedBlockNumber; i++ { proposal, anyWithdrawals, err := l.prover.Generate(ctx, i) if err != nil { return nil, false, fmt.Errorf("failed to generate proof for block %d: %w", i, err) } proposals = append(proposals, proposal) shouldPropose = shouldPropose || anyWithdrawals - l.Log.Info("Generated proof for block", "block", i, "latest", latestBlockNumber, "shouldPropose", shouldPropose, "output", proposal.Output.OutputRoot.String()) + l.Log.Info("Generated proof for block", "block", i, "batched", batchedBlockNumber, "shouldPropose", shouldPropose, "output", proposal.Output.OutputRoot.String()) } if len(proposals) == 0 { diff --git a/op-proposer/proposer/rpc/admin.go b/op-proposer/proposer/rpc/admin.go new file mode 100644 index 0000000..f536052 --- /dev/null +++ b/op-proposer/proposer/rpc/admin.go @@ -0,0 +1,34 @@ +package rpc + +import ( + "context" + + "github.com/ethereum/go-ethereum/log" + gethrpc "github.com/ethereum/go-ethereum/rpc" + + "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/rpc" +) + +type ProposerDriver interface { + BlocksBatched(numbers []uint64) error +} + +type adminAPI struct { + *rpc.CommonAdminAPI + b ProposerDriver +} + +func NewAdminAPI(dr ProposerDriver, m metrics.RPCMetricer, log log.Logger) gethrpc.API { + return gethrpc.API{ + Namespace: "admin", + Service: &adminAPI{ + CommonAdminAPI: rpc.NewCommonAdminAPI(m, log), + b: dr, + }, + } +} + +func (a *adminAPI) BlocksBatched(_ context.Context, numbers []uint64) error { + return a.b.BlocksBatched(numbers) +} diff --git a/op-proposer/proposer/service.go b/op-proposer/proposer/service.go index 903e6d6..d500ca9 100644 --- a/op-proposer/proposer/service.go +++ b/op-proposer/proposer/service.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/mdehoog/op-enclave/op-enclave/enclave" "github.com/mdehoog/op-enclave/op-proposer/metrics" + thisrpc "github.com/mdehoog/op-enclave/op-proposer/proposer/rpc" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -242,6 +243,7 @@ func (ps *ProposerService) initRPCServer(cfg *CLIConfig) error { if cfg.RPCConfig.EnableAdmin { adminAPI := rpc.NewAdminAPI(ps.driver, ps.Metrics, ps.Log) server.AddAPI(rpc.GetAdminAPI(adminAPI)) + server.AddAPI(thisrpc.NewAdminAPI(ps.driver, ps.Metrics, ps.Log)) server.AddAPI(ps.TxManager.API()) ps.Log.Info("Admin RPC enabled") }