diff --git a/ais/aismsg_internal_test.go b/ais/aismsg_internal_test.go index ec7c1ed6629..cc152899bfa 100644 --- a/ais/aismsg_internal_test.go +++ b/ais/aismsg_internal_test.go @@ -28,7 +28,7 @@ func (atc aismsgTestConf) Name() string { func testAisMsgMarshal(t *testing.T, tc aismsgTestConf) { t.Run(tc.Name(), func(t *testing.T) { - beforeMsg := &aisMsg{} + beforeMsg := &actMsgExt{} if tc.actionMsgPresent { actionMsg := apc.ActMsg{ Action: "test-action", @@ -53,18 +53,18 @@ func testAisMsgMarshal(t *testing.T, tc aismsgTestConf) { if err != nil { t.Errorf("Failed to marshal beforeMsg: %v", err) } - afterAisMsg := &aisMsg{} + afterAisMsg := &actMsgExt{} err = jsoniter.Unmarshal(b, afterAisMsg) if err != nil { - t.Errorf("Unmarshal failed for aisMsg, err: %v", err) + t.Errorf("Unmarshal failed for actMsgExt, err: %v", err) } if afterAisMsg.Value != nil { bck := &cmn.Bck{} err = cos.MorphMarshal(afterAisMsg.Value, bck) if err != nil { - t.Errorf("Morph marshal failed for aisMsg.Value: %v, err: %v", afterAisMsg.Value, err) + t.Errorf("Morph marshal failed for actMsgExt.Value: %v, err: %v", afterAisMsg.Value, err) } afterAisMsg.Value = bck } diff --git a/ais/earlystart.go b/ais/earlystart.go index 1209911231f..4756f51c01f 100644 --- a/ais/earlystart.go +++ b/ais/earlystart.go @@ -339,8 +339,8 @@ func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets i // 10. metasync (smap, config, etl & bmd) and startup as primary smap = p.owner.smap.get() var ( - aisMsg = p.newAmsgStr(metaction2, bmd) - pairs = []revsPair{{smap, aisMsg}, {bmd, aisMsg}, {cluConfig, aisMsg}} + actMsgExt = p.newAmsgStr(metaction2, bmd) + pairs = []revsPair{{smap, actMsgExt}, {bmd, actMsgExt}, {cluConfig, actMsgExt}} ) wg := p.metasyncer.sync(pairs...) wg.Wait() @@ -349,7 +349,7 @@ func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets i nlog.Infoln(smap.StringEx()+",", bmd.StringEx()) if etlMD.Version > 0 { - _ = p.metasyncer.sync(revsPair{etlMD, aisMsg}) + _ = p.metasyncer.sync(revsPair{etlMD, actMsgExt}) } // 11. Clear regpool @@ -453,9 +453,9 @@ until: // do var ( - msg = &apc.ActMsg{Action: apc.ActRebalance, Value: metaction3} - aisMsg = p.newAmsg(msg, nil) - ctx = &rmdModifier{ + msg = &apc.ActMsg{Action: apc.ActRebalance, Value: metaction3} + actMsgExt = p.newAmsg(msg, nil) + ctx = &rmdModifier{ pre: func(_ *rmdModifier, clone *rebMD) { clone.Version += 100 }, smapCtx: &smapModifier{smap: smap}, cluID: smap.UUID, @@ -465,7 +465,7 @@ until: if err != nil { cos.ExitLog(err) } - wg := p.metasyncer.sync(revsPair{rmd, aisMsg}) + wg := p.metasyncer.sync(revsPair{rmd, actMsgExt}) p.owner.rmd.starting.Store(false) // done p.owner.smap.mu.Unlock() diff --git a/ais/htcommon.go b/ais/htcommon.go index d40faaaec7f..a5e96be1950 100644 --- a/ais/htcommon.go +++ b/ais/htcommon.go @@ -72,19 +72,12 @@ type ( } // extend control msg: ActionMsg with an extra information for node <=> node control plane communications - aisMsg struct { + actMsgExt struct { apc.ActMsg UUID string `json:"uuid"` // cluster-wide ID of this action (operation, transaction) BMDVersion int64 `json:"bmdversion,string"` RMDVersion int64 `json:"rmdversion,string"` } - - cleanmark struct { - OldVer int64 `json:"oldver,string"` - NewVer int64 `json:"newver,string"` - Interrupted bool `json:"interrupted"` - Restarted bool `json:"restarted"` - } ) type ( @@ -678,15 +671,19 @@ func (p *proxy) fillNsti(nsti *cos.NodeStateInfo) { func (t *target) fillNsti(nsti *cos.NodeStateInfo) { t.htrun.fill(nsti) marked := xreg.GetRebMarked() - if marked.Xact != nil { + + // (running | interrupted | ok) + if xreb := marked.Xact; xreb != nil && !xreb.IsAborted() && !xreb.Finished() { nsti.Flags = nsti.Flags.Set(cos.Rebalancing) - } - if marked.Interrupted { + } else if marked.Interrupted { nsti.Flags = nsti.Flags.Set(cos.RebalanceInterrupted) } + + // node restarted if marked.Restarted { - nsti.Flags = nsti.Flags.Set(cos.Restarted) + nsti.Flags = nsti.Flags.Set(cos.NodeRestarted) } + marked = xreg.GetResilverMarked() if marked.Xact != nil { nsti.Flags = nsti.Flags.Set(cos.Resilvering) diff --git a/ais/htrun.go b/ais/htrun.go index 22389ad982c..5c7d0dbdf7f 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -170,7 +170,7 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) { cm.Flags = cm.Flags.Set(cos.RebalanceInterrupted) } if restarted { - cm.Flags = cm.Flags.Set(cos.Restarted) + cm.Flags = cm.Flags.Set(cos.NodeRestarted) } } if !opts.skipPrimeTime && smap.IsPrimary(h.si) { @@ -897,7 +897,7 @@ func (h *htrun) bcastSelected(bargs *bcastArgs) sliceResults { return results.s } -func (h *htrun) bcastAsyncIC(msg *aisMsg) { +func (h *htrun) bcastAsyncIC(msg *actMsgExt) { var ( wg = &sync.WaitGroup{} smap = h.owner.smap.get() @@ -1527,7 +1527,7 @@ func (h *htrun) warnMsync(r *http.Request, smap *smapX) { } } -func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, what, luuid] +func logmsync(lver int64, revs revs, msg *actMsgExt, opts ...string) { // caller [, what, luuid] const tag = "msync Rx:" var ( what string @@ -1575,11 +1575,11 @@ func logmsync(lver int64, revs revs, msg *aisMsg, opts ...string) { // caller [, } } -func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *globalConfig, msg *aisMsg, err error) { +func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *globalConfig, msg *actMsgExt, err error) { if _, ok := payload[revsConfTag]; !ok { return } - newConfig, msg = &globalConfig{}, &aisMsg{} + newConfig, msg = &globalConfig{}, &actMsgExt{} confValue := payload[revsConfTag] reader := bytes.NewBuffer(confValue) if _, err1 := jsp.Decode(io.NopCloser(reader), newConfig, newConfig.JspOpts(), "extractConfig"); err1 != nil { @@ -1605,11 +1605,11 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (newConfig *glob return } -func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, msg *aisMsg, err error) { +func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, msg *actMsgExt, err error) { if _, ok := payload[revsEtlMDTag]; !ok { return } - newMD, msg = newEtlMD(), &aisMsg{} + newMD, msg = newEtlMD(), &actMsgExt{} etlMDValue := payload[revsEtlMDTag] reader := bytes.NewBuffer(etlMDValue) if _, err1 := jsp.Decode(io.NopCloser(reader), newMD, newMD.JspOpts(), "extractEtlMD"); err1 != nil { @@ -1635,12 +1635,12 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (newMD *etlMD, ms return } -func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (newSmap *smapX, msg *aisMsg, err error) { +func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (newSmap *smapX, msg *actMsgExt, err error) { const act = "extract-smap" if _, ok := payload[revsSmapTag]; !ok { return } - newSmap, msg = &smapX{}, &aisMsg{} + newSmap, msg = &smapX{}, &actMsgExt{} smapValue := payload[revsSmapTag] reader := bytes.NewBuffer(smapValue) if _, err1 := jsp.Decode(io.NopCloser(reader), newSmap, newSmap.JspOpts(), act); err1 != nil { @@ -1701,11 +1701,11 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo return } -func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg *aisMsg, err error) { +func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg *actMsgExt, err error) { if _, ok := payload[revsRMDTag]; !ok { return } - newRMD, msg = &rebMD{}, &aisMsg{} + newRMD, msg = &rebMD{}, &actMsgExt{} rmdValue := payload[revsRMDTag] if err1 := jsoniter.Unmarshal(rmdValue, newRMD); err1 != nil { err = fmt.Errorf(cmn.FmtErrUnmarshal, h, "new RMD", cos.BHead(rmdValue), err1) @@ -1739,11 +1739,11 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (newRMD *rebMD, msg return } -func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD, msg *aisMsg, err error) { +func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD, msg *actMsgExt, err error) { if _, ok := payload[revsBMDTag]; !ok { return } - newBMD, msg = &bucketMD{}, &aisMsg{} + newBMD, msg = &bucketMD{}, &actMsgExt{} bmdValue := payload[revsBMDTag] reader := bytes.NewBuffer(bmdValue) if _, err1 := jsp.Decode(io.NopCloser(reader), newBMD, newBMD.JspOpts(), "extractBMD"); err1 != nil { @@ -1773,7 +1773,7 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (newBMD *bucketMD, return } -func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, caller string, cb smapUpdatedCB) error { +func (h *htrun) receiveSmap(newSmap *smapX, msg *actMsgExt, payload msPayload, caller string, cb smapUpdatedCB) error { if newSmap == nil { return nil } @@ -1786,7 +1786,7 @@ func (h *htrun) receiveSmap(newSmap *smapX, msg *aisMsg, payload msPayload, call return h.owner.smap.synchronize(h.si, newSmap, payload, cb) } -func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, caller string, cb func(ne, oe *etlMD)) (err error) { +func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *actMsgExt, payload msPayload, caller string, cb func(ne, oe *etlMD)) (err error) { if newEtlMD == nil { return } @@ -1813,7 +1813,7 @@ func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *aisMsg, payload msPayload, ca } // under lock -func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload) (err error) { +func (h *htrun) _recvCfg(newConfig *globalConfig, msg *actMsgExt, payload msPayload) (err error) { config := cmn.GCO.Get() if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce { if newConfig.version() == config.Version { @@ -1836,7 +1836,7 @@ func (h *htrun) _recvCfg(newConfig *globalConfig, msg *aisMsg, payload msPayload func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*tokenList, error) { var ( - msg aisMsg + msg actMsgExt bytes, ok = payload[revsTokenTag] ) if !ok { @@ -2340,16 +2340,16 @@ func ptLatency(tts int64, ptime, isPrimary string) (dur int64) { } // -// aisMsg reader & constructors +// actMsgExt reader & constructors // -func (*htrun) readAisMsg(w http.ResponseWriter, r *http.Request) (msg *aisMsg, err error) { - msg = &aisMsg{} +func (*htrun) readAisMsg(w http.ResponseWriter, r *http.Request) (msg *actMsgExt, err error) { + msg = &actMsgExt{} err = cmn.ReadJSON(w, r, msg) return } -func (msg *aisMsg) String() string { +func (msg *actMsgExt) String() string { s := "aism[" + msg.Action if msg.UUID != "" { s += "[" + msg.UUID + "]" @@ -2360,7 +2360,7 @@ func (msg *aisMsg) String() string { return s + "]" } -func (msg *aisMsg) StringEx() (s string) { +func (msg *actMsgExt) StringEx() (s string) { s = msg.String() vs, err := jsoniter.Marshal(msg.Value) debug.AssertNoErr(err) @@ -2368,16 +2368,16 @@ func (msg *aisMsg) StringEx() (s string) { return } -func (h *htrun) newAmsgStr(msgStr string, bmd *bucketMD) *aisMsg { +func (h *htrun) newAmsgStr(msgStr string, bmd *bucketMD) *actMsgExt { return h.newAmsg(&apc.ActMsg{Value: msgStr}, bmd) } -func (h *htrun) newAmsgActVal(act string, val any) *aisMsg { +func (h *htrun) newAmsgActVal(act string, val any) *actMsgExt { return h.newAmsg(&apc.ActMsg{Action: act, Value: val}, nil) } -func (h *htrun) newAmsg(amsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *aisMsg { - msg := &aisMsg{ActMsg: *amsg} +func (h *htrun) newAmsg(amsg *apc.ActMsg, bmd *bucketMD, uuid ...string) *actMsgExt { + msg := &actMsgExt{ActMsg: *amsg} if bmd != nil { msg.BMDVersion = bmd.Version } else { diff --git a/ais/ic.go b/ais/ic.go index f7578920467..465cf3c83a9 100644 --- a/ais/ic.go +++ b/ais/ic.go @@ -304,7 +304,7 @@ func (ic *ic) handleGet(w http.ResponseWriter, r *http.Request) { func (ic *ic) handlePost(w http.ResponseWriter, r *http.Request) { var ( smap = ic.p.owner.smap.get() - msg = &aisMsg{} + msg = &actMsgExt{} ) if err := cmn.ReadJSON(w, r, msg); err != nil { return diff --git a/ais/metasync.go b/ais/metasync.go index f2fe8e5af14..f7029ba9714 100644 --- a/ais/metasync.go +++ b/ais/metasync.go @@ -46,7 +46,7 @@ import ( // // (shared-replicated-object, associated action-message), // -// where `associated action-message` (aisMsg) provides receivers with the operation +// where `associated action-message` (actMsgExt) provides receivers with the operation // ("action") and other relevant context. // // Further, the metasyncer: @@ -117,7 +117,7 @@ type ( } revsPair struct { revs revs - msg *aisMsg + msg *actMsgExt } revsReq struct { wg *sync.WaitGroup @@ -446,7 +446,7 @@ func (y *metasyncer) jit(pair revsPair) revs { return revs } -// keeping track of per-daemon versioning - TODO: extend to take care of aisMsg where pairs may be empty +// keeping track of per-daemon versioning - TODO: extend to take care of actMsgExt where pairs may be empty func (y *metasyncer) syncDone(si *meta.Snode, pairs []revsPair) { ndr, ok := y.nodesRevs[si.ID()] smap := y.p.owner.smap.get() @@ -526,7 +526,7 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) { break } if v > revs.version() { - // skip older versions (TODO: don't skip sending associated aisMsg) + // skip older versions (TODO: don't skip sending associated actMsgExt) nlog.Errorf("v: %d; revs.version: %d", v, revs.version()) } } diff --git a/ais/metasync_internal_test.go b/ais/metasync_internal_test.go index ed225d90004..610ce261748 100644 --- a/ais/metasync_internal_test.go +++ b/ais/metasync_internal_test.go @@ -607,7 +607,7 @@ func TestMetasyncData(t *testing.T) { bmd = newBucketMD() ) - emptyAisMsg, err := jsoniter.Marshal(aisMsg{}) + emptyAisMsg, err := jsoniter.Marshal(actMsgExt{}) if err != nil { t.Fatal("Failed to marshal empty apc.ActMsg, err =", err) } @@ -634,7 +634,7 @@ func TestMetasyncData(t *testing.T) { exp[revsSmapTag+revsActionTag] = emptyAisMsg expRetry[revsSmapTag+revsActionTag] = emptyAisMsg - syncer.sync(revsPair{smap, &aisMsg{}}) + syncer.sync(revsPair{smap, &actMsgExt{}}) match(t, expRetry, ch, 1) // sync bucketmd, fail target and retry @@ -656,7 +656,7 @@ func TestMetasyncData(t *testing.T) { exp[revsBMDTag+revsActionTag] = emptyAisMsg expRetry[revsBMDTag+revsActionTag] = emptyAisMsg - syncer.sync(revsPair{bmd, &aisMsg{}}) + syncer.sync(revsPair{bmd, &actMsgExt{}}) match(t, exp, ch, 1) match(t, expRetry, ch, 1) @@ -794,7 +794,7 @@ func TestMetasyncMembership(t *testing.T) { // TestMetasyncReceive tests extracting received sync data. func TestMetasyncReceive(t *testing.T) { { - emptyAisMsg := func(a *aisMsg) { + emptyAisMsg := func(a *actMsgExt) { if a.Action != "" || a.Name != "" || a.Value != nil { t.Fatal("Expecting empty action message", a) } @@ -847,7 +847,7 @@ func TestMetasyncReceive(t *testing.T) { t.Fatal("Extract smap from empty payload returned data") } - wg1 := syncer.sync(revsPair{primary.owner.smap.get(), &aisMsg{}}) + wg1 := syncer.sync(revsPair{primary.owner.smap.get(), &actMsgExt{}}) wg1.Wait() payload := <-chProxy diff --git a/ais/proxy.go b/ais/proxy.go index 90ef249345b..660c96552b1 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -2286,7 +2286,7 @@ func (p *proxy) redirectURL(r *http.Request, si *meta.Snode, ts time.Time, netIn // buffer (see: `queryBuffers`) so we won't request the same objects again. func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRes, err error) { var ( - aisMsg *aisMsg + actMsgExt *actMsgExt args *bcastArgs entries cmn.LsoEntries results sliceResults @@ -2323,13 +2323,13 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe // what we have locally, so we don't re-request the objects. lsmsg.ContinuationToken = p.qm.b.last(lsmsg.UUID, token) - aisMsg = p.newAmsgActVal(apc.ActList, &lsmsg) + actMsgExt = p.newAmsgActVal(apc.ActList, &lsmsg) args = allocBcArgs() args.req = cmn.HreqArgs{ Method: http.MethodGet, Path: apc.URLPathBuckets.Join(bck.Name), Query: bck.NewQuery(), - Body: cos.MustMarshal(aisMsg), + Body: cos.MustMarshal(actMsgExt), } args.timeout = apc.LongTimeout args.smap = smap @@ -2393,10 +2393,10 @@ end: func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX, tsi *meta.Snode, config *cmn.Config, wantOnlyRemote bool) (*cmn.LsoRes, error) { var ( - results sliceResults - aisMsg = p.newAmsgActVal(apc.ActList, &lsmsg) - args = allocBcArgs() - timeout = config.Client.ListObjTimeout.D() + results sliceResults + actMsgExt = p.newAmsgActVal(apc.ActList, &lsmsg) + args = allocBcArgs() + timeout = config.Client.ListObjTimeout.D() ) if cos.IsParseBool(hdr.Get(apc.HdrInventory)) { // TODO: extend to other Clouds or, more precisely, other list-objects supporting backends @@ -2423,7 +2423,7 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap Path: apc.URLPathBuckets.Join(bck.Name), Header: hdr, Query: bck.NewQuery(), - Body: cos.MustMarshal(aisMsg), + Body: cos.MustMarshal(actMsgExt), } if wantOnlyRemote { cargs := allocCargs() @@ -2491,12 +2491,12 @@ func (p *proxy) redirectAction(w http.ResponseWriter, r *http.Request, bck *meta func (p *proxy) listrange(method, bucket string, msg *apc.ActMsg, query url.Values) (xid string, err error) { var ( - smap = p.owner.smap.get() - aisMsg = p.newAmsg(msg, nil, cos.GenUUID()) - body = cos.MustMarshal(aisMsg) - path = apc.URLPathBuckets.Join(bucket) + smap = p.owner.smap.get() + actMsgExt = p.newAmsg(msg, nil, cos.GenUUID()) + body = cos.MustMarshal(actMsgExt) + path = apc.URLPathBuckets.Join(bucket) ) - nlb := xact.NewXactNL(aisMsg.UUID, aisMsg.Action, &smap.Smap, nil) + nlb := xact.NewXactNL(actMsgExt.UUID, actMsgExt.Action, &smap.Smap, nil) nlb.SetOwner(equalIC) p.ic.registerEqual(regIC{smap: smap, query: query, nl: nlb}) args := allocBcArgs() @@ -2513,7 +2513,7 @@ func (p *proxy) listrange(method, bucket string, msg *apc.ActMsg, query url.Valu break } freeBcastRes(results) - xid = aisMsg.UUID + xid = actMsgExt.UUID return } @@ -3081,7 +3081,7 @@ func (p *proxy) htHandler(w http.ResponseWriter, r *http.Request) { // // compare w/ t.receiveConfig -func (p *proxy) receiveConfig(newConfig *globalConfig, msg *aisMsg, payload msPayload, caller string) (err error) { +func (p *proxy) receiveConfig(newConfig *globalConfig, msg *actMsgExt, payload msPayload, caller string) (err error) { oldConfig := cmn.GCO.Get() logmsync(oldConfig.Version, newConfig, msg, caller, newConfig.String(), oldConfig.UUID) @@ -3174,7 +3174,7 @@ func (p *proxy) _remais(newConfig *cmn.ClusterConfig, blocking bool) { nlog.Infof("%s: remais v%d => v%d", p, over, nver) } -func (p *proxy) receiveRMD(newRMD *rebMD, msg *aisMsg, caller string) (err error) { +func (p *proxy) receiveRMD(newRMD *rebMD, msg *actMsgExt, caller string) (err error) { rmd := p.owner.rmd.get() logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID) @@ -3229,7 +3229,7 @@ func (p *proxy) smapOnUpdate(newSmap, oldSmap *smapX, nfl, ofl cos.BitFlags) { p.htrun.smapUpdatedCB(newSmap, oldSmap, nfl, ofl) } -func (p *proxy) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, caller string) (err error) { +func (p *proxy) receiveBMD(newBMD *bucketMD, msg *actMsgExt, payload msPayload, caller string) (err error) { bmd := p.owner.bmd.get() logmsync(bmd.Version, newBMD, msg, caller, newBMD.String(), bmd.UUID) diff --git a/ais/prxbsumm.go b/ais/prxbsumm.go index c567d150337..c4493c2b66b 100644 --- a/ais/prxbsumm.go +++ b/ais/prxbsumm.go @@ -50,14 +50,14 @@ func (p *proxy) bsummNew(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (err error) q := qbck.NewQuery() msg.UUID = cos.GenUUID() - aisMsg := p.newAmsgActVal(apc.ActSummaryBck, msg) + actMsgExt := p.newAmsgActVal(apc.ActSummaryBck, msg) args := allocBcArgs() args.req = cmn.HreqArgs{ Method: http.MethodGet, Path: apc.URLPathBuckets.Join(qbck.Name, apc.ActBegin), // compare w/ txn Query: q, - Body: cos.MustMarshal(aisMsg), + Body: cos.MustMarshal(actMsgExt), } // not using default control-plane timeout - // returning only _after_ all targets start running this new job @@ -84,14 +84,14 @@ func (p *proxy) bsummNew(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (err error) func (p *proxy) bsummCollect(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (_ cmn.AllBsummResults, status int, err error) { var ( - q = make(url.Values, 4) - aisMsg = p.newAmsgActVal(apc.ActSummaryBck, msg) - args = allocBcArgs() + q = make(url.Values, 4) + actMsgExt = p.newAmsgActVal(apc.ActSummaryBck, msg) + args = allocBcArgs() ) args.req = cmn.HreqArgs{ Method: http.MethodGet, Path: apc.URLPathBuckets.Join(qbck.Name, apc.ActQuery), - Body: cos.MustMarshal(aisMsg), + Body: cos.MustMarshal(actMsgExt), } args.smap = p.owner.smap.get() if cnt := args.smap.CountActiveTs(); cnt < 1 { diff --git a/ais/prxclu.go b/ais/prxclu.go index 0366eabe897..72bd7298389 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -516,9 +516,9 @@ func (p *proxy) httpclupost(w http.ResponseWriter, r *http.Request) { if !config.Rebalance.Enabled { regReq.Flags = regReq.Flags.Clear(cos.RebalanceInterrupted) - regReq.Flags = regReq.Flags.Clear(cos.Restarted) + regReq.Flags = regReq.Flags.Clear(cos.NodeRestarted) } - interrupted, restarted := regReq.Flags.IsSet(cos.RebalanceInterrupted), regReq.Flags.IsSet(cos.Restarted) + interrupted, restarted := regReq.Flags.IsSet(cos.RebalanceInterrupted), regReq.Flags.IsSet(cos.NodeRestarted) if nsi.IsTarget() && (interrupted || restarted) { if a, b := p.ClusterStarted(), p.owner.rmd.starting.Load(); !a || b { // handle via rmd.starting + resumeReb @@ -663,7 +663,7 @@ func (p *proxy) _joinKalive(nsi *meta.Snode, regSmap *smapX, apiOp string, flags switch { case keepalive: upd = p.kalive(nsi, osi) - case regReq.Flags.IsSet(cos.Restarted): + case regReq.Flags.IsSet(cos.NodeRestarted): upd = true default: upd = p.rereg(nsi, osi) @@ -692,8 +692,8 @@ func (p *proxy) _joinKalive(nsi *meta.Snode, regSmap *smapX, apiOp string, flags p.owner.smap.put(clone) upd = false if a { - aisMsg := p.newAmsg(msg, nil) - _ = p.metasyncer.sync(revsPair{clone, aisMsg}) + actMsgExt := p.newAmsg(msg, nil) + _ = p.metasyncer.sync(revsPair{clone, actMsgExt}) } return } @@ -752,7 +752,7 @@ func (p *proxy) mcastJoined(nsi *meta.Snode, msg *apc.ActMsg, flags cos.BitFlags msg: msg, flags: flags, interrupted: regReq.Flags.IsSet(cos.RebalanceInterrupted), - restarted: regReq.Flags.IsSet(cos.Restarted), + restarted: regReq.Flags.IsSet(cos.NodeRestarted), } if err = p._earlyGFN(ctx, ctx.nsi, msg.Action, true /*joining*/); err != nil { return @@ -767,14 +767,17 @@ func (p *proxy) mcastJoined(nsi *meta.Snode, msg *apc.ActMsg, flags cos.BitFlags xid = ctx.rmdCtx.rebID return } - // cleanup target state - if ctx.restarted || ctx.interrupted { - go p.cleanupMark(ctx) - } + + // [NOTE] + // one (arguably, cosmetic) side effect of not rebalancing is: markers and node state flags + // e.g. when a node crashes (and then rejoins back again) in a cluster with rebalance disabled + // the node's marker will state "restarted" + // and will remain as such until and if the cluster gets eventually rebalanced + if ctx.gfn { - aisMsg := p.newAmsgActVal(apc.ActStopGFN, nil) // "stop-gfn" timed - aisMsg.UUID = ctx.nsi.ID() - revs := revsPair{&smapX{Smap: meta.Smap{Version: ctx.nver}}, aisMsg} + actMsgExt := p.newAmsgActVal(apc.ActStopGFN, nil) // "stop-gfn" timed + actMsgExt.UUID = ctx.nsi.ID() + revs := revsPair{&smapX{Smap: meta.Smap{Version: ctx.nver}}, actMsgExt} _ = p.metasyncer.notify(false /*wait*/, revs) // async, failed-cnt always zero } return @@ -814,43 +817,6 @@ func (p *proxy) _earlyGFN(ctx *smapModifier, si *meta.Snode, action string, join return nil } -// calls t.cleanupMark -func (p *proxy) cleanupMark(ctx *smapModifier) { - var ( - val = cleanmark{OldVer: ctx.smap.version(), NewVer: ctx.nver, - Interrupted: ctx.interrupted, Restarted: ctx.restarted, - } - msg = apc.ActMsg{Action: apc.ActCleanupMarkers, Value: &val} - cargs = allocCargs() - smap = p.owner.smap.get() - timeout = cmn.Rom.CplaneOperation() - sleep = timeout >> 1 - ) - { - cargs.si = ctx.nsi - cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathDae.S, Body: cos.MustMarshal(msg)} - cargs.timeout = timeout - } - time.Sleep(sleep) - for i := range 4 { // retry - res := p.call(cargs, smap) - err := res.err - freeCR(res) - if err == nil { - break - } - if cos.IsRetriableConnErr(err) { - time.Sleep(sleep) - smap = p.owner.smap.get() - nlog.Warningf("%s: %v (cleanmark #%d)", p, err, i+1) - continue - } - nlog.Errorln(err) - break - } - freeCargs(cargs) -} - func (p *proxy) _joinedPre(ctx *smapModifier, clone *smapX) error { if !clone.isPrimary(p.si) { return newErrNotPrimary(p.si, clone, fmt.Sprintf("cannot add %s", ctx.nsi)) @@ -894,11 +860,11 @@ func (p *proxy) _joinedPost(ctx *smapModifier, clone *smapX) { func (p *proxy) _joinedFinal(ctx *smapModifier, clone *smapX) { var ( - tokens = p.authn.revokedTokenList() - bmd = p.owner.bmd.get() - etlMD = p.owner.etl.get() - aisMsg = p.newAmsg(ctx.msg, bmd) - pairs = make([]revsPair, 0, 5) + tokens = p.authn.revokedTokenList() + bmd = p.owner.bmd.get() + etlMD = p.owner.etl.get() + actMsgExt = p.newAmsg(ctx.msg, bmd) + pairs = make([]revsPair, 0, 5) ) // when targets join as well (redundant?, minor) config, err := p.ensureConfigURLs() @@ -909,27 +875,27 @@ func (p *proxy) _joinedFinal(ctx *smapModifier, clone *smapX) { nlog.Errorln(err) // proceed anyway } else if config != nil { - pairs = append(pairs, revsPair{config, aisMsg}) + pairs = append(pairs, revsPair{config, actMsgExt}) } - pairs = append(pairs, revsPair{clone, aisMsg}, revsPair{bmd, aisMsg}) + pairs = append(pairs, revsPair{clone, actMsgExt}, revsPair{bmd, actMsgExt}) if etlMD != nil && etlMD.version() > 0 { - pairs = append(pairs, revsPair{etlMD, aisMsg}) + pairs = append(pairs, revsPair{etlMD, actMsgExt}) } reb := ctx.rmdCtx != nil && ctx.rmdCtx.rebID != "" if !reb { // replicate RMD across (existing nodes will drop it upon version comparison) rmd := p.owner.rmd.get() - pairs = append(pairs, revsPair{rmd, aisMsg}) + pairs = append(pairs, revsPair{rmd, actMsgExt}) } else { debug.Assert(ctx.rmdCtx.prev.version() < ctx.rmdCtx.cur.version()) - aisMsg.UUID = ctx.rmdCtx.rebID - pairs = append(pairs, revsPair{ctx.rmdCtx.cur, aisMsg}) + actMsgExt.UUID = ctx.rmdCtx.rebID + pairs = append(pairs, revsPair{ctx.rmdCtx.cur, actMsgExt}) } if tokens != nil { - pairs = append(pairs, revsPair{tokens, aisMsg}) + pairs = append(pairs, revsPair{tokens, actMsgExt}) } _ = p.metasyncer.sync(pairs...) p.syncNewICOwners(ctx.smap, clone) @@ -937,15 +903,15 @@ func (p *proxy) _joinedFinal(ctx *smapModifier, clone *smapX) { func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) { var ( - aisMsg = p.newAmsg(ctx.msg, nil) - pairs = make([]revsPair, 0, 2) - reb = ctx.rmdCtx != nil && ctx.rmdCtx.rebID != "" + actMsgExt = p.newAmsg(ctx.msg, nil) + pairs = make([]revsPair, 0, 2) + reb = ctx.rmdCtx != nil && ctx.rmdCtx.rebID != "" ) - pairs = append(pairs, revsPair{clone, aisMsg}) + pairs = append(pairs, revsPair{clone, actMsgExt}) if reb { debug.Assert(ctx.rmdCtx.prev.version() < ctx.rmdCtx.cur.version()) - aisMsg.UUID = ctx.rmdCtx.rebID - pairs = append(pairs, revsPair{ctx.rmdCtx.cur, aisMsg}) + actMsgExt.UUID = ctx.rmdCtx.rebID + pairs = append(pairs, revsPair{ctx.rmdCtx.cur, actMsgExt}) } debug.Assert(clone._sgl != nil) @@ -955,7 +921,7 @@ func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) { return } if config != nil /*updated*/ { - pairs = append(pairs, revsPair{config, aisMsg}) + pairs = append(pairs, revsPair{config, actMsgExt}) } wg := p.metasyncer.sync(pairs...) @@ -1518,9 +1484,9 @@ func (p *proxy) rmTarget(si *meta.Snode, msg *apc.ActMsg, reb bool) (rebID strin } else if ctx.rmdCtx != nil { rebID = ctx.rmdCtx.rebID if rebID == "" && ctx.gfn { // stop early gfn - aisMsg := p.newAmsgActVal(apc.ActStopGFN, nil) - aisMsg.UUID = si.ID() - revs := revsPair{&smapX{Smap: meta.Smap{Version: ctx.nver}}, aisMsg} + actMsgExt := p.newAmsgActVal(apc.ActStopGFN, nil) + actMsgExt.UUID = si.ID() + revs := revsPair{&smapX{Smap: meta.Smap{Version: ctx.nver}}, actMsgExt} _ = p.metasyncer.notify(false /*wait*/, revs) // async, failed-cnt always zero } } diff --git a/ais/prxtxn.go b/ais/prxtxn.go index c038bdef9c7..3cd713162bb 100644 --- a/ais/prxtxn.go +++ b/ais/prxtxn.go @@ -32,7 +32,7 @@ import ( type txnCln struct { p *proxy smap *smapX - msg *aisMsg + msg *actMsgExt uuid string path string req cmn.HreqArgs diff --git a/ais/psetforce.go b/ais/psetforce.go index e533c7225a7..25eb227edfb 100644 --- a/ais/psetforce.go +++ b/ais/psetforce.go @@ -630,7 +630,7 @@ func (h *htrun) daeForceJoin(w http.ResponseWriter, r *http.Request) { } } -func (h *htrun) _prepForceJoin(w http.ResponseWriter, r *http.Request, msg *aisMsg) { +func (h *htrun) _prepForceJoin(w http.ResponseWriter, r *http.Request, msg *actMsgExt) { const tag = "prep-force-join:" var ( callerID = r.Header.Get(apc.HdrCallerID) @@ -662,7 +662,7 @@ func (h *htrun) _prepForceJoin(w http.ResponseWriter, r *http.Request, msg *aisM nlog.Infoln(tag, h.String(), smap.StringEx(), "-> [", npname, nsmap.StringEx(), "]") } -func (h *htrun) _commitForceJoin(w http.ResponseWriter, r *http.Request, msg *aisMsg) { +func (h *htrun) _commitForceJoin(w http.ResponseWriter, r *http.Request, msg *actMsgExt) { const tag = "commit-force-join:" ncm := &cluMeta{} @@ -830,7 +830,7 @@ func (cm *cluMeta) validate() error { return nil } -func (cm *cluMeta) metasync(p *proxy, msg *aisMsg, wait bool) { +func (cm *cluMeta) metasync(p *proxy, msg *actMsgExt, wait bool) { var ( detail string revs = make([]revsPair, 0, 5) diff --git a/ais/rebmeta.go b/ais/rebmeta.go index 9c4878d4714..e652368b2fe 100644 --- a/ais/rebmeta.go +++ b/ais/rebmeta.go @@ -214,7 +214,7 @@ func rmdInc(_ *rmdModifier, clone *rebMD) { clone.inc() } func rmdSync(m *rmdModifier, clone *rebMD) { debug.Assert(m.cur == clone) m.listen(nil) - msg := &aisMsg{ActMsg: apc.ActMsg{Action: apc.ActRebalance}, UUID: m.rebID} // user-requested rebalance + msg := &actMsgExt{ActMsg: apc.ActMsg{Action: apc.ActRebalance}, UUID: m.rebID} // user-requested rebalance wg := m.p.metasyncer.sync(revsPair{m.cur, msg}) if m.wait { wg.Wait() diff --git a/ais/target.go b/ais/target.go index 582d927e3a3..ab89c2fd71a 100644 --- a/ais/target.go +++ b/ais/target.go @@ -91,10 +91,10 @@ var ( func (*target) Name() string { return apc.Target } // as cos.Runner // as htext -func (*target) interruptedRestarted() (interrupted, restarted bool) { - interrupted = fs.MarkerExists(fname.RebalanceMarker) - restarted = fs.MarkerExists(fname.NodeRestartedPrev) - return +func (*target) interruptedRestarted() (i, r bool) { + i = fs.MarkerExists(fname.RebalanceMarker) + r = fs.MarkerExists(fname.NodeRestartedPrev) + return i, r } // @@ -408,9 +408,12 @@ func (t *target) Run() error { err = t.htrun.run(config) - etl.StopAll() // stop all running ETLs if any - cos.Close(db) // close kv db - fs.RemoveMarker(fname.NodeRestartedMarker) // exit gracefully + etl.StopAll() // stop all running ETLs if any + cos.Close(db) // close kv db + + // gracefully + fs.RemoveMarker(fname.NodeRestartedPrev, t.statsT) + fs.RemoveMarker(fname.NodeRestartedMarker, t.statsT) return err } @@ -460,7 +463,7 @@ func (t *target) runResilver(args res.Args, wg *sync.WaitGroup) { if wg != nil { wg.Done() // compare w/ xact.GoRunW(() } - t.res.RunResilver(args) + t.res.RunResilver(args, t.statsT) } func (t *target) endStartupStandby() (err error) { @@ -514,7 +517,7 @@ func (t *target) checkRestarted(config *cmn.Config) (fatalErr, writeErr error) { fatalErr = fmt.Errorf("%s: %q is in use (duplicate or overlapping run?)", t, red.inUse) return } - t.statsT.SetFlag(cos.NodeAlerts, cos.Restarted) + t.statsT.SetFlag(cos.NodeAlerts, cos.NodeRestarted) fs.PersistMarker(fname.NodeRestartedPrev) } fatalErr, writeErr = fs.PersistMarker(fname.NodeRestartedMarker) @@ -906,7 +909,7 @@ func (t *target) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiR // DELETE [ { action } ] /v1/objects/bucket-name/object-name func (t *target) httpobjdelete(w http.ResponseWriter, r *http.Request, apireq *apiRequest) { - var msg aisMsg + var msg actMsgExt if err := readJSON(w, r, &msg); err != nil { return } diff --git a/ais/tgtbck.go b/ais/tgtbck.go index 715cbb4bbfc..d5e00e9f291 100644 --- a/ais/tgtbck.go +++ b/ais/tgtbck.go @@ -345,7 +345,7 @@ func (t *target) bsumm(w http.ResponseWriter, r *http.Request, phase string, bck // DELETE { action } /v1/buckets/bucket-name // (evict | delete) (list | range) func (t *target) httpbckdelete(w http.ResponseWriter, r *http.Request, apireq *apiRequest) { - msg := aisMsg{} + msg := actMsgExt{} if err := readJSON(w, r, &msg); err != nil { return } diff --git a/ais/tgtcp.go b/ais/tgtcp.go index 3f076c3fb80..126f97d26c0 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -20,7 +20,6 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" - "github.com/NVIDIA/aistore/cmn/fname" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" @@ -213,16 +212,6 @@ func (t *target) daeputMsg(w http.ResponseWriter, r *http.Request) { } t.termKaliveX(msg.Action, opts.NoShutdown) t.decommission(msg.Action, &opts) - case apc.ActCleanupMarkers: - if !t.ensureIntraControl(w, r, true /* from primary */) { - return - } - var ctx cleanmark - if err := cos.MorphMarshal(msg.Value, &ctx); err != nil { - t.writeErr(w, r, err) - return - } - t.cleanupMark(&ctx) default: t.writeErrAct(w, r, msg.Action) } @@ -539,29 +528,6 @@ func (t *target) httpdaedelete(w http.ResponseWriter, r *http.Request) { } } -// called by p.cleanupMark -func (t *target) cleanupMark(ctx *cleanmark) { - smap := t.owner.smap.get() - if smap.version() > ctx.NewVer { - nlog.Warningf("%s: %s is newer - ignoring (and dropping) %v", t, smap, ctx) - return - } - if ctx.Interrupted { - if err := fs.RemoveMarker(fname.RebalanceMarker); err == nil { - nlog.Infof("%s: cleanmark 'rebalance', %s", t, smap) - } else { - nlog.Errorf("%s: failed to cleanmark 'rebalance': %v, %s", t, err, smap) - } - } - if ctx.Restarted { - if err := fs.RemoveMarker(fname.NodeRestartedPrev); err == nil { - nlog.Infof("%s: cleanmark 'restarted', %s", t, smap) - } else { - nlog.Errorf("%s: failed to cleanmark 'restarted': %v, %s", t, err, smap) - } - } -} - func (t *target) handleMpathReq(w http.ResponseWriter, r *http.Request) { msg, err := t.readActionMsg(w, r) if err != nil { @@ -701,7 +667,7 @@ func (t *target) detachMpath(w http.ResponseWriter, r *http.Request, mpath strin } } -func (t *target) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, tag, caller string, silent bool) (err error) { +func (t *target) receiveBMD(newBMD *bucketMD, msg *actMsgExt, payload msPayload, tag, caller string, silent bool) (err error) { var oldVer int64 if msg.UUID == "" { oldVer, err = t.applyBMD(newBMD, msg, payload, tag) @@ -740,7 +706,7 @@ func (t *target) receiveBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, ta return } -func (t *target) applyBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, tag string) (int64, error) { +func (t *target) applyBMD(newBMD *bucketMD, msg *actMsgExt, payload msPayload, tag string) (int64, error) { var ( smap = t.owner.smap.get() psi *meta.Snode @@ -766,7 +732,7 @@ func (t *target) applyBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, tag } // executes under lock -func (t *target) _syncBMD(newBMD *bucketMD, msg *aisMsg, payload msPayload, psi *meta.Snode) (rmbcks []*meta.Bck, oldVer int64, emsg string, err error) { +func (t *target) _syncBMD(newBMD *bucketMD, msg *actMsgExt, payload msPayload, psi *meta.Snode) (rmbcks []*meta.Bck, oldVer int64, emsg string, err error) { var ( createErrs []error destroyErrs []error @@ -879,7 +845,7 @@ func (t *target) _postBMD(newBMD *bucketMD, tag string, rmbcks []*meta.Bck) { } // is called under lock -func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) { +func (t *target) receiveRMD(newRMD *rebMD, msg *actMsgExt) (err error) { if msg.Action == apc.ActPrimaryForce { err = t.owner.rmd.synch(newRMD, true) return err @@ -918,7 +884,7 @@ func (t *target) receiveRMD(newRMD *rebMD, msg *aisMsg) (err error) { return } -func (t *target) _runRe(newRMD *rebMD, msg *aisMsg, smap *smapX) { +func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX) { const tag = "rebalance[" notif := &xact.NotifXact{ Base: nl.Base{When: core.UponTerm, Dsts: []string{equalIC}, F: t.notifyTerm}, @@ -997,7 +963,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *aisMsg, smap *smapX) { t.owner.rmd.put(newRMD) } -func (t *target) ensureLatestBMD(msg *aisMsg, r *http.Request) { +func (t *target) ensureLatestBMD(msg *actMsgExt, r *http.Request) { bmd, bmdVersion := t.owner.bmd.Get(), msg.BMDVersion if bmd.Version < bmdVersion { nlog.Errorf("%s: local %s < v%d %s - running fixup...", t, bmd, bmdVersion, msg) @@ -1167,7 +1133,7 @@ func _stopETLs(newEtlMD, oldEtlMD *etlMD) { } // compare w/ p.receiveConfig -func (t *target) receiveConfig(newConfig *globalConfig, msg *aisMsg, payload msPayload, caller string) (err error) { +func (t *target) receiveConfig(newConfig *globalConfig, msg *actMsgExt, payload msPayload, caller string) (err error) { oldConfig := cmn.GCO.Get() logmsync(oldConfig.Version, newConfig, msg, caller, newConfig.String(), oldConfig.UUID) @@ -1205,7 +1171,7 @@ func (t *target) receiveConfig(newConfig *globalConfig, msg *aisMsg, payload msP } // NOTE: apply the entire config: add new and update existing entries (remote clusters) -func (t *target) attachDetachRemAis(newConfig *globalConfig, msg *aisMsg) (err error) { +func (t *target) attachDetachRemAis(newConfig *globalConfig, msg *actMsgExt) (err error) { var ( aisbp *backend.AISbp aisConf = newConfig.Backend.Get(apc.AIS) diff --git a/ais/tgttxn.go b/ais/tgttxn.go index 1acbea7fcf6..3f3e91110f1 100644 --- a/ais/tgttxn.go +++ b/ais/tgttxn.go @@ -40,7 +40,7 @@ const ActCleanup = "cleanup" // in addition to (apc.ActBegin, ...) // (compare with txnCln) type txnSrv struct { t *target - msg *aisMsg + msg *actMsgExt bck *meta.Bck // aka bckFrom bckTo *meta.Bck query url.Values @@ -281,7 +281,7 @@ func (t *target) makeNCopies(c *txnSrv) (string, error) { return "", nil } -func (t *target) validateMakeNCopies(bck *meta.Bck, msg *aisMsg) (curCopies, newCopies int64, err error) { +func (t *target) validateMakeNCopies(bck *meta.Bck, msg *actMsgExt) (curCopies, newCopies int64, err error) { curCopies = bck.Props.Mirror.Copies newCopies, err = _parseNCopies(msg.Value) if err == nil { @@ -383,7 +383,7 @@ func (t *target) setBprops(c *txnSrv) (string, error) { return "", nil } -func (t *target) validateNprops(bck *meta.Bck, msg *aisMsg) (nprops *cmn.Bprops, err error) { +func (t *target) validateNprops(bck *meta.Bck, msg *actMsgExt) (nprops *cmn.Bprops, err error) { var ( body = cos.MustMarshal(msg.Value) cs = fs.Cap() @@ -474,7 +474,7 @@ func (t *target) renameBucket(c *txnSrv) (string, error) { return "", nil } -func (t *target) validateBckRenTxn(bckFrom, bckTo *meta.Bck, msg *aisMsg) error { +func (t *target) validateBckRenTxn(bckFrom, bckTo *meta.Bck, msg *actMsgExt) error { cs := fs.Cap() if err := cs.Err(); err != nil { return err @@ -777,7 +777,7 @@ func (t *target) ecEncode(c *txnSrv) (string, error) { return "", nil } -func (t *target) validateECEncode(bck *meta.Bck, msg *aisMsg) error { +func (t *target) validateECEncode(bck *meta.Bck, msg *actMsgExt) error { cs := fs.Cap() if err := cs.Err(); err != nil { return err diff --git a/ais/txn.go b/ais/txn.go index 08111166657..24b5b07fb91 100644 --- a/ais/txn.go +++ b/ais/txn.go @@ -39,7 +39,7 @@ type ( isDone() (done bool, err error) set(nlps []core.NLP) // triggers - commitAfter(caller string, msg *aisMsg, err error, args ...any) (bool, error) + commitAfter(caller string, msg *actMsgExt, err error, args ...any) (bool, error) rsvp(err error) // cleanup abort(error) @@ -217,7 +217,7 @@ func (txns *transactions) find(uuid, act string) (txn, error) { return txn, nil } -func (txns *transactions) commitBefore(caller string, msg *aisMsg) error { +func (txns *transactions) commitBefore(caller string, msg *actMsgExt) error { var ( rndzvs rndzvs ok bool @@ -233,7 +233,7 @@ func (txns *transactions) commitBefore(caller string, msg *aisMsg) error { return fmt.Errorf("rendezvous record %s:%d already exists", msg.UUID, rndzvs.timestamp) } -func (txns *transactions) commitAfter(caller string, msg *aisMsg, err error, args ...any) (errDone error) { +func (txns *transactions) commitAfter(caller string, msg *actMsgExt, err error, args ...any) (errDone error) { txns.mtx.Lock() txn, ok := txns.m[msg.UUID] txns.mtx.Unlock() @@ -489,7 +489,7 @@ func (txn *txnBckBase) String() string { return fmt.Sprintf("txn-%s[%s]-%s%s%s]", txn.action, txn.uid, txn.bck.Bucket().String(), tm, res) } -func (txn *txnBckBase) commitAfter(caller string, msg *aisMsg, err error, args ...any) (found bool, errDone error) { +func (txn *txnBckBase) commitAfter(caller string, msg *actMsgExt, err error, args ...any) (found bool, errDone error) { if txn.callerName != caller || msg.UUID != txn.uuid() { return } diff --git a/api/apc/actmsg.go b/api/apc/actmsg.go index 803e600ef08..022f1e85214 100644 --- a/api/apc/actmsg.go +++ b/api/apc/actmsg.go @@ -102,14 +102,13 @@ const ( // internal use const ( - ActAddRemoteBck = "add-remote-bck" // add to BMD existing remote bucket, usually on the fly - ActRmNodeUnsafe = "rm-unsafe" // primary => the node to be removed - ActStartGFN = "start-gfn" // get-from-neighbor - ActStopGFN = "stop-gfn" // off - ActCleanupMarkers = "cleanup-markers" // part of the target joining sequence - ActSelfRemove = "self-initiated-removal" // e.g., when losing last mountpath - ActPrimaryForce = "primary-force" // set primary with force (BEWARE! advanced usage only) - ActBumpMetasync = "bump-metasync" // when executing ActPrimaryForce - the final step + ActAddRemoteBck = "add-remote-bck" // add to BMD existing remote bucket, usually on the fly + ActRmNodeUnsafe = "rm-unsafe" // primary => the node to be removed + ActStartGFN = "start-gfn" // get-from-neighbor + ActStopGFN = "stop-gfn" // off + ActSelfRemove = "self-initiated-removal" // e.g., when losing last mountpath + ActPrimaryForce = "primary-force" // set primary with force (BEWARE! advanced usage only) + ActBumpMetasync = "bump-metasync" // when executing ActPrimaryForce - the final step ) const ( diff --git a/cmd/cli/teb/templates.go b/cmd/cli/teb/templates.go index c92f88d891a..133f8a658d0 100644 --- a/cmd/cli/teb/templates.go +++ b/cmd/cli/teb/templates.go @@ -55,7 +55,7 @@ const ( indent1 + "Software:\t{{FormatCluSoft .Version .BuildTime}}\n" + indent1 + "Deployment:\t{{ ( Deployments .Status) }}\n" + indent1 + "Status:\t{{ ( OnlineStatus .Status) }}\n" + - indent1 + "Rebalance:\t{{ ( Rebalance .Status) }}\n" + + indent1 + "Rebalance:\t{{FormatRebalance .Status .CluConfig}}\n" + indent1 + "Authentication:\t{{if .CluConfig.Auth.Enabled}}enabled{{else}}disabled{{end}}\n" + indent1 + "Version:\t{{ ( Versions .Status) }}\n" + indent1 + "Build:\t{{ ( BuildTimes .Status) }}\n" @@ -400,6 +400,7 @@ var ( "FormatDaemonID": fmtDaemonID, "FormatSmap": fmtSmap, "FormatCluSoft": fmtCluSoft, + "FormatRebalance": fmtRebalance, "FormatProxiesSumm": fmtProxiesSumm, "FormatTargetsSumm": fmtTargetsSumm, "FormatCapPctMAM": fmtCapPctMAM, @@ -424,7 +425,6 @@ var ( "Deployments": func(h StatsAndStatusHelper) string { return toString(h.deployments()) }, "Versions": func(h StatsAndStatusHelper) string { return toString(h.versions()) }, "BuildTimes": func(h StatsAndStatusHelper) string { return toString(h.buildTimes()) }, - "Rebalance": func(h StatsAndStatusHelper) string { return toString(h.rebalance()) }, } AliasTemplate = "ALIAS\tCOMMAND\n{{range $alias := .}}" + diff --git a/cmd/cli/teb/utils.go b/cmd/cli/teb/utils.go index 3a492d99bbb..5c7b36991ee 100644 --- a/cmd/cli/teb/utils.go +++ b/cmd/cli/teb/utils.go @@ -195,6 +195,18 @@ func fmtCluSoft(version, build string) string { return version + " (build: " + build + ")" } +func fmtRebalance(h StatsAndStatusHelper, config *cmn.ClusterConfig) (out string) { + out = toString(h.rebalance()) + if config.Rebalance.Enabled { + return out + } + disabled := fred("disabled") + if out == NotSetVal || out == UnknownStatusVal { + return disabled + } + return out + " (" + disabled + ")" +} + func fmtStringList(lst []string) string { if len(lst) == 0 { return unknownVal diff --git a/cmn/cos/ioutils.go b/cmn/cos/ioutils.go index 3ed81004eb3..d1f57fc1356 100644 --- a/cmn/cos/ioutils.go +++ b/cmn/cos/ioutils.go @@ -116,7 +116,7 @@ func RemoveFile(path string) (err error) { if os.IsNotExist(err) { err = nil } - return + return err } // and computes checksum, if requested diff --git a/cmn/cos/node_state.go b/cmn/cos/node_state.go index 9c9b69bad9f..ed5fde87bdc 100644 --- a/cmn/cos/node_state.go +++ b/cmn/cos/node_state.go @@ -23,7 +23,7 @@ const ( RebalanceInterrupted // warning Resilvering // warning ResilverInterrupted // warning - Restarted // warning + NodeRestarted // warning (powercycle, crash) OOS // red alert (see IsRed below) OOM // red alert MaintenanceMode // warning @@ -48,7 +48,7 @@ func (f NodeStateFlags) IsRed() bool { func (f NodeStateFlags) IsWarn() bool { return f.IsSet(Rebalancing) || f.IsSet(RebalanceInterrupted) || f.IsSet(Resilvering) || f.IsSet(ResilverInterrupted) || - f.IsSet(Restarted) || f.IsSet(MaintenanceMode) || + f.IsSet(NodeRestarted) || f.IsSet(MaintenanceMode) || f.IsSet(LowCapacity) || f.IsSet(LowMemory) || f.IsSet(CertWillSoonExpire) } @@ -94,7 +94,7 @@ func (f NodeStateFlags) String() string { if f&ResilverInterrupted == ResilverInterrupted { sb = append(sb, "resilver-interrupted") } - if f&Restarted == Restarted { + if f&NodeRestarted == NodeRestarted { sb = append(sb, "restarted") } if f&OOS == OOS { diff --git a/fs/persistent_md.go b/fs/persistent_md.go index 5e10095a523..40aa4e742c5 100644 --- a/fs/persistent_md.go +++ b/fs/persistent_md.go @@ -74,18 +74,25 @@ func PersistMarker(marker string) (fatalErr, writeErr error) { return fatalErr, writeErr } -func RemoveMarker(marker string) (err error) { +func RemoveMarker(marker string, stup cos.StatsUpdater) (err error) { var ( avail = GetAvail() relname = filepath.Join(fname.MarkersDir, marker) ) for _, mi := range avail { if er1 := cos.RemoveFile(filepath.Join(mi.Path, relname)); er1 != nil { - nlog.Errorf("Failed to remove %q marker from %q: %v", relname, mi.Path, er1) err = er1 } } - return + switch marker { + case fname.RebalanceMarker: + stup.ClrFlag(cos.NodeAlerts, cos.RebalanceInterrupted|cos.Rebalancing) + case fname.ResilverMarker: + stup.ClrFlag(cos.NodeAlerts, cos.ResilverInterrupted|cos.Resilvering) + case fname.NodeRestartedPrev: + stup.ClrFlag(cos.NodeAlerts, cos.NodeRestarted) + } + return err } // PersistOnMpaths persists `what` on mountpaths under "mountpath.Path/path" filename. diff --git a/fs/persistent_md_test.go b/fs/persistent_md_test.go index 8e8b5e7d50e..a0e0141c6bb 100644 --- a/fs/persistent_md_test.go +++ b/fs/persistent_md_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/NVIDIA/aistore/cmn/fname" + "github.com/NVIDIA/aistore/core/mock" "github.com/NVIDIA/aistore/fs" "github.com/NVIDIA/aistore/tools" "github.com/NVIDIA/aistore/tools/tassert" @@ -28,6 +29,7 @@ func checkMarkersExist(t *testing.T, xs ...markerEntry) { func TestMarkers(t *testing.T) { const mpathsCnt = 5 mpaths := tools.PrepareMountPaths(t, mpathsCnt) + mockst := mock.NewStatsTracker() defer tools.RemoveMpaths(t, mpaths) checkMarkersExist(t, @@ -53,14 +55,14 @@ func TestMarkers(t *testing.T) { markerEntry{marker: fname.ResilverMarker, exists: true}, ) - fs.RemoveMarker(fname.RebalanceMarker) + fs.RemoveMarker(fname.RebalanceMarker, mockst) checkMarkersExist(t, markerEntry{marker: fname.RebalanceMarker, exists: false}, markerEntry{marker: fname.ResilverMarker, exists: true}, ) - fs.RemoveMarker(fname.ResilverMarker) + fs.RemoveMarker(fname.ResilverMarker, mockst) checkMarkersExist(t, markerEntry{marker: fname.RebalanceMarker, exists: false}, diff --git a/reb/bcast.go b/reb/bcast.go index a0d45592fbf..91db654afa0 100644 --- a/reb/bcast.go +++ b/reb/bcast.go @@ -5,6 +5,7 @@ package reb import ( + "errors" "fmt" "net/url" "time" @@ -17,6 +18,7 @@ import ( "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" + "github.com/NVIDIA/aistore/xact" jsoniter "github.com/json-iterator/go" ) @@ -96,7 +98,7 @@ func (reb *Reb) pingTarget(tsi *meta.Snode, rargs *rebArgs) (ok bool) { } // wait for target to get ready to receive objects (type syncCallback) -func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) (ok bool) { +func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) bool /*ready*/ { var ( curwt time.Duration sleep = cmn.Rom.CplaneOperation() * 2 @@ -108,17 +110,23 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) (ok bool) { // do not request the node stage if it has sent stage notification return true } - if _, ok = reb.checkStage(tsi, rargs, rebStageTraverse); ok { - return + status, ok := reb.checkStage(tsi, rargs, rebStageTraverse) + if ok { + debug.Assert(status.Running, "running: ", status.Running) + debug.Assert(xact.RebID2S(status.RebID) == xreb.ID(), "xid: ", status.RebID, " vs ", xreb.ID()) + return true + } + if xreb.IsAborted() { + return false } if err := xreb.AbortedAfter(sleep); err != nil { - return + return false } curwt += sleep } logHdr, tname := reb.logHdr(rargs.id, rargs.smap), tsi.StringEx() nlog.Errorln(logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage") - return + return false } // wait for the target to reach `rebStageFin` (i.e., finish traversing and sending) @@ -213,22 +221,28 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) reb.abortAndBroadcast(err) return } - // enforce global transaction ID + // + // enforce global RebID + // + otherXid := xact.RebID2S(status.RebID) if status.RebID > reb.rebID.Load() { - err := cmn.NewErrAborted(xreb.Name(), logHdr, fmt.Errorf("%s runs newer g%d", tname, status.RebID)) + err := cmn.NewErrAborted(xreb.Name(), logHdr, errors.New(tname+" runs newer "+otherXid)) reb.abortAndBroadcast(err) return } if xreb.IsAborted() { return } - // let the target to catch-up + // keep waiting if status.RebID < reb.RebID() { - nlog.Warningf("%s: %s runs older (g%d) global rebalance - keep waiting...", logHdr, tname, status.RebID) + var what = "runs" + if !status.Running { + what = "transitioning(?) from" + } + nlog.Warningf("%s: %s[%s, v%d] %s older rebalance - keep waiting", logHdr, tname, otherXid, status.RebVersion, what) return } - // Remote target has aborted its running rebalance with the same ID. - // Do not call `reb.abortAndBroadcast()` - no need. + // other target aborted same ID (do not call `reb.abortAndBroadcast` - no need) if status.RebID == reb.RebID() && status.Aborted { err := cmn.NewErrAborted(xreb.Name(), logHdr, fmt.Errorf("status 'aborted' from %s", tname)) xreb.Abort(err) @@ -236,8 +250,9 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) } if status.Stage >= desiredStage { ok = true - return + return // Ok } - nlog.Infof("%s: %s[%s] not yet at the right stage %s", logHdr, tname, stages[status.Stage], stages[desiredStage]) + + nlog.Infof("%s: %s[%s, v%d] not yet at the right stage %s", logHdr, tname, stages[status.Stage], status.RebVersion, stages[desiredStage]) return } diff --git a/reb/globrun.go b/reb/globrun.go index d449d97c9e5..813422d9fae 100644 --- a/reb/globrun.go +++ b/reb/globrun.go @@ -55,16 +55,7 @@ const ( const maxWackTargets = 4 -var stages = map[uint32]string{ - rebStageInactive: "", - rebStageInit: "", - rebStageTraverse: "", - rebStageWaitAck: "", - rebStageFin: "", - rebStageFinStreams: "", - rebStageDone: "", - rebStageAbort: "", -} +const initCapLomAcks = 128 const fmtpend = "%s: newer rebalance[g%d] pending - not running" @@ -116,6 +107,17 @@ type ( } ) +var stages = map[uint32]string{ + rebStageInactive: "", + rebStageInit: "", + rebStageTraverse: "", + rebStageWaitAck: "", + rebStageFin: "", + rebStageFinStreams: "", + rebStageDone: "", + rebStageAbort: "", +} + func New(config *cmn.Config) *Reb { var ( reb = &Reb{ @@ -211,8 +213,8 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t reb.stages.stage.Store(rebStageDone) reb.unregRecv() reb.semaCh.Release() - fs.RemoveMarker(fname.RebalanceMarker) - fs.RemoveMarker(fname.NodeRestartedPrev) + fs.RemoveMarker(fname.RebalanceMarker, tstats) + fs.RemoveMarker(fname.NodeRestartedPrev, tstats) reb.xctn().Finish() return } @@ -220,7 +222,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t // abort all running `dtor.AbortRebRes` xactions (download, dsort, etl) xreg.AbortByNewReb(errors.New("reason: starting " + reb.xctn().Name())) - // At this point, only one rebalance is running + // only one rebalance is running ----------------- reb.lastrx.Store(0) @@ -231,19 +233,21 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t tstats.SetFlag(cos.NodeAlerts, cos.Rebalancing) - var ( - errCnt int - err = reb.run(rargs) - ) + // run + err := reb.run(rargs) if err == nil { - errCnt = reb.rebWaitAck(rargs) + errCnt := reb.rebWaitAck(rargs) + if errCnt == 0 { + nlog.Infoln(logHdr, "=> stage-fin") + } else { + nlog.Warningln(logHdr, "=> stage-fin", "[ num-errors:", errCnt, "]") + } } else { - nlog.Warningln(err) + nlog.Errorln(logHdr, "fail => stage-fin:", err) } - nlog.Infoln(logHdr, "=> stage-fin", "[", err, errCnt, "]") reb.changeStage(rebStageFin) - reb.fini(rargs, logHdr, err) + reb.fini(rargs, logHdr, err, tstats) tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing) offGFN() @@ -260,23 +264,23 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t // 2. Multi-bucket rebalance may start both non-EC and EC in parallel. // It then waits until everything finishes. func (reb *Reb) run(rargs *rebArgs) error { - // 6. Capture stats, start mpath joggers reb.stages.stage.Store(rebStageTraverse) // No EC-enabled buckets - run only regular rebalance + xid := xact.RebID2S(rargs.id) if !rargs.ecUsed { - nlog.Infof("starting g%d", rargs.id) + nlog.Infoln("starting", xid) return reb.runNoEC(rargs) } // In all other cases run both rebalances simultaneously group := &errgroup.Group{} group.Go(func() error { - nlog.Infof("starting non-EC g%d", rargs.id) + nlog.Infoln("starting non-EC", xid) return reb.runNoEC(rargs) }) group.Go(func() error { - nlog.Infof("starting EC g%d", rargs.id) + nlog.Infoln("starting EC", xid) return reb.runEC(rargs) }) return group.Wait() @@ -346,7 +350,7 @@ func (reb *Reb) acquire(rargs *rebArgs, logHdr string) (newerRMD, alreadyRunning err := reb._preempt(rargs, logHdr, total, maxTotal, errcnt) if err != nil { if total > maxwt { - cos.ExitLog(err) + cos.ExitLog(err) // TODO -- FIXME: rewrite } errcnt++ } @@ -417,17 +421,25 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, nlog.Warningf(fmtpend, logHdr, id) return false } + reb.stages.stage.Store(rebStageInit) xreb := xctn.(*xs.Rebalance) reb.setXact(xreb) reb.rebID.Store(rargs.id) - // check Smap _prior_ to opening streams + // prior to opening streams: + // not every change in Smap warants a different rebalance but this one (below) definitely does smap := core.T.Sowner().Get() - if smap.Version != rargs.smap.Version { + if smap.CountActiveTs() != rargs.smap.CountActiveTs() { debug.Assert(smap.Version > rargs.smap.Version) - nlog.Errorf("Warning %s: %s post-init version change %s => %s", core.T, xreb, rargs.smap, smap) - // TODO: handle an unlikely corner case keeping in mind that not every change warants a different rebalance + err := fmt.Errorf("%s post-renew change %s => %s", xreb, rargs.smap.StringEx(), smap.StringEx()) + xctn.Abort(err) + reb.mu.Unlock() + nlog.Errorln(err) + return false + } + if smap.Version != rargs.smap.Version { + nlog.Warningln(logHdr, "post-renew change:", rargs.smap.StringEx(), "=>", smap.StringEx(), "- proceeding anyway") } // 3. init streams and data structures @@ -442,7 +454,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, } acks := reb.lomAcks() for i := range len(acks) { // init lom acks - acks[i] = &lomAcks{mu: &sync.Mutex{}, q: make(map[string]*core.LOM, 64)} + acks[i] = &lomAcks{mu: &sync.Mutex{}, q: make(map[string]*core.LOM, initCapLomAcks)} } // 4. create persistent mark @@ -463,7 +475,8 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, reb.stages.cleanup() reb.mu.Unlock() - nlog.Infoln(reb.logHdr(rargs.id, rargs.smap), "- running", reb.xctn()) + + nlog.Infoln(logHdr, "- running", reb.xctn()) return true } @@ -712,21 +725,36 @@ func (reb *Reb) _aborted(rargs *rebArgs) (yes bool) { return } -func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) { +func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error, tstats cos.StatsUpdater) { var ( + tag string stats core.Stats qui = &qui{rargs: rargs, reb: reb, logHdr: logHdr} xreb = reb.xctn() + cnt = xreb.ErrCnt() ) - nlog.Infoln(logHdr, "fini") + if cnt == 0 { + nlog.Infoln(logHdr, "fini => quiesce") + } else { + nlog.Warningln(logHdr, "fini [", cnt, "] => quiesce") + } - // prior to closing the streams - if xreb.Quiesce(rargs.config.Transport.QuiesceTime.D(), qui.quicb) != core.QuiAborted { - if errM := fs.RemoveMarker(fname.RebalanceMarker); errM == nil { + // prior to closing streams + ret := xreb.Quiesce(rargs.config.Transport.QuiesceTime.D(), qui.quicb) + cnt = xreb.ErrCnt() + + // cleanup markers + if ret != core.QuiAborted && ret != core.QuiTimeout { + tag = "qui-aborted" + if errM := fs.RemoveMarker(fname.RebalanceMarker, tstats); errM == nil { nlog.Infoln(logHdr, "removed marker ok") } - _ = fs.RemoveMarker(fname.NodeRestartedPrev) + _ = fs.RemoveMarker(fname.NodeRestartedPrev, tstats) + if ret == core.QuiTimeout { + tag = "qui-timeout" + } } + reb.endStreams(err) reb.filterGFN.Reset() @@ -741,10 +769,13 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error) { reb.unregRecv() reb.semaCh.Release() - if !xreb.Finished() { - xreb.Finish() + xreb.Finish() + + if ret != core.QuiAborted && ret != core.QuiTimeout && cnt == 0 { + nlog.Infoln(logHdr, "done", xreb.String()) + } else { + nlog.Infoln(logHdr, "finished with errors [", tag, cnt, "]", xreb.String()) } - nlog.Infoln(logHdr, "done", xreb.String()) } ////////////////////////////// diff --git a/reb/status.go b/reb/status.go index 5ec93a7e21f..9baae29e037 100644 --- a/reb/status.go +++ b/reb/status.go @@ -55,7 +55,7 @@ func (reb *Reb) RebStatus(status *Status) { } } } else if status.Running { - nlog.Warningln(core.T.String()+": transitioning (renewing) to", marked.Xact.String()) + nlog.Warningln(core.T.String(), "transitioning (renewing) to", marked.Xact.String()) status.Running = false } diff --git a/reb/utils.go b/reb/utils.go index 01589984006..ad81d59bc94 100644 --- a/reb/utils.go +++ b/reb/utils.go @@ -49,7 +49,7 @@ func (reb *Reb) logHdr(rebID int64, smap *meta.Smap, initializing ...bool) strin sb.WriteString(core.T.String()) sb.WriteString("[g") - sb.WriteString(strconv.FormatInt(rebID, 10)) + sb.WriteString(strconv.FormatInt(rebID, 10)) // (compare with `xact.RebID2S`) sb.WriteByte(',') if smap != nil { sb.WriteByte('v') diff --git a/res/resilver.go b/res/resilver.go index 3505e20090f..5b8be370286 100644 --- a/res/resilver.go +++ b/res/resilver.go @@ -77,7 +77,7 @@ func (res *Res) _end() { res.end.Store(mono.NanoTime()) } -func (res *Res) RunResilver(args Args) { +func (res *Res) RunResilver(args Args, tstats cos.StatsUpdater) { res._begin() defer res._end() if fatalErr, writeErr := fs.PersistMarker(fname.ResilverMarker); fatalErr != nil || writeErr != nil { @@ -128,7 +128,7 @@ func (res *Res) RunResilver(args Args) { // run and block waiting res.end.Store(0) jg.Run() - err = wait(jg, xres) + err = wait(jg, xres, tstats) if err != nil { xres.AddErr(err) } @@ -140,7 +140,7 @@ func (res *Res) RunResilver(args Args) { } // Wait for an abort or for resilvering joggers to finish. -func wait(jg *mpather.Jgroup, xres *xs.Resilver) (err error) { +func wait(jg *mpather.Jgroup, xres *xs.Resilver, tstats cos.StatsUpdater) (err error) { for { select { case errCause := <-xres.ChanAbort(): @@ -151,7 +151,7 @@ func wait(jg *mpather.Jgroup, xres *xs.Resilver) (err error) { } return cmn.NewErrAborted(xres.Name(), "", errCause) case <-jg.ListenFinished(): - if err = fs.RemoveMarker(fname.ResilverMarker); err == nil { + if err = fs.RemoveMarker(fname.ResilverMarker, tstats); err == nil { nlog.Infoln(core.T.String()+":", xres.Name(), "removed marker ok") } return diff --git a/stats/target_stats.go b/stats/target_stats.go index cfb84790ea4..00ac0a25620 100644 --- a/stats/target_stats.go +++ b/stats/target_stats.go @@ -629,7 +629,7 @@ func (r *Trunner) log(now int64, uptime time.Duration, config *cmn.Config) { // clear 'node-restarted' if uptime > 10*time.Hour { - clr |= cos.Restarted + clr |= cos.NodeRestarted } // 7. separately, memory w/ set/clr flags cumulative diff --git a/xact/base.go b/xact/base.go index 8b85e87cbf4..6a79d0caa54 100644 --- a/xact/base.go +++ b/xact/base.go @@ -408,7 +408,7 @@ func (xctn *Base) ToStats(stats *core.Stats) { // RebID helpers -func RebID2S(id int64) string { return fmt.Sprintf("g%d", id) } +func RebID2S(id int64) string { return "g" + strconv.FormatInt(id, 10) } func S2RebID(id string) (int64, error) { return strconv.ParseInt(id[1:], 10, 64) } func IsValidRebID(id string) (valid bool) {