Skip to content

Commit

Permalink
stop rebalance with force
Browse files Browse the repository at this point in the history
* CLI: `ais stop --force` option
* UUID cannot begin with 'g' - only rebalance can
* rebalance:
  - always check for maintenance/decommission in progress (fix)
  - force to stop despite maintenance/decommission (advanced usage)
* refactoring; usability; log

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 1, 2024
1 parent b5bf239 commit ed4d00c
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 48 deletions.
50 changes: 35 additions & 15 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,33 +1318,30 @@ func (p *proxy) blobdl(smap *smapX, xargs *xact.ArgsMsg, msg *apc.ActMsg) (tsi *
}

func (p *proxy) xstop(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
var (
xargs = xact.ArgsMsg{}
)
var xargs xact.ArgsMsg
if err := cos.MorphMarshal(msg.Value, &xargs); err != nil {
p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err)
return
}

xargs.Kind, _ = xact.GetKindName(xargs.Kind) // display name => kind

// note: of all xaction kinds only rebalance can have a "valid rebalance ID" (see `cos.GenUUID`)
// make an exception for rebalance: assign its kind to reinforce maintenance check below
if xargs.Kind == "" && xact.IsValidRebID(xargs.ID) {
xargs.Kind = apc.ActRebalance
}

// (lso + tco) special
p.lstca.abort(&xargs)

if xargs.Kind == apc.ActRebalance {
// unless forced:
// disallow aborting rebalance during
// critical (meta.SnodeMaint => meta.SnodeMaintPostReb) and (meta.SnodeDecomm => removed) transitions
smap := p.owner.smap.get()
for _, tsi := range smap.Tmap {
if tsi.Flags.IsAnySet(meta.SnodeMaint) && !tsi.Flags.IsAnySet(meta.SnodeMaintPostReb) {
p.writeErrf(w, r, "cannot abort %s: putting %s in maintenance mode - rebalancing...",
xargs.String(), tsi.StringEx())
return
}
if tsi.Flags.IsAnySet(meta.SnodeDecomm) {
p.writeErrf(w, r, "cannot abort %s: decommissioning %s - rebalancing...",
xargs.String(), tsi.StringEx())
return
}
if err := p._checkMaint(&xargs); err != nil {
p.writeErr(w, r, err)
return
}
}

Expand All @@ -1364,6 +1361,29 @@ func (p *proxy) xstop(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
freeBcastRes(results)
}

func (p *proxy) _checkMaint(xargs *xact.ArgsMsg) error {
smap := p.owner.smap.get()
for _, tsi := range smap.Tmap {
switch {
case tsi.Flags == 0:
// do nothing
case tsi.Flags.IsAnySet(meta.SnodeMaint) && !tsi.Flags.IsAnySet(meta.SnodeMaintPostReb):
warn := "cluster is currently rebalancing while " + tsi.StringEx() + " transitions to maintenance mode"
if !xargs.Force {
return fmt.Errorf("cannot abort %s: %s", xargs.String(), warn)
}
nlog.Errorln("Warning:", warn, "- proceeding anyway")
case tsi.Flags.IsAnySet(meta.SnodeDecomm):
warn := "cluster is currently rebalancing while " + tsi.StringEx() + " is being decommissioned"
if !xargs.Force {
return fmt.Errorf("cannot abort %s: %s", xargs.String(), warn)
}
nlog.Errorln("Warning:", warn, "- proceeding anyway")
}
}
return nil
}

func (p *proxy) rebalanceCluster(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
// note operational priority over config-disabled `errRebalanceDisabled`
if err := p.canRebalance(); err != nil && err != errRebalanceDisabled {
Expand Down
15 changes: 9 additions & 6 deletions cmd/cli/cli/cluster_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ var (
Name: commandStart,
Usage: "rebalance ais cluster",
Flags: clusterCmdsFlags[commandStart],
Action: startClusterRebalanceHandler,
Action: startRebHandler,
}
stopRebalance = cli.Command{
Name: commandStop,
Usage: "stop rebalancing ais cluster",
Flags: clusterCmdsFlags[commandStop],
Action: stopClusterRebalanceHandler,
Action: stopRebHandler,
}

clusterCmd = cli.Command{
Expand Down Expand Up @@ -543,11 +543,11 @@ func setPrimaryHandler(c *cli.Context) error {
return err
}

func startClusterRebalanceHandler(c *cli.Context) (err error) {
func startRebHandler(c *cli.Context) (err error) {
return startXactionKind(c, apc.ActRebalance)
}

func stopClusterRebalanceHandler(c *cli.Context) error {
func stopRebHandler(c *cli.Context) error {
xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true}
_, snap, err := getAnyXactSnap(&xargs)
if err != nil {
Expand All @@ -556,12 +556,15 @@ func stopClusterRebalanceHandler(c *cli.Context) error {
if snap == nil {
return errors.New("rebalance is not running")
}
return stopReb(c, snap.ID)
}

xargs.ID, xargs.OnlyRunning = snap.ID, false
func stopReb(c *cli.Context, xid string) error {
xargs := xact.ArgsMsg{Kind: apc.ActRebalance, ID: xid, Force: flagIsSet(c, forceFlag)}
if err := xstop(&xargs); err != nil {
return V(err)
}
fmt.Fprintf(c.App.Writer, "Stopped %s[%s]\n", apc.ActRebalance, snap.ID)
actionDone(c, fmt.Sprintf("Stopped %s[%s]\n", apc.ActRebalance, xid))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ var (
indent1 + "\t see also: 'ais bucket props show' and 'ais bucket props set')",
}

forceFlag = cli.BoolFlag{Name: "force,f", Usage: "force an action"}
forceFlag = cli.BoolFlag{Name: "force,f", Usage: "force execution of the command " + advancedUsageOnly}
forceClnFlag = cli.BoolFlag{
Name: forceFlag.Name,
Usage: "disregard interrupted rebalance and possibly other conditions preventing full cleanup\n" +
Expand Down
50 changes: 29 additions & 21 deletions cmd/cli/cli/job_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const stopUsage = "terminate a single batch job or multiple jobs, e.g.:\n" +
indent1 + "\t- 'stop tco-cysbohAGL'\t- terminate a given job identified by its unique ID;\n" +
indent1 + "\t- 'stop copy-listrange'\t- terminate all multi-object copies;\n" +
indent1 + "\t- 'stop copy-objects'\t- same as above (using display name);\n" +
indent1 + "\t- 'stop g731 --force'\t- forcefully abort global rebalance g731 (advanced usage only);\n" +
indent1 + "\t- 'stop --all'\t- terminate all running jobs\n" +
indent1 + tabHelpOpt + "."

Expand Down Expand Up @@ -184,7 +185,7 @@ var (
Name: commandRebalance,
Usage: "rebalance ais cluster",
Flags: clusterCmdsFlags[commandStart],
Action: startClusterRebalanceHandler,
Action: startRebHandler,
},
cleanupCmd,
jobStartResilver,
Expand All @@ -198,6 +199,7 @@ var (
stopCmdsFlags = []cli.Flag{
allRunningJobsFlag,
regexJobsFlag,
forceFlag,
yesFlag,
}
jobStopSub = cli.Command{
Expand Down Expand Up @@ -732,25 +734,27 @@ func stopJobHandler(c *cli.Context) error {
actionWarn(c, warn)
}

regex := parseStrFlag(c, regexJobsFlag)

if xid != "" && (flagIsSet(c, allRunningJobsFlag) || regex != "") {
warn := fmt.Sprintf("in presence of %s argument ('%s') flags %s and %s will be ignored",
jobIDArgument, xid, qflprn(allRunningJobsFlag), qflprn(regexJobsFlag))
actionWarn(c, warn)
} else if xid == "" && (flagIsSet(c, allRunningJobsFlag) || regex != "") {
switch name {
case cmdDownload, cmdDsort:
// regex supported
case commandRebalance:
warn := fmt.Sprintf("global rebalance is global (ignoring %s and %s flags)",
qflprn(allRunningJobsFlag), qflprn(regexJobsFlag))
actionWarn(c, warn)
default:
if regex != "" {
warn := fmt.Sprintf("ignoring flag %s - "+NIY, qflprn(regexJobsFlag))
actionWarn(c, warn)
}
// warn
var (
warn string
regex = parseStrFlag(c, regexJobsFlag)
)
switch {
case flagIsSet(c, allRunningJobsFlag) && regex != "":
warn = fmt.Sprintf("flags %s and %s", qflprn(allRunningJobsFlag), qflprn(regexJobsFlag))
case flagIsSet(c, allRunningJobsFlag):
warn = "flag " + qflprn(allRunningJobsFlag)
case regex != "":
warn = "option " + qflprn(regexJobsFlag)
}
if warn != "" {
switch {
case xid != "":
actionWarn(c, fmt.Sprintf("ignoring %s in presence of %s argument ('%s')", warn, jobIDArgument, xid))
case name == commandRebalance:
actionWarn(c, "global rebalance is _global_ - ignoring"+warn)
case regex != "" && name != cmdDownload && name != cmdDsort:
actionWarn(c, "ignoring "+warn+" -"+NIY)
}
}

Expand Down Expand Up @@ -794,7 +798,11 @@ func stopJobHandler(c *cli.Context) error {
case commandETL:
return stopETLs(c, otherID /*etl name*/)
case commandRebalance:
return stopClusterRebalanceHandler(c)
if xid == "" {
return stopRebHandler(c)
} else {
return stopReb(c, xid)
}
}

// generic xstop
Expand Down
5 changes: 2 additions & 3 deletions cmn/cos/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ func InitShortID(seed uint64) {
func GenUUID() (uuid string) {
var h, t string
uuid = sid.MustGenerate()
if !isAlpha(uuid[0]) {
if c := uuid[0]; c == 'g' || !isAlpha(c) { // see also: `xact.RebID2S`
tie := int(rtie.Add(1))
h = string(rune('A' + tie%26))
}
c := uuid[len(uuid)-1]
if c == '-' || c == '_' {
if c := uuid[len(uuid)-1]; c == '-' || c == '_' {
tie := int(rtie.Add(1))
t = string(rune('a' + tie%26))
}
Expand Down
2 changes: 1 addition & 1 deletion xact/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func CheckValidUUID(id string) (err error) {
func (args *ArgsMsg) String() string {
var sb strings.Builder
sb.Grow(128)
sb.WriteString("xargs-")
sb.WriteString("xa-")
sb.WriteString(args.Kind)
sb.WriteByte('[')
if args.ID != "" {
Expand Down
2 changes: 1 addition & 1 deletion xact/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func IsValidRebID(id string) (valid bool) {
_, err := S2RebID(id)
valid = err == nil
}
return
return valid
}

func CompareRebIDs(someID, fltID string) int {
Expand Down

0 comments on commit ed4d00c

Please sign in to comment.