Skip to content

Commit

Permalink
global rebalance: limited scope (advanced usage only)
Browse files Browse the repository at this point in the history
* part one

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 5, 2024
1 parent f54e8e1 commit 54755c9
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 32 deletions.
20 changes: 17 additions & 3 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,14 +1193,28 @@ func (p *proxy) _syncConfFinal(ctx *configModifier, clone *globalConfig) {
// xstart: rebalance, resilver, other "startables" (see xaction/api.go)
func (p *proxy) xstart(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
var xargs xact.ArgsMsg
if err := cos.MorphMarshal(msg.Value, &xargs); err != nil {
p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err)
return
if msg.Value != nil {
if err := cos.MorphMarshal(msg.Value, &xargs); err != nil {
p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err)
return
}
}
xargs.Kind, _ = xact.GetKindName(xargs.Kind) // display name => kind

// rebalance
if xargs.Kind == apc.ActRebalance {
if !xargs.Bck.IsEmpty() {
// NOTE: limiting the scope of rebalance to a given bucket[/prefix] (advanced usage)
b := (*meta.Bck)(&xargs.Bck)
if _, present := p.owner.bmd.get().Get(b); !present {
if b.IsRemote() {
p.writeErr(w, r, cmn.NewErrRemoteBckNotFound(&xargs.Bck))
} else {
p.writeErr(w, r, cmn.NewErrBckNotFound(&xargs.Bck))
}
return
}
}
p.rebalanceCluster(w, r, msg)
return
}
Expand Down
10 changes: 9 additions & 1 deletion ais/rebmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,15 @@ func rmdInc(_ *rmdModifier, clone *rebMD) { clone.inc() }
func rmdSync(m *rmdModifier, clone *rebMD) {
debug.Assert(m.cur == clone)
m.listen(nil)
msg := &actMsgExt{ActMsg: apc.ActMsg{Action: apc.ActRebalance}, UUID: m.rebID} // user-requested rebalance

msg := &actMsgExt{
ActMsg: apc.ActMsg{
Value: m.smapCtx.msg.Value, // from the original action msg
Name: m.smapCtx.msg.Name, // ditto
Action: apc.ActRebalance,
},
UUID: m.rebID, // user-requested rebalance
}
wg := m.p.metasyncer.sync(revsPair{m.cur, msg})
if m.wait {
wg.Wait()
Expand Down
34 changes: 24 additions & 10 deletions ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,23 +888,37 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *actMsgExt) (err error) {
func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string) {
const tag = "rebalance["
var (
notif = &xact.NotifXact{
Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
nxid = xact.RebID2S(newRMD.Version)
tname = t.String()
extArgs = reb.ExtArgs{
Notif: &xact.NotifXact{
Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm},
},
Tstats: t.statsT,
Oxid: oxid,
NID: newRMD.Version,
}
nxid = xact.RebID2S(newRMD.Version)
tname = t.String()
)
if msg.UUID != nxid {
nlog.Warningln(tag, msg.UUID, "vs", nxid)
}

// 1. by user
// 1. by user aka admin
if msg.Action == apc.ActRebalance {
xname := tag + msg.UUID + "]"
nlog.Infoln(tname, "starting user-requested", t, xname, nxid)

if msg.Value != nil {
var xargs xact.ArgsMsg
if err := cos.MorphMarshal(msg.Value, &xargs); err == nil {
extArgs.Bck = (*meta.Bck)(&xargs.Bck)
extArgs.Prefix = msg.Name
}
}

nlog.Infoln(tname, "starting user-requested", xname, nxid)

// (##a)
go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid)
go t.reb.RunRebalance(&smap.Smap, &extArgs)
return
}

Expand All @@ -926,7 +940,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string)
}
nlog.Infoln(tname, "starting", msg.String(), "-triggered", xname, s, opts)
// (##b)
go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid)
go t.reb.RunRebalance(&smap.Smap, &extArgs)

// 2.2. "pure" metasync(newRMD) w/ no action - double-check with cluster config
default:
Expand All @@ -936,7 +950,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string)
if config.Rebalance.Enabled {
nlog.Infoln(tname, "starting", xname)
// (##c)
go t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid)
go t.reb.RunRebalance(&smap.Smap, &extArgs)
} else {
runtime.Gosched()

Expand All @@ -957,7 +971,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string)

// (##d)
nlog.Infoln(tname, "starting", xname)
t.reb.RunRebalance(&smap.Smap, newRMD.Version, notif, t.statsT, oxid)
t.reb.RunRebalance(&smap.Smap, &extArgs)
}()
}
}
Expand Down
23 changes: 22 additions & 1 deletion cmd/cli/cli/cluster_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,28 @@ func setPrimaryHandler(c *cli.Context) error {
}

func startRebHandler(c *cli.Context) (err error) {
return startXactionKind(c, apc.ActRebalance)
var (
extra string
xargs = xact.ArgsMsg{Kind: apc.ActRebalance}
)
if c.NArg() > 0 {
uri := preparseBckObjURI(c.Args().Get(0))
bck, prefix, err := parseBckObjURI(c, uri, true /*emptyObjnameOK*/)
if err != nil {
return err
}
if _, err := headBucket(bck, false /* don't add */); err != nil {
return err
}
actionWarn(c, "limiting the scope of rebalance to only '"+uri+"' is not recommended!")
briefPause(2)

// beware
xargs.Bck = bck
extra = prefix
}

return startXaction(c, &xargs, extra)
}

func stopRebHandler(c *cli.Context) error {
Expand Down
6 changes: 4 additions & 2 deletions cmd/cli/cli/verbfobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
if i < iters-1 {
s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, fobj.path, e)
fmt.Fprintln(c.App.ErrWriter, s)
time.Sleep(time.Second)
briefPause(1)

ffh, errO := fh.Open()
if errO != nil {
fmt.Fprintf(c.App.ErrWriter, "failed to reopen %s: %v\n", fobj.path, errO)
Expand Down Expand Up @@ -455,7 +456,8 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File
if i < iters-1 {
s := fmt.Sprintf("[#%d] %s: %v - retrying...", i+1, path, e)
fmt.Fprintln(c.App.ErrWriter, s)
time.Sleep(time.Second)
briefPause(1)

putArgs.Reader, err = fh.Open()
if isTimeout(e) {
putArgs.BaseParams.Client.Timeout = longClientTimeout
Expand Down
46 changes: 31 additions & 15 deletions reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ type (
// this state
mu sync.Mutex
}
ExtArgs struct {
Notif *xact.NotifXact
Tstats cos.StatsUpdater
Bck *meta.Bck // advanced usage, limited scope
Prefix string // ditto
Oxid string
NID int64 // newRMD.Version
}
)

type (
lomAcks struct {
mu *sync.Mutex
q map[string]*core.LOM // on the wire, waiting for ACK
Expand All @@ -93,6 +104,7 @@ type (
opts fs.WalkOpts
ver int64
}
// internal runtime context (compare with caller's ExtArgs{} above)
rebArgs struct {
smap *meta.Smap
config *cmn.Config
Expand Down Expand Up @@ -207,31 +219,31 @@ func _preempt2(logHdr string, id int64) bool {
// 4. Global rebalance performs checks such as `stage > rebStageTraverse` or
// `stage < rebStageWaitAck`. Since all EC stages are between
// `Traverse` and `WaitAck` non-EC rebalance does not "notice" stage changes.
func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, tstats cos.StatsUpdater, oxid string) {
if reb.rebID.Load() == id {
func (reb *Reb) RunRebalance(smap *meta.Smap, extArgs *ExtArgs) {
if reb.rebID.Load() == extArgs.NID {
return
}
logHdr := reb.logHdr(id, smap, true /*initializing*/)
logHdr := reb.logHdr(extArgs.NID, smap, true /*initializing*/)
// preempt
if xact.IsValidRebID(oxid) {
if err := reb._preempt(logHdr, oxid); err != nil {
if xact.IsValidRebID(extArgs.Oxid) {
if err := reb._preempt(logHdr, extArgs.Oxid); err != nil {
nlog.Errorln(logHdr, "failed to preempt:", err)
return
}
}

var (
bmd = core.T.Bowner().Get()
rargs = &rebArgs{id: id, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed(), logHdr: logHdr} // and run with it
rargs = &rebArgs{id: extArgs.NID, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed(), logHdr: logHdr} // and run with it
)
if !_pingall(rargs) {
return
}
if reb.rebID.Load() == id {
if reb.rebID.Load() == extArgs.NID {
return
}
if err := reb.regRecv(); err != nil {
if !_preempt2(logHdr, id) {
if !_preempt2(logHdr, extArgs.NID) {
nlog.Errorln(logHdr, "failed to preempt #2:", err)
return
}
Expand All @@ -246,7 +258,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
if bmd.IsEmpty() {
haveStreams = false
}
if !reb.initRenew(rargs, notif, haveStreams) {
if !reb.initRenew(rargs, extArgs.Notif, haveStreams) {
reb.unregRecv()
return
}
Expand All @@ -255,13 +267,17 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
nlog.Infof("%s: nothing to do: %s, %s", logHdr, smap.StringEx(), bmd.StringEx())
reb.stages.stage.Store(rebStageDone)
reb.unregRecv()
fs.RemoveMarker(fname.RebalanceMarker, tstats)
fs.RemoveMarker(fname.NodeRestartedPrev, tstats)
fs.RemoveMarker(fname.RebalanceMarker, extArgs.Tstats)
fs.RemoveMarker(fname.NodeRestartedPrev, extArgs.Tstats)
rargs.xreb.Finish()
return
}

nlog.Infoln(logHdr, "initializing")
if extArgs.Bck == nil {
nlog.Infoln(logHdr, "initializing")
} else {
nlog.Warningln(logHdr, "initializing - limited scope: [", extArgs.Bck.Cname(extArgs.Prefix), "]")
}

// abort all running `dtor.AbortRebRes` xactions (download, dsort, etl)
xreg.AbortByNewReb(errors.New("reason: starting " + rargs.xreb.Name()))
Expand All @@ -275,7 +291,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
}
onGFN()

tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing)
extArgs.Tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing)

// run
err := reb.run(rargs)
Expand All @@ -291,8 +307,8 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t
}
reb.changeStage(rebStageFin)

reb.fini(rargs, err, tstats)
tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing)
reb.fini(rargs, err, extArgs.Tstats)
extArgs.Tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing)

offGFN()
if rargs.ecUsed {
Expand Down

0 comments on commit 54755c9

Please sign in to comment.