From 7e89c8554154193b7d9c5b7e99fafb23bd67b0bd Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 2 Dec 2022 11:08:04 -0700 Subject: [PATCH 1/4] Proof of concept dynamic chunk size --- go/base/context.go | 104 ++++++++++++++++++++++++++++++++++++++++-- go/base/utils.go | 8 ++++ go/base/utils_test.go | 17 +++++++ go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 4 +- go/logic/migrator.go | 6 ++- go/logic/server.go | 2 +- 7 files changed, 135 insertions(+), 8 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index e3472f5bd..4aeaf110a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -52,6 +52,22 @@ const ( HTTPStatusOK = 200 MaxEventsBatchSize = 1000 ETAUnknown = math.MinInt64 + + // MaxDynamicScaleFactor is the maximum factor dynamic scaling can change the chunkSize from + // the setting chunkSize. For example, if the factor is 10, and chunkSize is 1000, then the + // values will be in the range of 100 to 10000. + MaxDynamicScaleFactor = 50 + // MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can + // increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget + // will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away. + MaxDynamicStepFactor = 1.5 + // MinDynamicChunkSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled. + // This helps prevent a scenario where the chunk size is too small (it can never be less than 1). + MinDynamicRowSize = 10 + // DynamicPanicFactor is the factor by which the feedback process takes immediate action when + // the chunkSize appears to be too large. For example, if the PanicFactor is 5, and the target *time* + // is 50ms, an actual time 250ms+ will cause the dynamic chunk size to immediately be reduced. + DynamicPanicFactor = 5 ) var ( @@ -118,7 +134,7 @@ type MigrationContext struct { HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 - ChunkSize int64 + chunkSize int64 niceRatio float64 MaxLagMillisecondsThrottleThreshold int64 throttleControlReplicaKeys *mysql.InstanceKeyMap @@ -146,6 +162,12 @@ type MigrationContext struct { HooksHintToken string HooksStatusIntervalSec int64 + DynamicChunkSize bool + DynamicChunkSizeTargetMillis int64 + targetChunkSizeMutex sync.Mutex + targetchunkFeedback []time.Duration + targetChunkSize int64 + DropServeSocket bool ServeSocketFile string ServeTCPPort int64 @@ -269,7 +291,7 @@ func NewMigrationContext() *MigrationContext { return &MigrationContext{ Uuid: uuid.NewV4().String(), defaultNumRetries: 60, - ChunkSize: 1000, + chunkSize: 1000, InspectorConnectionConfig: mysql.NewConnectionConfig(), ApplierConnectionConfig: mysql.NewConnectionConfig(), MaxLagMillisecondsThrottleThreshold: 1500, @@ -287,6 +309,7 @@ func NewMigrationContext() *MigrationContext { ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), Log: NewDefaultLogger(), + DynamicChunkSize: false, } } @@ -554,6 +577,81 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 { return atomic.LoadInt64(&this.TotalRowsCopied) } +func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) { + this.targetChunkSizeMutex.Lock() + defer this.targetChunkSizeMutex.Unlock() + + if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) { + // We're 5x our target size. Something went seriously wrong, + // let's pump the brakes immediately to get back into range. + fmt.Printf("## error: chunk was too slow! 
Dividing by 10\n") + this.targetchunkFeedback = []time.Duration{} + newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2) + this.targetChunkSize = int64(newTarget) + } + this.targetchunkFeedback = append(this.targetchunkFeedback, d) +} + +func (this *MigrationContext) calculateNewTargetChunkSize() int64 { + this.targetChunkSizeMutex.Lock() + defer this.targetChunkSizeMutex.Unlock() + + // We do all our math as float64 of time in ns + p90 := float64(findP90(this.targetchunkFeedback)) + targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond)) + newTargetRows := float64(this.targetChunkSize) * (targetTime / p90) + + // Apply some final boundary checking: + // We are only allowed to scale up/down 50x from the original chunk size. + if newTargetRows < float64(this.chunkSize)/MaxDynamicScaleFactor { + newTargetRows = float64(this.chunkSize) / MaxDynamicScaleFactor + } + if newTargetRows > float64(this.chunkSize)*MaxDynamicScaleFactor { + newTargetRows = float64(this.chunkSize) * MaxDynamicScaleFactor + } + // We only ever increase by 50% at a time. + // This ensures a gradual step up if there is some non-linear behavior. + // There's plenty of time to increase more. + if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor { + newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor + } + // The newTargetRows must be at least 10, + // otherwise we're going too low + if newTargetRows < MinDynamicRowSize { + newTargetRows = MinDynamicRowSize + } + this.targetchunkFeedback = []time.Duration{} // reset + return int64(newTargetRows) +} + +func (this *MigrationContext) GetChunkSize() int64 { + if !this.DynamicChunkSize { + return atomic.LoadInt64(&this.chunkSize) + } + // Historically gh-ost has used a static chunk size (i.e. 1000 rows) + // which is can be adjusted while gh-ost is running. + // An ideal chunk size is large enough that it can batch operations, + // but small enough that it doesn't cause spikes in replica lag. + // + // The problem with basing the configurable on row-size is two fold: + // - Fow very narrow rows, it's not enough (leaving performance on the table). + // - For very wide rows (or with many secondary indexes) 1000 might be too high! + // + // Dynamic chunk size addresses this by using row-size as a starting point, + // and then increases or decreases the number based on feedback from previous + // copy tasks. + if this.targetChunkSize == 0 { + this.targetChunkSize = atomic.LoadInt64(&this.chunkSize) + } + // We need 10 samples to make a decision because we + // calculate it from the p90 (i.e. 2nd to highest value). 
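+	// For example: given the ten samples
+	// [33, 33, 32, 40, 61, 37, 38, 35, 29, 38] ms, sorting descending gives
+	// 61, 40, 38, ... and the 2nd-highest value, 40ms, is used as the p90.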
+ if len(this.targetchunkFeedback) >= 10 { + this.targetChunkSize = this.calculateNewTargetChunkSize() + fmt.Printf("# Adjusting chunk size based on feedback: %d\n", this.targetChunkSize) + } + return this.targetChunkSize +} + func (this *MigrationContext) GetIteration() int64 { return atomic.LoadInt64(&this.Iteration) } @@ -611,7 +709,7 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) { if chunkSize > 100000 { chunkSize = 100000 } - atomic.StoreInt64(&this.ChunkSize, chunkSize) + atomic.StoreInt64(&this.chunkSize, chunkSize) } func (this *MigrationContext) SetDMLBatchSize(batchSize int64) { diff --git a/go/base/utils.go b/go/base/utils.go index e3950f2bd..d004b343c 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "regexp" + "sort" "strings" "time" @@ -93,3 +94,10 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort) } } + +func findP90(a []time.Duration) time.Duration { + sort.Slice(a, func(i, j int) bool { + return a[i] > a[j] + }) + return a[len(a)/10] +} diff --git a/go/base/utils_test.go b/go/base/utils_test.go index da98aeced..7de2aed39 100644 --- a/go/base/utils_test.go +++ b/go/base/utils_test.go @@ -7,6 +7,7 @@ package base import ( "testing" + "time" "github.com/openark/golib/log" test "github.com/openark/golib/tests" @@ -27,3 +28,19 @@ func TestStringContainsAll(t *testing.T) { test.S(t).ExpectTrue(StringContainsAll(s, "insert", "")) test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete")) } + +func TestFindP90(t *testing.T) { + times := []time.Duration{ + 1 * time.Second, + 2 * time.Second, + 1 * time.Second, + 3 * time.Second, + 10 * time.Second, + 1 * time.Second, + 1 * time.Second, + 1 * time.Second, + 1 * time.Second, + 1 * time.Second, + } + test.S(t).ExpectEquals(findP90(times), 3*time.Second) +} diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 3daf24441..8799a9a47 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -104,6 +104,8 @@ func main() { flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. 
Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').") exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") + flag.BoolVar(&migrationContext.DynamicChunkSize, "dynamic-chunk-size", true, "let this tool dynamically adjust the chunk size based on a time-target") + flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic chunk-size is enabled") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") diff --git a/go/logic/applier.go b/go/logic/applier.go index ad6368e61..abe814811 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -577,7 +577,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo &this.migrationContext.UniqueKey.Columns, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(), - atomic.LoadInt64(&this.migrationContext.ChunkSize), + this.migrationContext.GetChunkSize(), this.migrationContext.GetIteration() == 0, fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()), ) @@ -614,7 +614,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo // data actually gets copied from original table. func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { startTime := time.Now() - chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize) + chunkSize = this.migrationContext.GetChunkSize() query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( this.migrationContext.DatabaseName, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a102188a8..448775407 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -844,7 +844,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { maxLoad := this.migrationContext.GetMaxLoad() criticalLoad := this.migrationContext.GetCriticalLoad() fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f\n", - atomic.LoadInt64(&this.migrationContext.ChunkSize), + this.migrationContext.GetChunkSize(), atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold), atomic.LoadInt64(&this.migrationContext.DMLBatchSize), maxLoad.String(), @@ -1315,8 +1315,10 @@ func (this *Migrator) executeWriteFuncs() error { if err := copyRowsFunc(); err != nil { return this.migrationContext.Log.Errore(err) } + // Send feedback to the chunker. 
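+	// The measured duration feeds the dynamic chunk sizing: samples are pooled
+	// and the target is recalculated from their p90, while a chunk slower than
+	// 5x the time target reduces the target chunk size immediately.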
+ copyRowsDuration := time.Since(copyRowsStartTime) + this.migrationContext.ChunkDurationFeedback(copyRowsDuration) if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 { - copyRowsDuration := time.Since(copyRowsStartTime) sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond time.Sleep(sleepTime) diff --git a/go/logic/server.go b/go/logic/server.go index 4b1b87023..5516d3ccc 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -196,7 +196,7 @@ help # This message case "chunk-size": { if argIsQuestion { - fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize)) + fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetChunkSize()) return NoPrintStatusRule, nil } if chunkSize, err := strconv.Atoi(arg); err != nil { From e7734543b1a69a48c2671f7d41e6591d3aaf69c6 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 2 Dec 2022 15:07:36 -0700 Subject: [PATCH 2/4] cleanup locking semantics --- go/base/context.go | 70 ++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 4aeaf110a..429d8b796 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -165,7 +165,7 @@ type MigrationContext struct { DynamicChunkSize bool DynamicChunkSizeTargetMillis int64 targetChunkSizeMutex sync.Mutex - targetchunkFeedback []time.Duration + targetChunkFeedback []time.Duration targetChunkSize int64 DropServeSocket bool @@ -577,37 +577,40 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 { return atomic.LoadInt64(&this.TotalRowsCopied) } +// ChunkDurationFeedback collects samples from copy-rows tasks, and feeds them +// back into a moving p90 that is used to return a more precise value +// in GetChunkSize() calls. Usually we wait for multiple samples and then recalculate +// in GetChunkSize(), however if the input value far exceeds what was expected (>5x) +// we synchronously reduce the chunk size. If it was a one off, it's not an issue +// because the next few samples will always scale the value back up. func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) { this.targetChunkSizeMutex.Lock() defer this.targetChunkSizeMutex.Unlock() - if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) { - // We're 5x our target size. Something went seriously wrong, - // let's pump the brakes immediately to get back into range. - fmt.Printf("## error: chunk was too slow! Dividing by 10\n") - this.targetchunkFeedback = []time.Duration{} + this.targetChunkFeedback = []time.Duration{} newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2) this.targetChunkSize = int64(newTarget) } - this.targetchunkFeedback = append(this.targetchunkFeedback, d) + this.targetChunkFeedback = append(this.targetChunkFeedback, d) } +// calculateNewTargetChunkSize is called by GetChunkSize() +// under a mutex. It's safe to read this.targetchunkFeedback. 
func (this *MigrationContext) calculateNewTargetChunkSize() int64 { - this.targetChunkSizeMutex.Lock() - defer this.targetChunkSizeMutex.Unlock() - // We do all our math as float64 of time in ns - p90 := float64(findP90(this.targetchunkFeedback)) + p90 := float64(findP90(this.targetChunkFeedback)) targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond)) newTargetRows := float64(this.targetChunkSize) * (targetTime / p90) // Apply some final boundary checking: - // We are only allowed to scale up/down 50x from the original chunk size. - if newTargetRows < float64(this.chunkSize)/MaxDynamicScaleFactor { - newTargetRows = float64(this.chunkSize) / MaxDynamicScaleFactor + // We are only allowed to scale up/down 50x from + // the original ("reference") chunk size. + referenceSize := float64(atomic.LoadInt64(&this.chunkSize)) + if newTargetRows < (referenceSize / MaxDynamicScaleFactor) { + newTargetRows = referenceSize / MaxDynamicScaleFactor } - if newTargetRows > float64(this.chunkSize)*MaxDynamicScaleFactor { - newTargetRows = float64(this.chunkSize) * MaxDynamicScaleFactor + if newTargetRows > (referenceSize * MaxDynamicScaleFactor) { + newTargetRows = referenceSize * MaxDynamicScaleFactor } // We only ever increase by 50% at a time. // This ensures a gradual step up if there is some non-linear behavior. @@ -620,33 +623,40 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 { if newTargetRows < MinDynamicRowSize { newTargetRows = MinDynamicRowSize } - this.targetchunkFeedback = []time.Duration{} // reset return int64(newTargetRows) } +// GetChunkSize returns the number of rows to copy in a single chunk: +// - If DynamicChunkSize is disabled, it will return this.chunkSize. +// - If DynamicChunkSize is enabled, it will return a dynamic value that +// automatically adjusts based on the duration of the last few +// copy-rows tasks. +// +// Historically gh-ost has used a static chunk size (i.e. 1000 rows) +// which can be adjusted while gh-ost is running. +// An ideal chunk size is large enough that it can batch operations, +// but small enough that it doesn't cause spikes in replica lag. +// +// The problem with basing the configurable on row-size is two fold: +// - Fow very narrow rows, it's not enough (leaving performance on the table). +// - For very wide rows (or with many secondary indexes) 1000 might be too high! +// +// Dynamic chunk size addresses this by using row-size as a starting point, +// *but* the main configurable is based on time (in ms). func (this *MigrationContext) GetChunkSize() int64 { if !this.DynamicChunkSize { return atomic.LoadInt64(&this.chunkSize) } - // Historically gh-ost has used a static chunk size (i.e. 1000 rows) - // which is can be adjusted while gh-ost is running. - // An ideal chunk size is large enough that it can batch operations, - // but small enough that it doesn't cause spikes in replica lag. - // - // The problem with basing the configurable on row-size is two fold: - // - Fow very narrow rows, it's not enough (leaving performance on the table). - // - For very wide rows (or with many secondary indexes) 1000 might be too high! - // - // Dynamic chunk size addresses this by using row-size as a starting point, - // and then increases or decreases the number based on feedback from previous - // copy tasks. 
+ this.targetChunkSizeMutex.Lock() + defer this.targetChunkSizeMutex.Unlock() if this.targetChunkSize == 0 { this.targetChunkSize = atomic.LoadInt64(&this.chunkSize) } // We need 10 samples to make a decision because we // calculate it from the p90 (i.e. 2nd to highest value). - if len(this.targetchunkFeedback) >= 10 { + if len(this.targetChunkFeedback) >= 10 { this.targetChunkSize = this.calculateNewTargetChunkSize() + this.targetChunkFeedback = []time.Duration{} // reset fmt.Printf("# Adjusting chunk size based on feedback: %d\n", this.targetChunkSize) } return this.targetChunkSize From 67fa12fb1bde09de3f0614b1c58ab6cdea2316fa Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 7 Dec 2022 09:06:30 -0700 Subject: [PATCH 3/4] Add tests, reorganize/improve code. --- doc/command-line-flags.md | 27 ++++++++++++++ go/base/context.go | 39 ++++++++++---------- go/base/context_test.go | 75 +++++++++++++++++++++++++++++++++++++++ go/base/utils.go | 5 ++- go/base/utils_test.go | 2 +- go/cmd/gh-ost/main.go | 4 +-- go/logic/migrator.go | 5 ++- 7 files changed, 134 insertions(+), 23 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 021462fa2..5c0a0045b 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -61,6 +61,12 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant `gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful. +### chunk-size + +Chunk size is the number of rows to copy in a single batch for copying data from the original table to the ghost table. The default value is 1000. Increasing the chunk-size can improve performance (via more batching) but also increases the risk of replica delay. + +See also: [`dynamic-chunking`](#dynamic-chunking) + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: @@ -122,6 +128,27 @@ Why is this behavior configurable? Different workloads have different characteri Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. +### dynamic-chunking + +Dynamic chunking (default: `OFF`) is a feature that allows `gh-ost` to automatically increase or decrease the `--chunk-size` up to 50x, based on the execution time of previous copy-row operations. The goal is to find the optimal batch size to reach `--dynamic-chunk-size-target-millis` (default: 50). + +For example, assume `--chunk-size=1000`, `--dynamic-chunking=true` and `--dynamic-chunk-size-target-millis=50`: + +- The actual "target" chunk size used will always be in the range of `[20,50000]` (within 50x the chunk size) +- Approximately every 1 second, `gh-ost` will re-assess if the target chunk size is optimal based on the `p90` of recent executions. +- Increases in target chunk size are scaled up by no more than 50% of the current target size at a time. +- If any copy-row operations exceed 250ms (5x the target), the target chunk size is immediately reduced to 10% of its current value. 
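+
+As a rough illustration of a single re-assessment (using the example settings above): if the p90 of the recent copy-row operations is 100ms against the 50ms target, the next target chunk size is `1000 * (50 / 100) = 500` rows. If the p90 is instead 10ms, the raw result would be `1000 * (50 / 10) = 5000` rows, but increases are capped at 50% per step, so the next target becomes `1500` rows and later re-assessments continue stepping up from there.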
+ +Enabling dynamic chunk size can be more reliable than the static `--chunk-size=N`, because tables are not created equally. For a table with a very high number of columns and several indexes, `1000` rows may actually be too large of a chunk size. Similarly, for a table with very few columns and no indexes, the ideal batch size may be 20K+ rows (while still being under the 50ms target). + +See also: [`chunk-size`](#chunk-size), [`dynamic-chunk-size-target-millis`](#dynamic-chunk-size-target-millis) + +### dynamic-chunk-size-target-millis + +The target execution time for each copy-row operation when [`--dynamic-chunking`](#dynamic-chunking) (default: `OFF`) is enabled. + +The default value of `50` is a good starting point for most workloads. If you find that read-replicas are intermittently falling behind, you may want to decrease this value. Similarly, if you do not use read-replicas there may be a benefit from increasing this value slightly. The recommended range is `[10,10000]`. Values larger than this have limited added benefit and are not recommended. + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? diff --git a/go/base/context.go b/go/base/context.go index 429d8b796..7a4311783 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -162,7 +162,7 @@ type MigrationContext struct { HooksHintToken string HooksStatusIntervalSec int64 - DynamicChunkSize bool + DynamicChunking bool DynamicChunkSizeTargetMillis int64 targetChunkSizeMutex sync.Mutex targetChunkFeedback []time.Duration @@ -309,7 +309,7 @@ func NewMigrationContext() *MigrationContext { ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), Log: NewDefaultLogger(), - DynamicChunkSize: false, + DynamicChunking: false, } } @@ -583,28 +583,37 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 { // in GetChunkSize(), however if the input value far exceeds what was expected (>5x) // we synchronously reduce the chunk size. If it was a one off, it's not an issue // because the next few samples will always scale the value back up. -func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) { +func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) (outOfRange bool) { + if !this.DynamicChunking { + return false + } this.targetChunkSizeMutex.Lock() defer this.targetChunkSizeMutex.Unlock() if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) { this.targetChunkFeedback = []time.Duration{} newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2) - this.targetChunkSize = int64(newTarget) + this.targetChunkSize = this.boundaryCheckTargetChunkSize(newTarget) + return true // don't include in feedback } this.targetChunkFeedback = append(this.targetChunkFeedback, d) + return false } // calculateNewTargetChunkSize is called by GetChunkSize() // under a mutex. It's safe to read this.targetchunkFeedback. 
func (this *MigrationContext) calculateNewTargetChunkSize() int64 { // We do all our math as float64 of time in ns - p90 := float64(findP90(this.targetChunkFeedback)) + p90 := float64(lazyFindP90(this.targetChunkFeedback)) targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond)) newTargetRows := float64(this.targetChunkSize) * (targetTime / p90) + return this.boundaryCheckTargetChunkSize(newTargetRows) +} - // Apply some final boundary checking: - // We are only allowed to scale up/down 50x from - // the original ("reference") chunk size. +// boundaryCheckTargetChunkSize makes sure the new target is not +// too large/small since we are only allowed to scale up/down 50x from +// the original ("reference") chunk size, and only permitted to increase +// by 50% at a time. This is called under a mutex. +func (this *MigrationContext) boundaryCheckTargetChunkSize(newTargetRows float64) int64 { referenceSize := float64(atomic.LoadInt64(&this.chunkSize)) if newTargetRows < (referenceSize / MaxDynamicScaleFactor) { newTargetRows = referenceSize / MaxDynamicScaleFactor @@ -612,14 +621,9 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 { if newTargetRows > (referenceSize * MaxDynamicScaleFactor) { newTargetRows = referenceSize * MaxDynamicScaleFactor } - // We only ever increase by 50% at a time. - // This ensures a gradual step up if there is some non-linear behavior. - // There's plenty of time to increase more. if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor { newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor } - // The newTargetRows must be at least 10, - // otherwise we're going too low if newTargetRows < MinDynamicRowSize { newTargetRows = MinDynamicRowSize } @@ -627,8 +631,8 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 { } // GetChunkSize returns the number of rows to copy in a single chunk: -// - If DynamicChunkSize is disabled, it will return this.chunkSize. -// - If DynamicChunkSize is enabled, it will return a dynamic value that +// - If Dynamic Chunking is disabled, it will return this.chunkSize. +// - If Dynamic Chunking is enabled, it will return a value that // automatically adjusts based on the duration of the last few // copy-rows tasks. // @@ -641,10 +645,10 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 { // - Fow very narrow rows, it's not enough (leaving performance on the table). // - For very wide rows (or with many secondary indexes) 1000 might be too high! // -// Dynamic chunk size addresses this by using row-size as a starting point, +// Dynamic chunking addresses this by using row-size as a starting point, // *but* the main configurable is based on time (in ms). 
func (this *MigrationContext) GetChunkSize() int64 { - if !this.DynamicChunkSize { + if !this.DynamicChunking { return atomic.LoadInt64(&this.chunkSize) } this.targetChunkSizeMutex.Lock() @@ -657,7 +661,6 @@ func (this *MigrationContext) GetChunkSize() int64 { if len(this.targetChunkFeedback) >= 10 { this.targetChunkSize = this.calculateNewTargetChunkSize() this.targetChunkFeedback = []time.Duration{} // reset - fmt.Printf("# Adjusting chunk size based on feedback: %d\n", this.targetChunkSize) } return this.targetChunkSize } diff --git a/go/base/context_test.go b/go/base/context_test.go index de208bae4..42242f0f6 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -120,3 +120,78 @@ func TestReadConfigFile(t *testing.T) { } } } + +func TestDynamicChunker(t *testing.T) { + context := NewMigrationContext() + context.chunkSize = 1000 + context.DynamicChunking = true + context.DynamicChunkSizeTargetMillis = 50 + + // Before feedback it should match the static chunk size + test.S(t).ExpectEquals(context.GetChunkSize(), int64(1000)) + + // 1s is >5x the target, so it should immediately /10 the target + context.ChunkDurationFeedback(1 * time.Second) + test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) + + // Let's provide 10 pieces of feedback, and see the chunk size + // be adjusted based on the p90th value. + context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 1st + context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 2nd + context.ChunkDurationFeedback(time.Duration(32 * time.Millisecond)) // 3rd + context.ChunkDurationFeedback(time.Duration(40 * time.Millisecond)) + context.ChunkDurationFeedback(time.Duration(61 * time.Millisecond)) + context.ChunkDurationFeedback(time.Duration(37 * time.Millisecond)) + context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond)) + context.ChunkDurationFeedback(time.Duration(35 * time.Millisecond)) + context.ChunkDurationFeedback(time.Duration(29 * time.Millisecond)) + test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th + context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond)) // 10th + // Because 10 items of feedback have been received, + // the chunk size is recalculated. The p90 is 40ms (below our target) + // so the adjusted chunk size increases 25% to 125 + test.S(t).ExpectEquals(context.GetChunkSize(), int64(125)) + + // Collect some new feedback where the p90 is 500us (much lower than our target) + // We have boundary checking on the value which limits it to 50% greater + // than the previous chunk size. + + context.ChunkDurationFeedback(time.Duration(400 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(470 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(520 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(500 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(490 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(300 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(460 * time.Microsecond)) + context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond)) + test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase + + // Test that the chunk size is not allowed to grow larger than 50x + // the original chunk size. 
Because of the gradual step up, we need to + // provide a lot of feedback first. + for i := 0; i < 1000; i++ { + context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond)) + context.GetChunkSize() + } + test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000)) + + // Similarly, the minimum chunksize is 1000/50=20 rows no matter what the feedback. + // The downscaling rule is /10 for values that immediately exceed 5x the target, + // so it usually scales down before the feedback re-evaluation kicks in. + for i := 0; i < 100; i++ { + context.ChunkDurationFeedback(time.Duration(10 * time.Second)) + context.GetChunkSize() + } + test.S(t).ExpectEquals(context.GetChunkSize(), int64(20)) + + // If we set the chunkSize to 100, then 100/50=2 is the minimum. + // But there is a hard coded minimum of 10 rows for safety. + context.chunkSize = 100 + for i := 0; i < 100; i++ { + context.ChunkDurationFeedback(time.Duration(10 * time.Second)) + context.GetChunkSize() + } + test.S(t).ExpectEquals(context.GetChunkSize(), int64(10)) +} diff --git a/go/base/utils.go b/go/base/utils.go index d004b343c..f51d0ceae 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -95,7 +95,10 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, } } -func findP90(a []time.Duration) time.Duration { +// lazyFindP90 finds the second to last value in a slice. +// This is the same as a p90 if there are 10 values, but if +// there were 100 values it would technically be a p99 etc. +func lazyFindP90(a []time.Duration) time.Duration { sort.Slice(a, func(i, j int) bool { return a[i] > a[j] }) diff --git a/go/base/utils_test.go b/go/base/utils_test.go index 7de2aed39..a3cf12d59 100644 --- a/go/base/utils_test.go +++ b/go/base/utils_test.go @@ -42,5 +42,5 @@ func TestFindP90(t *testing.T) { 1 * time.Second, 1 * time.Second, } - test.S(t).ExpectEquals(findP90(times), 3*time.Second) + test.S(t).ExpectEquals(lazyFindP90(times), 3*time.Second) } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 8799a9a47..6d7667677 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -104,8 +104,8 @@ func main() { flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. 
Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').") exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") - flag.BoolVar(&migrationContext.DynamicChunkSize, "dynamic-chunk-size", true, "let this tool dynamically adjust the chunk size based on a time-target") - flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic chunk-size is enabled") + flag.BoolVar(&migrationContext.DynamicChunking, "dynamic-chunking", false, "automatically adjust the chunk size based on a time-target") + flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic-chunking is enabled") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 448775407..d1e5f95f7 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1317,7 +1317,10 @@ func (this *Migrator) executeWriteFuncs() error { } // Send feedback to the chunker. copyRowsDuration := time.Since(copyRowsStartTime) - this.migrationContext.ChunkDurationFeedback(copyRowsDuration) + outOfRange := this.migrationContext.ChunkDurationFeedback(copyRowsDuration) + if outOfRange { + this.migrationContext.Log.Warningf("Chunk duration took: %s, throttling copy-rows", copyRowsDuration) + } if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 { sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond From 58829be6f0e4841425010cc3759009b7a393449e Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 7 Dec 2022 09:37:29 -0700 Subject: [PATCH 4/4] fix linter issue --- go/base/context_test.go | 48 ++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/go/base/context_test.go b/go/base/context_test.go index 42242f0f6..397ef95dd 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -136,17 +136,17 @@ func TestDynamicChunker(t *testing.T) { // Let's provide 10 pieces of feedback, and see the chunk size // be adjusted based on the p90th value. 
- context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 1st - context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 2nd - context.ChunkDurationFeedback(time.Duration(32 * time.Millisecond)) // 3rd - context.ChunkDurationFeedback(time.Duration(40 * time.Millisecond)) - context.ChunkDurationFeedback(time.Duration(61 * time.Millisecond)) - context.ChunkDurationFeedback(time.Duration(37 * time.Millisecond)) - context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond)) - context.ChunkDurationFeedback(time.Duration(35 * time.Millisecond)) - context.ChunkDurationFeedback(time.Duration(29 * time.Millisecond)) - test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th - context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond)) // 10th + context.ChunkDurationFeedback(33 * time.Millisecond) // 1st + context.ChunkDurationFeedback(33 * time.Millisecond) // 2nd + context.ChunkDurationFeedback(32 * time.Millisecond) // 3rd + context.ChunkDurationFeedback(40 * time.Millisecond) + context.ChunkDurationFeedback(61 * time.Millisecond) + context.ChunkDurationFeedback(37 * time.Millisecond) + context.ChunkDurationFeedback(38 * time.Millisecond) + context.ChunkDurationFeedback(35 * time.Millisecond) + context.ChunkDurationFeedback(29 * time.Millisecond) + test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th + context.ChunkDurationFeedback(38 * time.Millisecond) // 10th // Because 10 items of feedback have been received, // the chunk size is recalculated. The p90 is 40ms (below our target) // so the adjusted chunk size increases 25% to 125 @@ -156,23 +156,23 @@ func TestDynamicChunker(t *testing.T) { // We have boundary checking on the value which limits it to 50% greater // than the previous chunk size. - context.ChunkDurationFeedback(time.Duration(400 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(470 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(520 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(500 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(490 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(300 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(460 * time.Microsecond)) - context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond)) + context.ChunkDurationFeedback(400 * time.Microsecond) + context.ChunkDurationFeedback(450 * time.Microsecond) + context.ChunkDurationFeedback(470 * time.Microsecond) + context.ChunkDurationFeedback(520 * time.Microsecond) + context.ChunkDurationFeedback(500 * time.Microsecond) + context.ChunkDurationFeedback(490 * time.Microsecond) + context.ChunkDurationFeedback(300 * time.Microsecond) + context.ChunkDurationFeedback(450 * time.Microsecond) + context.ChunkDurationFeedback(460 * time.Microsecond) + context.ChunkDurationFeedback(480 * time.Microsecond) test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase // Test that the chunk size is not allowed to grow larger than 50x // the original chunk size. Because of the gradual step up, we need to // provide a lot of feedback first. 
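 	// (Each recalculation can only grow the target by 1.5x, so stepping from
 	// 187 rows up to the 50,000-row cap takes well over a hundred samples.)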
for i := 0; i < 1000; i++ { - context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond)) + context.ChunkDurationFeedback(480 * time.Microsecond) context.GetChunkSize() } test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000)) @@ -181,7 +181,7 @@ func TestDynamicChunker(t *testing.T) { // The downscaling rule is /10 for values that immediately exceed 5x the target, // so it usually scales down before the feedback re-evaluation kicks in. for i := 0; i < 100; i++ { - context.ChunkDurationFeedback(time.Duration(10 * time.Second)) + context.ChunkDurationFeedback(10 * time.Second) context.GetChunkSize() } test.S(t).ExpectEquals(context.GetChunkSize(), int64(20)) @@ -190,7 +190,7 @@ func TestDynamicChunker(t *testing.T) { // But there is a hard coded minimum of 10 rows for safety. context.chunkSize = 100 for i := 0; i < 100; i++ { - context.ChunkDurationFeedback(time.Duration(10 * time.Second)) + context.ChunkDurationFeedback(10 * time.Second) context.GetChunkSize() } test.S(t).ExpectEquals(context.GetChunkSize(), int64(10))
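
For readers who want the whole feedback loop in one place, below is a minimal, standalone Go sketch of the algorithm the patches above implement. It is illustrative only: the names are simplified (not gh-ost's MigrationContext API), there is no locking, and the constants mirror the defaults discussed above (chunk-size 1000, 50ms target, 50x scale bound, 1.5x step cap, 5x panic factor).

package main

import (
	"fmt"
	"sort"
	"time"
)

const (
	maxScaleFactor  = 50                    // never move more than 50x from the configured chunk size
	maxStepFactor   = 1.5                   // never grow more than 50% per recalculation
	minChunkSize    = 10                    // hard floor on rows per chunk
	panicFactor     = 5                     // a chunk slower than 5x the target shrinks it immediately
	samplesRequired = 10                    // recalculate once this many samples were collected
	targetTime      = 50 * time.Millisecond // --dynamic-chunk-size-target-millis
)

// chunker holds the dynamic state; illustrative names, not gh-ost's API.
type chunker struct {
	baseChunkSize int64 // the configured --chunk-size
	target        int64 // current dynamic target
	feedback      []time.Duration
}

// p90ish returns the 2nd-highest of 10 samples, as lazyFindP90 does.
func p90ish(a []time.Duration) time.Duration {
	sort.Slice(a, func(i, j int) bool { return a[i] > a[j] })
	return a[len(a)/10]
}

// clamp applies the scale, step and minimum bounds to a proposed target.
func (c *chunker) clamp(newTarget float64) int64 {
	ref := float64(c.baseChunkSize)
	if newTarget < ref/maxScaleFactor {
		newTarget = ref / maxScaleFactor
	}
	if newTarget > ref*maxScaleFactor {
		newTarget = ref * maxScaleFactor
	}
	if newTarget > float64(c.target)*maxStepFactor {
		newTarget = float64(c.target) * maxStepFactor
	}
	if newTarget < minChunkSize {
		newTarget = minChunkSize
	}
	return int64(newTarget)
}

// Feedback records how long the last chunk took.
func (c *chunker) Feedback(d time.Duration) {
	if d > targetTime*panicFactor {
		// Way out of range: pump the brakes now rather than waiting for the p90.
		c.feedback = c.feedback[:0]
		c.target = c.clamp(float64(c.target) / (panicFactor * 2))
		return
	}
	c.feedback = append(c.feedback, d)
}

// ChunkSize recalculates the target from the p90 once enough samples exist.
func (c *chunker) ChunkSize() int64 {
	if len(c.feedback) >= samplesRequired {
		p90 := float64(p90ish(c.feedback))
		c.target = c.clamp(float64(c.target) * (float64(targetTime) / p90))
		c.feedback = c.feedback[:0]
	}
	return c.target
}

func main() {
	c := &chunker{baseChunkSize: 1000, target: 1000}
	// Ten chunks of ~100ms: the p90 is 2x the 50ms target, so the target halves.
	for i := 0; i < 10; i++ {
		c.Feedback(100 * time.Millisecond)
	}
	fmt.Println(c.ChunkSize()) // 500
	// One 300ms chunk (>5x the target) divides the target by 10 immediately.
	c.Feedback(300 * time.Millisecond)
	fmt.Println(c.ChunkSize()) // 50
}

In gh-ost itself the equivalent state lives on MigrationContext, guarded by targetChunkSizeMutex, with executeWriteFuncs supplying the duration of each copy-rows task as feedback.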