From 54755c9274b8c3ff90fad5869af300adc09e39cf Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Thu, 5 Dec 2024 10:16:41 -0500 Subject: [PATCH] global rebalance: limited scope (advanced usage only) * part one Signed-off-by: Alex Aizman --- ais/prxclu.go | 20 +++++++++++++--- ais/rebmeta.go | 10 +++++++- ais/tgtcp.go | 34 +++++++++++++++++++-------- cmd/cli/cli/cluster_hdlr.go | 23 ++++++++++++++++++- cmd/cli/cli/verbfobj.go | 6 +++-- reb/globrun.go | 46 +++++++++++++++++++++++++------------ 6 files changed, 107 insertions(+), 32 deletions(-) diff --git a/ais/prxclu.go b/ais/prxclu.go index 72bd7298389..493f28dfe98 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -1193,14 +1193,28 @@ func (p *proxy) _syncConfFinal(ctx *configModifier, clone *globalConfig) { // xstart: rebalance, resilver, other "startables" (see xaction/api.go) func (p *proxy) xstart(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) { var xargs xact.ArgsMsg - if err := cos.MorphMarshal(msg.Value, &xargs); err != nil { - p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err) - return + if msg.Value != nil { + if err := cos.MorphMarshal(msg.Value, &xargs); err != nil { + p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err) + return + } } xargs.Kind, _ = xact.GetKindName(xargs.Kind) // display name => kind // rebalance if xargs.Kind == apc.ActRebalance { + if !xargs.Bck.IsEmpty() { + // NOTE: limiting the scope of rebalance to a given bucket[/prefix] (advanced usage) + b := (*meta.Bck)(&xargs.Bck) + if _, present := p.owner.bmd.get().Get(b); !present { + if b.IsRemote() { + p.writeErr(w, r, cmn.NewErrRemoteBckNotFound(&xargs.Bck)) + } else { + p.writeErr(w, r, cmn.NewErrBckNotFound(&xargs.Bck)) + } + return + } + } p.rebalanceCluster(w, r, msg) return } diff --git a/ais/rebmeta.go b/ais/rebmeta.go index e652368b2fe..b830ad14ed3 100644 --- a/ais/rebmeta.go +++ b/ais/rebmeta.go @@ -214,7 +214,15 @@ func rmdInc(_ *rmdModifier, clone *rebMD) { clone.inc() } func rmdSync(m *rmdModifier, clone *rebMD) { debug.Assert(m.cur == clone) m.listen(nil) - msg := &actMsgExt{ActMsg: apc.ActMsg{Action: apc.ActRebalance}, UUID: m.rebID} // user-requested rebalance + + msg := &actMsgExt{ + ActMsg: apc.ActMsg{ + Value: m.smapCtx.msg.Value, // from the original action msg + Name: m.smapCtx.msg.Name, // ditto + Action: apc.ActRebalance, + }, + UUID: m.rebID, // user-requested rebalance + } wg := m.p.metasyncer.sync(revsPair{m.cur, msg}) if m.wait { wg.Wait() diff --git a/ais/tgtcp.go b/ais/tgtcp.go index 749e46a2aad..2dff5a3304e 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -888,23 +888,37 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *actMsgExt) (err error) { func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string) { const tag = "rebalance[" var ( - notif = &xact.NotifXact{ - Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm}, + nxid = xact.RebID2S(newRMD.Version) + tname = t.String() + extArgs = reb.ExtArgs{ + Notif: &xact.NotifXact{ + Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm}, + }, + Tstats: t.statsT, + Oxid: oxid, + NID: newRMD.Version, } - nxid = xact.RebID2S(newRMD.Version) - tname = t.String() ) if msg.UUID != nxid { nlog.Warningln(tag, msg.UUID, "vs", nxid) } - // 1. by user + // 1. by user aka admin if msg.Action == apc.ActRebalance { xname := tag + msg.UUID + "]" - nlog.Infoln(tname, "starting user-requested", t, xname, nxid) + + if msg.Value != nil { + var xargs xact.ArgsMsg + if err := cos.MorphMarshal(msg.Value, &xargs); err == nil { + extArgs.Bck = (*meta.Bck)(&xargs.Bck) + extArgs.Prefix = msg.Name + } + } + + nlog.Infoln(tname, "starting user-requested", xname, nxid) // (##a) - go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid) + go t.reb.RunRebalance(&smap.Smap, &extArgs) return } @@ -926,7 +940,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string) } nlog.Infoln(tname, "starting", msg.String(), "-triggered", xname, s, opts) // (##b) - go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid) + go t.reb.RunRebalance(&smap.Smap, &extArgs) // 2.2. "pure" metasync(newRMD) w/ no action - double-check with cluster config default: @@ -936,7 +950,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string) if config.Rebalance.Enabled { nlog.Infoln(tname, "starting", xname) // (##c) - go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid) + go t.reb.RunRebalance(&smap.Smap, &extArgs) } else { runtime.Gosched() @@ -957,7 +971,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string) // (##d) nlog.Infoln(tname, "starting", xname) - t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid) + t.reb.RunRebalance(&smap.Smap, &extArgs) }() } } diff --git a/cmd/cli/cli/cluster_hdlr.go b/cmd/cli/cli/cluster_hdlr.go index 53d9b3facc7..7cf55bfb418 100644 --- a/cmd/cli/cli/cluster_hdlr.go +++ b/cmd/cli/cli/cluster_hdlr.go @@ -544,7 +544,28 @@ func setPrimaryHandler(c *cli.Context) error { } func startRebHandler(c *cli.Context) (err error) { - return startXactionKind(c, apc.ActRebalance) + var ( + extra string + xargs = xact.ArgsMsg{Kind: apc.ActRebalance} + ) + if c.NArg() > 0 { + uri := preparseBckObjURI(c.Args().Get(0)) + bck, prefix, err := parseBckObjURI(c, uri, true /*emptyObjnameOK*/) + if err != nil { + return err + } + if _, err := headBucket(bck, false /* don't add */); err != nil { + return err + } + actionWarn(c, "limiting the scope of rebalance to only '"+uri+"' is not recommended!") + briefPause(2) + + // beware + xargs.Bck = bck + extra = prefix + } + + return startXaction(c, &xargs, extra) } func stopRebHandler(c *cli.Context) error { diff --git a/cmd/cli/cli/verbfobj.go b/cmd/cli/cli/verbfobj.go index dde797f6299..ab3999a5f51 100644 --- a/cmd/cli/cli/verbfobj.go +++ b/cmd/cli/cli/verbfobj.go @@ -345,7 +345,8 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd if i < iters-1 { s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, fobj.path, e) fmt.Fprintln(c.App.ErrWriter, s) - time.Sleep(time.Second) + briefPause(1) + ffh, errO := fh.Open() if errO != nil { fmt.Fprintf(c.App.ErrWriter, "failed to reopen %s: %v\n", fobj.path, errO) @@ -455,7 +456,8 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File if i < iters-1 { s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, path, e) fmt.Fprintln(c.App.ErrWriter, s) - time.Sleep(time.Second) + briefPause(1) + putArgs.Reader, err = fh.Open() if isTimeout(e) { putArgs.BaseParams.Client.Timeout = longClientTimeout diff --git a/reb/globrun.go b/reb/globrun.go index 4994a3b3b05..e773f394859 100644 --- a/reb/globrun.go +++ b/reb/globrun.go @@ -78,6 +78,17 @@ type ( // this state mu sync.Mutex } + ExtArgs struct { + Notif *xact.NotifXact + Tstats cos.StatsUpdater + Bck *meta.Bck // advanced usage, limited scope + Prefix string // ditto + Oxid string + NID int64 // newRMD.Version + } +) + +type ( lomAcks struct { mu *sync.Mutex q map[string]*core.LOM // on the wire, waiting for ACK @@ -93,6 +104,7 @@ type ( opts fs.WalkOpts ver int64 } + // internal runtime context (compare with caller's ExtArgs{} above) rebArgs struct { smap *meta.Smap config *cmn.Config @@ -207,14 +219,14 @@ func _preempt2(logHdr string, id int64) bool { // 4. Global rebalance performs checks such as `stage > rebStageTraverse` or // `stage < rebStageWaitAck`. Since all EC stages are between // `Traverse` and `WaitAck` non-EC rebalance does not "notice" stage changes. -func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, tstats cos.StatsUpdater, oxid string) { - if reb.rebID.Load() == id { +func (reb *Reb) RunRebalance(smap *meta.Smap, extArgs *ExtArgs) { + if reb.rebID.Load() == extArgs.NID { return } - logHdr := reb.logHdr(id, smap, true /*initializing*/) + logHdr := reb.logHdr(extArgs.NID, smap, true /*initializing*/) // preempt - if xact.IsValidRebID(oxid) { - if err := reb._preempt(logHdr, oxid); err != nil { + if xact.IsValidRebID(extArgs.Oxid) { + if err := reb._preempt(logHdr, extArgs.Oxid); err != nil { nlog.Errorln(logHdr, "failed to preempt:", err) return } @@ -222,16 +234,16 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t var ( bmd = core.T.Bowner().Get() - rargs = &rebArgs{id: id, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed(), logHdr: logHdr} // and run with it + rargs = &rebArgs{id: extArgs.NID, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed(), logHdr: logHdr} // and run with it ) if !_pingall(rargs) { return } - if reb.rebID.Load() == id { + if reb.rebID.Load() == extArgs.NID { return } if err := reb.regRecv(); err != nil { - if !_preempt2(logHdr, id) { + if !_preempt2(logHdr, extArgs.NID) { nlog.Errorln(logHdr, "failed to preempt #2:", err) return } @@ -246,7 +258,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t if bmd.IsEmpty() { haveStreams = false } - if !reb.initRenew(rargs, notif, haveStreams) { + if !reb.initRenew(rargs, extArgs.Notif, haveStreams) { reb.unregRecv() return } @@ -255,13 +267,17 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t nlog.Infof("%s: nothing to do: %s, %s", logHdr, smap.StringEx(), bmd.StringEx()) reb.stages.stage.Store(rebStageDone) reb.unregRecv() - fs.RemoveMarker(fname.RebalanceMarker, tstats) - fs.RemoveMarker(fname.NodeRestartedPrev, tstats) + fs.RemoveMarker(fname.RebalanceMarker, extArgs.Tstats) + fs.RemoveMarker(fname.NodeRestartedPrev, extArgs.Tstats) rargs.xreb.Finish() return } - nlog.Infoln(logHdr, "initializing") + if extArgs.Bck == nil { + nlog.Infoln(logHdr, "initializing") + } else { + nlog.Warningln(logHdr, "initializing - limited scope: [", extArgs.Bck.Cname(extArgs.Prefix), "]") + } // abort all running `dtor.AbortRebRes` xactions (download, dsort, etl) xreg.AbortByNewReb(errors.New("reason: starting " + rargs.xreb.Name())) @@ -275,7 +291,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t } onGFN() - tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing) + extArgs.Tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing) // run err := reb.run(rargs) @@ -291,8 +307,8 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t } reb.changeStage(rebStageFin) - reb.fini(rargs, err, tstats) - tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing) + reb.fini(rargs, err, extArgs.Tstats) + extArgs.Tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing) offGFN() if rargs.ecUsed {