Skip to content

Commit

Permalink
PR feedback: value receiver, no extra range calc
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Sep 4, 2024
1 parent d7ebcf9 commit a5bfc3b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 45 deletions.
8 changes: 5 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
// will return the *applicable* range for the incomplete set of migrations.
// That means **it may be empty** if all migrations have occurred.
//
dataMigrations, applicableRange, err := db.BuildMigrations(
dataMigrations, err := db.BuildMigrations(
readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
Expand All @@ -364,7 +364,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}
ledgerSeqRange := feeStatsRange.Merge(&applicableRange)
ledgerSeqRange := dataMigrations.ApplicableRange().Merge(feeStatsRange)

//
// 5. Apply migration for events & transactions, and perform fee stat analysis.
Expand Down Expand Up @@ -409,7 +409,9 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}

if currentSeq != 0 {
d.logger.WithField("seq", currentSeq).
d.logger.
WithField("first", retentionRange.First).
WithField("last", retentionRange.Last).
Info("Finished initializing in-memory store and applying DB data migrations")
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ type eventTableMigration struct {
writer EventWriter
}

func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (e *eventTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: e.firstLedger,
Last: e.lastLedger,
}
Expand All @@ -326,7 +326,7 @@ func newEventTableMigration(
_ context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
Expand Down
67 changes: 31 additions & 36 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,40 @@ type LedgerSeqRange struct {
Last uint32
}

func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
if mlr == nil {
return false
}
func (mlr LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last
}

func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
func (mlr LedgerSeqRange) Merge(other LedgerSeqRange) LedgerSeqRange {
if mlr.Empty() {
return other
}
if other.Empty() {
return mlr
}

// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
return &LedgerSeqRange{
return LedgerSeqRange{
First: min(mlr.First, other.First),
Last: max(mlr.Last, other.Last),
}
}

func (mlr *LedgerSeqRange) MergeInPlace(other LedgerSeqRange) {
// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
mlr.First = min(mlr.First, other.First)
mlr.Last = max(mlr.Last, other.Last)
}

func (mlr *LedgerSeqRange) Empty() bool {
return mlr == nil || (mlr.First == 0 && mlr.Last == 0)
func (mlr LedgerSeqRange) Empty() bool {
return mlr.First == 0 && mlr.Last == 0
}

type MigrationApplier interface {
// ApplicableRange returns the closed ledger sequence interval,
// where Apply() should be called. A null result indicates the empty range
ApplicableRange() *LedgerSeqRange
// where Apply() should be called.
ApplicableRange() LedgerSeqRange
// Apply applies the migration on a ledger. It should never be applied
// in ledgers outside the ApplicableRange()
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory
type migrationApplierF func(context.Context, *log.Entry, string, LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB) (MigrationApplier, error)
Expand All @@ -83,8 +74,15 @@ type MultiMigration struct {
db *DB
}

func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
func (mm *MultiMigration) Append(m Migration) {
r := m.ApplicableRange()
if !r.Empty() {
mm.migrations = append(mm.migrations, m)
}
}

func (mm MultiMigration) ApplicableRange() LedgerSeqRange {
var result LedgerSeqRange
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
Expand Down Expand Up @@ -167,9 +165,9 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
return g.migration.Apply(ctx, meta)
}

func (g *guardedMigration) ApplicableRange() *LedgerSeqRange {
func (g *guardedMigration) ApplicableRange() LedgerSeqRange {
if g.alreadyMigrated {
return nil
return LedgerSeqRange{}
}
return g.migration.ApplicableRange()
}
Expand Down Expand Up @@ -199,14 +197,11 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32
func BuildMigrations(
ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange LedgerSeqRange,
) (MultiMigration, LedgerSeqRange, error) {
// Track ranges for which migrations are actually necessary
applicableRange := LedgerSeqRange{}

) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, applicableRange, errors.Join(err, db.Rollback())
return MultiMigration{}, errors.Join(err, db.Rollback())
}

//
Expand All @@ -217,19 +212,23 @@ func BuildMigrations(
eventsMigrationName: newEventTableMigration,
}

migrations := make([]Migration, 0, len(currentMigrations))
mm := MultiMigration{
migrations: make([]Migration, 0, len(currentMigrations)),
db: db,
}

for migrationName, migrationFunc := range currentMigrations {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
migrationLogger,
networkPassphrase,
&ledgerSeqRange,
ledgerSeqRange,
)

guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return MultiMigration{}, applicableRange, errors.Join(err, fmt.Errorf(
return MultiMigration{}, errors.Join(err, fmt.Errorf(
"could not create guarded migration for %s", migrationName), db.Rollback())
}

Expand All @@ -238,12 +237,8 @@ func BuildMigrations(
continue
}

applicableRange.MergeInPlace(ledgerSeqRange)
migrations = append(migrations, guardedM)
mm.Append(guardedM)
}

return MultiMigration{
migrations: migrations,
db: db,
}, applicableRange, nil
return mm, nil
}
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ type transactionTableMigration struct {
writer TransactionWriter
}

func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (t *transactionTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: t.firstLedger,
Last: t.lastLedger,
}
Expand All @@ -270,7 +270,7 @@ func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
Expand Down

0 comments on commit a5bfc3b

Please sign in to comment.