Skip to content

Commit

Permalink
global rebalance; node state flags (a.k.a. alerts)
Browse files Browse the repository at this point in the history
* when removing persistent markers update node flags as well
  (one operation, one state)
* CLI 'show cluster': rebalance disabled is _red_
* rebalance
  - when transitioning to receive-ready state check 'aborted'
  - clarify/improve logs; add state info
  - a quiescence timeout is not OK - do not clean up markers
* remove `cleanupMark` (code and action message) from everywhere; add extended comment
* refactor and simplify

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 2, 2024
1 parent e51f63c commit baa11bf
Show file tree
Hide file tree
Showing 32 changed files with 300 additions and 302 deletions.
8 changes: 4 additions & 4 deletions ais/aismsg_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (atc aismsgTestConf) Name() string {

func testAisMsgMarshal(t *testing.T, tc aismsgTestConf) {
t.Run(tc.Name(), func(t *testing.T) {
beforeMsg := &aisMsg{}
beforeMsg := &actMsgExt{}
if tc.actionMsgPresent {
actionMsg := apc.ActMsg{
Action: "test-action",
Expand All @@ -53,18 +53,18 @@ func testAisMsgMarshal(t *testing.T, tc aismsgTestConf) {
if err != nil {
t.Errorf("Failed to marshal beforeMsg: %v", err)
}
afterAisMsg := &aisMsg{}
afterAisMsg := &actMsgExt{}

err = jsoniter.Unmarshal(b, afterAisMsg)
if err != nil {
t.Errorf("Unmarshal failed for aisMsg, err: %v", err)
t.Errorf("Unmarshal failed for actMsgExt, err: %v", err)
}

if afterAisMsg.Value != nil {
bck := &cmn.Bck{}
err = cos.MorphMarshal(afterAisMsg.Value, bck)
if err != nil {
t.Errorf("Morph marshal failed for aisMsg.Value: %v, err: %v", afterAisMsg.Value, err)
t.Errorf("Morph marshal failed for actMsgExt.Value: %v, err: %v", afterAisMsg.Value, err)
}
afterAisMsg.Value = bck
}
Expand Down
14 changes: 7 additions & 7 deletions ais/earlystart.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets i
// 10. metasync (smap, config, etl & bmd) and startup as primary
smap = p.owner.smap.get()
var (
aisMsg = p.newAmsgStr(metaction2, bmd)
pairs = []revsPair{{smap, aisMsg}, {bmd, aisMsg}, {cluConfig, aisMsg}}
actMsgExt = p.newAmsgStr(metaction2, bmd)
pairs = []revsPair{{smap, actMsgExt}, {bmd, actMsgExt}, {cluConfig, actMsgExt}}
)
wg := p.metasyncer.sync(pairs...)
wg.Wait()
Expand All @@ -349,7 +349,7 @@ func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets i
nlog.Infoln(smap.StringEx()+",", bmd.StringEx())

if etlMD.Version > 0 {
_ = p.metasyncer.sync(revsPair{etlMD, aisMsg})
_ = p.metasyncer.sync(revsPair{etlMD, actMsgExt})
}

// 11. Clear regpool
Expand Down Expand Up @@ -453,9 +453,9 @@ until:

// do
var (
msg = &apc.ActMsg{Action: apc.ActRebalance, Value: metaction3}
aisMsg = p.newAmsg(msg, nil)
ctx = &rmdModifier{
msg = &apc.ActMsg{Action: apc.ActRebalance, Value: metaction3}
actMsgExt = p.newAmsg(msg, nil)
ctx = &rmdModifier{
pre: func(_ *rmdModifier, clone *rebMD) { clone.Version += 100 },
smapCtx: &smapModifier{smap: smap},
cluID: smap.UUID,
Expand All @@ -465,7 +465,7 @@ until:
if err != nil {
cos.ExitLog(err)
}
wg := p.metasyncer.sync(revsPair{rmd, aisMsg})
wg := p.metasyncer.sync(revsPair{rmd, actMsgExt})

p.owner.rmd.starting.Store(false) // done
p.owner.smap.mu.Unlock()
Expand Down
21 changes: 9 additions & 12 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,12 @@ type (
}

// extended control message: apc.ActMsg plus extra information for node <=> node control-plane communications
aisMsg struct {
actMsgExt struct {
apc.ActMsg
UUID string `json:"uuid"` // cluster-wide ID of this action (operation, transaction)
BMDVersion int64 `json:"bmdversion,string"`
RMDVersion int64 `json:"rmdversion,string"`
}

cleanmark struct {
OldVer int64 `json:"oldver,string"`
NewVer int64 `json:"newver,string"`
Interrupted bool `json:"interrupted"`
Restarted bool `json:"restarted"`
}
)

type (
Expand Down Expand Up @@ -678,15 +671,19 @@ func (p *proxy) fillNsti(nsti *cos.NodeStateInfo) {
func (t *target) fillNsti(nsti *cos.NodeStateInfo) {
t.htrun.fill(nsti)
marked := xreg.GetRebMarked()
if marked.Xact != nil {

// (running | interrupted | ok)
if xreb := marked.Xact; xreb != nil && !xreb.IsAborted() && !xreb.Finished() {
nsti.Flags = nsti.Flags.Set(cos.Rebalancing)
}
if marked.Interrupted {
} else if marked.Interrupted {
nsti.Flags = nsti.Flags.Set(cos.RebalanceInterrupted)
}

// node restarted
if marked.Restarted {
nsti.Flags = nsti.Flags.Set(cos.Restarted)
nsti.Flags = nsti.Flags.Set(cos.NodeRestarted)
}

marked = xreg.GetResilverMarked()
if marked.Xact != nil {
nsti.Flags = nsti.Flags.Set(cos.Resilvering)
Expand Down
52 changes: 26 additions & 26 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) {
cm.Flags = cm.Flags.Set(cos.RebalanceInterrupted)
}
if restarted {
cm.Flags = cm.Flags.Set(cos.Restarted)
cm.Flags = cm.Flags.Set(cos.NodeRestarted)
}
}
if !opts.skipPrimeTime && smap.IsPrimary(h.si) {
Expand Down Expand Up @@ -897,7 +897,7 @@ func (h *htrun) bcastSelected(bargs *bcastArgs) sliceResults {
return results.s
}

func (h *htrun) bcastAsyncIC(msg *aisMsg) {
func (h *htrun) bcastAsyncIC(msg *actMsgExt) {
var (
wg = &sync.WaitGroup{}
smap = h.owner.smap.get()
Expand Down Expand Up @@ -1527,7 +1527,7 @@ func (h *htrun) warnMsync(r *http.Request, smap *smapX) {
}
}

func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, what, luuid]
func logmsync(lver int64, revs revs, msg *actMsgExt, opts ...string) { // caller [, what, luuid]
const tag = "msync Rx:"
var (
what string
Expand Down Expand Up @@ -1575,11 +1575,11 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [,
}
}

func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *globalConfig, msg *aisMsg, err error) {
func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *globalConfig, msg *actMsgExt, err error) {
if _, ok := payload[revsConfTag]; !ok {
return
}
newConfig, msg = &globalConfig{}, &aisMsg{}
newConfig, msg = &globalConfig{}, &actMsgExt{}
confValue := payload[revsConfTag]
reader := bytes.NewBuffer(confValue)
if _, err1 := jsp.Decode(io.NopCloser(reader), newConfig, newConfig.JspOpts(), "extractConfig"); err1 != nil {
Expand All @@ -1605,11 +1605,11 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *glob
return
}

func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, msg *aisMsg, err error) {
func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, msg *actMsgExt, err error) {
if _, ok := payload[revsEtlMDTag]; !ok {
return
}
newMD, msg = newEtlMD(), &aisMsg{}
newMD, msg = newEtlMD(), &actMsgExt{}
etlMDValue := payload[revsEtlMDTag]
reader := bytes.NewBuffer(etlMDValue)
if _, err1 := jsp.Decode(io.NopCloser(reader), newMD, newMD.JspOpts(), "extractEtlMD"); err1 != nil {
Expand All @@ -1635,12 +1635,12 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, ms
return
}

func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (newSmap *smapX, msg *aisMsg, err error) {
func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (newSmap *smapX, msg *actMsgExt, err error) {
const act = "extract-smap"
if _, ok := payload[revsSmapTag]; !ok {
return
}
newSmap, msg = &smapX{}, &aisMsg{}
newSmap, msg = &smapX{}, &actMsgExt{}
smapValue := payload[revsSmapTag]
reader := bytes.NewBuffer(smapValue)
if _, err1 := jsp.Decode(io.NopCloser(reader), newSmap, newSmap.JspOpts(), act); err1 != nil {
Expand Down Expand Up @@ -1701,11 +1701,11 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
return
}

func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg *aisMsg, err error) {
func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg *actMsgExt, err error) {
if _, ok := payload[revsRMDTag]; !ok {
return
}
newRMD, msg = &rebMD{}, &aisMsg{}
newRMD, msg = &rebMD{}, &actMsgExt{}
rmdValue := payload[revsRMDTag]
if err1 := jsoniter.Unmarshal(rmdValue, newRMD); err1 != nil {
err = fmt.Errorf(cmn.FmtErrUnmarshal, h, "new RMD", cos.BHead(rmdValue), err1)
Expand Down Expand Up @@ -1739,11 +1739,11 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg
return
}

func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD, msg *aisMsg, err error) {
func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD, msg *actMsgExt, err error) {
if _, ok := payload[revsBMDTag]; !ok {
return
}
newBMD, msg = &bucketMD{}, &aisMsg{}
newBMD, msg = &bucketMD{}, &actMsgExt{}
bmdValue := payload[revsBMDTag]
reader := bytes.NewBuffer(bmdValue)
if _, err1 := jsp.Decode(io.NopCloser(reader), newBMD, newBMD.JspOpts(), "extractBMD"); err1 != nil {
Expand Down Expand Up @@ -1773,7 +1773,7 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD,
return
}

func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, caller string, cb smapUpdatedCB) error {
func (h *htrun) receiveSmap(newSmap *smapX, msg *actMsgExt, payload msPayload, caller string, cb smapUpdatedCB) error {
if newSmap == nil {
return nil
}
Expand All @@ -1786,7 +1786,7 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call
return h.owner.smap.synchronize(h.si, newSmap, payload, cb)
}

func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, caller string, cb func(ne, oe *etlMD)) (err error) {
func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *actMsgExt, payload msPayload, caller string, cb func(ne, oe *etlMD)) (err error) {
if newEtlMD == nil {
return
}
Expand All @@ -1813,7 +1813,7 @@ func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, ca
}

// under lock
func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload) (err error) {
func (h *htrun) _recvCfg(newConfig *globalConfig, msg *actMsgExt, payload msPayload) (err error) {
config := cmn.GCO.Get()
if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce {
if newConfig.version() == config.Version {
Expand All @@ -1836,7 +1836,7 @@ func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload

func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*tokenList, error) {
var (
msg aisMsg
msg actMsgExt
bytes, ok = payload[revsTokenTag]
)
if !ok {
Expand Down Expand Up @@ -2340,16 +2340,16 @@ func ptLatency(tts int64, ptime, isPrimary string) (dur int64) {
}

//
// aisMsg reader & constructors
// actMsgExt reader & constructors
//

func (*htrun) readAisMsg(w http.ResponseWriter, r *http.Request) (msg *aisMsg, err error) {
msg = &aisMsg{}
func (*htrun) readAisMsg(w http.ResponseWriter, r *http.Request) (msg *actMsgExt, err error) {
msg = &actMsgExt{}
err = cmn.ReadJSON(w, r, msg)
return
}

func (msg *aisMsg) String() string {
func (msg *actMsgExt) String() string {
s := "aism[" + msg.Action
if msg.UUID != "" {
s += "[" + msg.UUID + "]"
Expand All @@ -2360,24 +2360,24 @@ func (msg *aisMsg) String() string {
return s + "]"
}

func (msg *aisMsg) StringEx() (s string) {
func (msg *actMsgExt) StringEx() (s string) {
s = msg.String()
vs, err := jsoniter.Marshal(msg.Value)
debug.AssertNoErr(err)
s += ",(" + strings.ReplaceAll(string(vs), ",", ", ") + ")"
return
}

func (h *htrun) newAmsgStr(msgStr string, bmd *bucketMD) *aisMsg {
func (h *htrun) newAmsgStr(msgStr string, bmd *bucketMD) *actMsgExt {
return h.newAmsg(&apc.ActMsg{Value: msgStr}, bmd)
}

func (h *htrun) newAmsgActVal(act string, val any) *aisMsg {
func (h *htrun) newAmsgActVal(act string, val any) *actMsgExt {
return h.newAmsg(&apc.ActMsg{Action: act, Value: val}, nil)
}

func (h *htrun) newAmsg(amsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *aisMsg {
msg := &aisMsg{ActMsg: *amsg}
func (h *htrun) newAmsg(amsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *actMsgExt {
msg := &actMsgExt{ActMsg: *amsg}
if bmd != nil {
msg.BMDVersion = bmd.Version
} else {
Expand Down
2 changes: 1 addition & 1 deletion ais/ic.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (ic *ic) handleGet(w http.ResponseWriter, r *http.Request) {
func (ic *ic) handlePost(w http.ResponseWriter, r *http.Request) {
var (
smap = ic.p.owner.smap.get()
msg = &aisMsg{}
msg = &actMsgExt{}
)
if err := cmn.ReadJSON(w, r, msg); err != nil {
return
Expand Down
8 changes: 4 additions & 4 deletions ais/metasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
//
// (shared-replicated-object, associated action-message),
//
// where `associated action-message` (aisMsg) provides receivers with the operation
// where `associated action-message` (actMsgExt) provides receivers with the operation
// ("action") and other relevant context.
//
// Further, the metasyncer:
Expand Down Expand Up @@ -117,7 +117,7 @@ type (
}
revsPair struct {
revs revs
msg *aisMsg
msg *actMsgExt
}
revsReq struct {
wg *sync.WaitGroup
Expand Down Expand Up @@ -446,7 +446,7 @@ func (y *metasyncer) jit(pair revsPair) revs {
return revs
}

// keeping track of per-daemon versioning - TODO: extend to take care of aisMsg where pairs may be empty
// keeping track of per-daemon versioning - TODO: extend to take care of actMsgExt where pairs may be empty
func (y *metasyncer) syncDone(si *meta.Snode, pairs []revsPair) {
ndr, ok := y.nodesRevs[si.ID()]
smap := y.p.owner.smap.get()
Expand Down Expand Up @@ -526,7 +526,7 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) {
break
}
if v > revs.version() {
// skip older versions (TODO: don't skip sending associated aisMsg)
// skip older versions (TODO: don't skip sending associated actMsgExt)
nlog.Errorf("v: %d; revs.version: %d", v, revs.version())
}
}
Expand Down
10 changes: 5 additions & 5 deletions ais/metasync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func TestMetasyncData(t *testing.T) {
bmd = newBucketMD()
)

emptyAisMsg, err := jsoniter.Marshal(aisMsg{})
emptyAisMsg, err := jsoniter.Marshal(actMsgExt{})
if err != nil {
t.Fatal("Failed to marshal empty apc.ActMsg, err =", err)
}
Expand All @@ -634,7 +634,7 @@ func TestMetasyncData(t *testing.T) {
exp[revsSmapTag+revsActionTag] = emptyAisMsg
expRetry[revsSmapTag+revsActionTag] = emptyAisMsg

syncer.sync(revsPair{smap, &aisMsg{}})
syncer.sync(revsPair{smap, &actMsgExt{}})
match(t, expRetry, ch, 1)

// sync bucketmd, fail target and retry
Expand All @@ -656,7 +656,7 @@ func TestMetasyncData(t *testing.T) {
exp[revsBMDTag+revsActionTag] = emptyAisMsg
expRetry[revsBMDTag+revsActionTag] = emptyAisMsg

syncer.sync(revsPair{bmd, &aisMsg{}})
syncer.sync(revsPair{bmd, &actMsgExt{}})
match(t, exp, ch, 1)
match(t, expRetry, ch, 1)

Expand Down Expand Up @@ -794,7 +794,7 @@ func TestMetasyncMembership(t *testing.T) {
// TestMetasyncReceive tests extracting received sync data.
func TestMetasyncReceive(t *testing.T) {
{
emptyAisMsg := func(a *aisMsg) {
emptyAisMsg := func(a *actMsgExt) {
if a.Action != "" || a.Name != "" || a.Value != nil {
t.Fatal("Expecting empty action message", a)
}
Expand Down Expand Up @@ -847,7 +847,7 @@ func TestMetasyncReceive(t *testing.T) {
t.Fatal("Extract smap from empty payload returned data")
}

wg1 := syncer.sync(revsPair{primary.owner.smap.get(), &aisMsg{}})
wg1 := syncer.sync(revsPair{primary.owner.smap.get(), &actMsgExt{}})
wg1.Wait()
payload := <-chProxy

Expand Down
Loading

0 comments on commit baa11bf

Please sign in to comment.