From fe71dc88e94b0227e9bdfd23b4141154adfcd02b Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Wed, 4 Dec 2024 18:41:25 -0500
Subject: [PATCH] global rebalance: continued refactoring

* consolidate runtime args (`rargs`); remove associated duplication
* remove _aborted() (no more aborting retransmit on Smap version change)
* data mover: further reduce quiescence time for aborted xaction
  - in re: 5dffbe915dc243

Signed-off-by: Alex Aizman
---
 .gitlab-ci.yml             |   8 +-
 reb/bcast.go               |  59 +++++------
 reb/ec.go                  |  55 +++++-----
 reb/globrun.go             | 200 ++++++++++++++++++-------------------
 reb/qui.go                 |  11 +-
 transport/bundle/dmover.go |   4 +-
 6 files changed, 162 insertions(+), 175 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 59553491f6..b4fc1498bd 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -57,7 +57,7 @@ include:
   stage: test-short
   tags:
     - $RUNNER_TAG
-  timeout: 32m
+  timeout: 35m
   <<: *default_only_def
   except:
     variables:
@@ -69,7 +69,7 @@ include:
   stage: test-short
   tags:
     - $RUNNER_TAG
-  timeout: 32m
+  timeout: 35m
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
       when: manual
@@ -81,7 +81,7 @@ include:
   stage: test-short
   tags:
     - $RUNNER_TAG
-  timeout: 32m
+  timeout: 35m
   rules:
     - if: '$CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
@@ -505,7 +505,7 @@ test:short:python-etl:
 # e.g. RE: "ETLBucket|ETLConnectionError|ETLInitCode" (or any other regex to select tests)
 test:short:assorted:k8s:
   extends: .test_k8s_short_template
-  timeout: 32m
+  timeout: 35m
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
       when: manual
diff --git a/reb/bcast.go b/reb/bcast.go
index 91db654afa..adde33153a 100644
--- a/reb/bcast.go
+++ b/reb/bcast.go
@@ -60,40 +60,37 @@ func bcast(rargs *rebArgs, cb syncCallback) (errCnt int) {
 		}(tsi)
 	}
 	wg.Wait()
-	errCnt = int(cnt.Load())
-	return
+	return int(cnt.Load())
 }
 
 // pingTarget checks if target is running (type syncCallback)
-// TODO: reuse keepalive
-func (reb *Reb) pingTarget(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
+func _pingTarget(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
 	const retries = 4
 	var (
-		ver    = rargs.smap.Version
-		sleep  = cmn.Rom.CplaneOperation()
-		logHdr = reb.logHdr(rargs.id, rargs.smap)
-		tname  = tsi.StringEx()
+		ver   = rargs.smap.Version
+		sleep = cmn.Rom.CplaneOperation()
+		tname = tsi.StringEx()
 	)
 	for i := range retries {
 		_, code, err := core.T.Health(tsi, cmn.Rom.MaxKeepalive(), nil)
 		if err == nil {
 			if i > 0 {
-				nlog.Infoln(logHdr+":", tname, "is now reachable")
+				nlog.Infoln(rargs.logHdr, tname, "is now reachable")
 			}
 			return true
 		}
 		if !cos.IsUnreachable(err, code) {
-			nlog.Errorf("%s: health(%s) returned %v(%d) - aborting", logHdr, tname, err, code)
+			nlog.Errorf("%s: health(%s) returned %v(%d) - aborting", rargs.logHdr, tname, err, code)
 			return
 		}
-		nlog.Warningf("%s: waiting for %s, err %v(%d)", logHdr, tname, err, code)
+		nlog.Warningln(rargs.logHdr, "waiting for:", tname, "[", err, code, "]")
 		time.Sleep(sleep)
 		nver := core.T.Sowner().Get().Version
 		if nver > ver {
 			return
 		}
 	}
-	nlog.Errorf("%s: timed out waiting for %s", logHdr, tname)
+	nlog.Errorln(rargs.logHdr, "timed out waiting for:", tname)
 	return
 }
 
@@ -103,7 +100,7 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) bool /*ready*/ {
 		curwt time.Duration
 		sleep = cmn.Rom.CplaneOperation() * 2
 		maxwt = rargs.config.Rebalance.DestRetryTime.D() + rargs.config.Rebalance.DestRetryTime.D()/2
-		xreb  = reb.xctn()
+		xreb  = rargs.xreb
 	)
for curwt < maxwt { if reb.stages.isInStage(tsi, rebStageTraverse) { @@ -124,8 +121,8 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) bool /*ready*/ { } curwt += sleep } - logHdr, tname := reb.logHdr(rargs.id, rargs.smap), tsi.StringEx() - nlog.Errorln(logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage") + tname := tsi.StringEx() + nlog.Errorln(rargs.logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage") return false } @@ -139,13 +136,12 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) { sleep = rargs.config.Timeout.CplaneOperation.D() maxwt = rargs.config.Rebalance.DestRetryTime.D() sleepRetry = cmn.KeepaliveRetryDuration(rargs.config) - logHdr = reb.logHdr(rargs.id, rargs.smap) - xreb = reb.xctn() + xreb = rargs.xreb ) - debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", logHdr, reb.RebID(), xreb) + debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", rargs.logHdr, reb.RebID(), xreb) for curwt < maxwt { if err := xreb.AbortedAfter(sleep); err != nil { - nlog.Infof("%s: abort wack (%v)", logHdr, err) + nlog.Infof("%s: abort wack (%v)", rargs.logHdr, err) return } if reb.stages.isInStage(tsi, rebStageFin) { @@ -157,7 +153,7 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) { return } if err := xreb.AbortErr(); err != nil { - nlog.Infof("%s: abort wack (%v)", logHdr, err) + nlog.Infof("%s: abort wack (%v)", rargs.logHdr, err) return } // @@ -166,20 +162,20 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) { var w4me bool // true: this target is waiting for ACKs from me for _, si := range status.Targets { if si.ID() == core.T.SID() { - nlog.Infof("%s: keep wack <= %s[%s]", logHdr, tsi.StringEx(), stages[status.Stage]) + nlog.Infof("%s: keep wack <= %s[%s]", rargs.logHdr, tsi.StringEx(), stages[status.Stage]) w4me = true break } } if !w4me { - nlog.Infof("%s: %s[%s] ok (not waiting for me)", logHdr, tsi.StringEx(), stages[status.Stage]) + nlog.Infof("%s: %s[%s] ok (not waiting for me)", rargs.logHdr, tsi.StringEx(), stages[status.Stage]) ok = true return } time.Sleep(sleepRetry) curwt += sleepRetry } - nlog.Errorf("%s: timed out waiting for %s to reach %s", logHdr, tsi.StringEx(), stages[rebStageFin]) + nlog.Errorf("%s: timed out waiting for %s to reach %s", rargs.logHdr, tsi.StringEx(), stages[rebStageFin]) return } @@ -190,19 +186,18 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) { func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) (status *Status, ok bool) { var ( sleepRetry = cmn.KeepaliveRetryDuration(rargs.config) - logHdr = reb.logHdr(rargs.id, rargs.smap) query = url.Values{apc.QparamRebStatus: []string{"true"}} - xreb = reb.xctn() + xreb = rargs.xreb tname = tsi.StringEx() ) if xreb == nil || xreb.IsAborted() { return } - debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", logHdr, reb.RebID(), xreb) + debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", rargs.logHdr, reb.RebID(), xreb) body, code, err := core.T.Health(tsi, apc.DefaultTimeout, query) if err != nil { if errAborted := xreb.AbortedAfter(sleepRetry); errAborted != nil { - nlog.Infoln(logHdr, "abort check status", errAborted) + nlog.Infoln(rargs.logHdr, "abort check status", errAborted) return } body, code, err = core.T.Health(tsi, apc.DefaultTimeout, query) // retry once @@ -217,7 +212,7 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs 
*rebArgs, desiredStage uint32) status = &Status{} err = jsoniter.Unmarshal(body, status) if err != nil { - err = fmt.Errorf(cmn.FmtErrUnmarshal, logHdr, "reb status from "+tname, cos.BHead(body), err) + err = fmt.Errorf(cmn.FmtErrUnmarshal, rargs.logHdr, "reb status from "+tname, cos.BHead(body), err) reb.abortAndBroadcast(err) return } @@ -226,7 +221,7 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) // otherXid := xact.RebID2S(status.RebID) if status.RebID > reb.rebID.Load() { - err := cmn.NewErrAborted(xreb.Name(), logHdr, errors.New(tname+" runs newer "+otherXid)) + err := cmn.NewErrAborted(xreb.Name(), rargs.logHdr, errors.New(tname+" runs newer "+otherXid)) reb.abortAndBroadcast(err) return } @@ -239,12 +234,12 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) if !status.Running { what = "transitioning(?) from" } - nlog.Warningf("%s: %s[%s, v%d] %s older rebalance - keep waiting", logHdr, tname, otherXid, status.RebVersion, what) + nlog.Warningf("%s: %s[%s, v%d] %s older rebalance - keep waiting", rargs.logHdr, tname, otherXid, status.RebVersion, what) return } // other target aborted same ID (do not call `reb.abortAndBroadcast` - no need) if status.RebID == reb.RebID() && status.Aborted { - err := cmn.NewErrAborted(xreb.Name(), logHdr, fmt.Errorf("status 'aborted' from %s", tname)) + err := cmn.NewErrAborted(xreb.Name(), rargs.logHdr, fmt.Errorf("status 'aborted' from %s", tname)) xreb.Abort(err) return } @@ -253,6 +248,6 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) return // Ok } - nlog.Infof("%s: %s[%s, v%d] not yet at the right stage %s", logHdr, tname, stages[status.Stage], status.RebVersion, stages[desiredStage]) + nlog.Infof("%s: %s[%s, v%d] not yet at the right stage %s", rargs.logHdr, tname, stages[status.Stage], status.RebVersion, stages[desiredStage]) return } diff --git a/reb/ec.go b/reb/ec.go index a4d13fd9d2..e52bdeeaba 100644 --- a/reb/ec.go +++ b/reb/ec.go @@ -45,36 +45,34 @@ import ( // update their metafiles. Targets do not overwrite their metafiles with a new // one. They update only `Daemons` and `FullReplica` fields. 
-func (reb *Reb) runECjoggers() {
+func (reb *Reb) runECjoggers(rargs *rebArgs) {
 	var (
-		wg    = &sync.WaitGroup{}
-		avail = fs.GetAvail()
-		cfg   = cmn.GCO.Get()
-		b     = reb.xctn().Bck()
+		wg = &sync.WaitGroup{}
+		b  = rargs.xreb.Bck()
 	)
-	for _, mi := range avail {
+	for _, mi := range rargs.apaths {
 		bck := cmn.Bck{Provider: apc.AIS}
 		if b != nil {
 			bck = cmn.Bck{Name: b.Name, Provider: apc.AIS, Ns: b.Ns}
 		}
 		wg.Add(1)
-		go reb.jogEC(mi, &bck, wg)
+		go reb.jogEC(mi, &bck, wg, rargs)
 	}
-	for _, provider := range cfg.Backend.Providers {
-		for _, mi := range avail {
+	for _, provider := range rargs.config.Backend.Providers {
+		for _, mi := range rargs.apaths {
 			bck := cmn.Bck{Provider: provider.Name}
 			if b != nil {
 				bck = cmn.Bck{Name: b.Name, Provider: provider.Name, Ns: b.Ns}
 			}
 			wg.Add(1)
-			go reb.jogEC(mi, &bck, wg)
+			go reb.jogEC(mi, &bck, wg, rargs)
 		}
 	}
 	wg.Wait()
 }
 
 // mountpath walker - walks through files in /meta/ directory
-func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup) {
+func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup, rargs *rebArgs) {
 	defer wg.Done()
 	opts := &fs.WalkOpts{
 		Mi: mi,
@@ -84,11 +82,11 @@ func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup) {
 	}
 	opts.Bck.Copy(bck)
 	if err := fs.Walk(opts); err != nil {
-		xreb := reb.xctn()
+		xreb := rargs.xreb
 		if xreb.IsAborted() || xreb.Finished() {
-			nlog.Infof("aborting traversal")
+			nlog.Infoln(rargs.logHdr, "aborting traversal")
 		} else {
-			nlog.Warningf("failed to traverse, err: %v", err)
+			nlog.Warningln(rargs.logHdr, "failed to traverse, err:", err)
 		}
 	}
 }
@@ -130,7 +128,7 @@ func (reb *Reb) sendFromDisk(ct *core.CT, meta *ec.Metadata, target *meta.Snode,
 	)
 	if lom != nil {
 		defer core.FreeLOM(lom)
-		roc, errReader = lom.NewDeferROC()
+		roc, errReader = lom.NewDeferROC() // + unlock
 	} else {
 		roc, errReader = cos.NewFileHandle(fqn)
 	}
@@ -155,8 +153,8 @@ func (reb *Reb) sendFromDisk(ct *core.CT, meta *ec.Metadata, target *meta.Snode,
 		return fmt.Errorf("failed to send slices to nodes [%s..]: %v", target.ID(), err)
 	}
 
-	xreb := reb.xctn()
-	xreb.OutObjsAdd(1, o.Hdr.ObjAttrs.Size)
+	xctn := reb.xctn()
+	xctn.OutObjsAdd(1, o.Hdr.ObjAttrs.Size)
 	return nil
 }
 
@@ -164,28 +162,29 @@ func (reb *Reb) sendFromDisk(ct *core.CT, meta *ec.Metadata, target *meta.Snode,
 // 1. Full object/replica is received
 // 2.
A CT is received and this target is not the default target (it // means that the CTs came from default target after EC had been rebuilt) -func (reb *Reb) saveCTToDisk(ntfn *stageNtfn, hdr *transport.ObjHdr, data io.Reader) error { - cos.Assert(ntfn.md != nil) +func (reb *Reb) saveCTToDisk(ntfn *stageNtfn, hdr *transport.ObjHdr, data io.Reader) (err error) { var ( - err error - bck = meta.CloneBck(&hdr.Bck) + bck = meta.CloneBck(&hdr.Bck) + xctn = reb.xctn() ) - if err := bck.Init(core.T.Bowner()); err != nil { - return err + if ern := bck.Init(core.T.Bowner()); ern != nil { + return ern } + md := ntfn.md.NewPack() if ntfn.md.SliceID != 0 { - args := &ec.WriteArgs{Reader: data, MD: md, Xact: reb.xctn()} + args := &ec.WriteArgs{Reader: data, MD: md, Xact: xctn} err = ec.WriteSliceAndMeta(hdr, args) } else { var lom *core.LOM lom, err = ec.AllocLomFromHdr(hdr) if err == nil { - args := &ec.WriteArgs{Reader: data, MD: md, Cksum: hdr.ObjAttrs.Cksum, Xact: reb.xctn()} + args := &ec.WriteArgs{Reader: data, MD: md, Cksum: hdr.ObjAttrs.Cksum, Xact: xctn} err = ec.WriteReplicaAndMeta(lom, args) } core.FreeLOM(lom) } + return err } @@ -283,10 +282,10 @@ func (reb *Reb) renameLocalCT(req *stageNtfn, ct *core.CT, md *ec.Metadata) ( } func (reb *Reb) walkEC(fqn string, de fs.DirEntry) error { - xreb := reb.xctn() - if err := xreb.AbortErr(); err != nil { + xctn := reb.xctn() + if err := xctn.AbortErr(); err != nil { // notify `dir.Walk` to stop iterations - nlog.Infoln(xreb.Name(), "walk-ec aborted", err) + nlog.Infoln(xctn.Name(), "walk-ec aborted", err) return err } diff --git a/reb/globrun.go b/reb/globrun.go index 19df0ef212..5258f46f26 100644 --- a/reb/globrun.go +++ b/reb/globrun.go @@ -96,7 +96,9 @@ type ( rebArgs struct { smap *meta.Smap config *cmn.Config + xreb *xs.Rebalance apaths fs.MPI + logHdr string id int64 ecUsed bool } @@ -217,9 +219,9 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t var ( bmd = core.T.Bowner().Get() - rargs = &rebArgs{id: id, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed()} + rargs = &rebArgs{id: id, smap: smap, config: cmn.GCO.Get(), ecUsed: bmd.IsECUsed(), logHdr: logHdr} // and run with it ) - if !reb.pingall(rargs, logHdr) { + if !_pingall(rargs) { return } if err := reb.regRecv(); err != nil { @@ -238,7 +240,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t if bmd.IsEmpty() { haveStreams = false } - if !reb.initRenew(rargs, notif, logHdr, haveStreams) { + if !reb.initRenew(rargs, notif, haveStreams) { reb.unregRecv() return } @@ -249,14 +251,14 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t reb.unregRecv() fs.RemoveMarker(fname.RebalanceMarker, tstats) fs.RemoveMarker(fname.NodeRestartedPrev, tstats) - reb.xctn().Finish() + rargs.xreb.Finish() return } nlog.Infoln(logHdr, "initializing") // abort all running `dtor.AbortRebRes` xactions (download, dsort, etl) - xreg.AbortByNewReb(errors.New("reason: starting " + reb.xctn().Name())) + xreg.AbortByNewReb(errors.New("reason: starting " + rargs.xreb.Name())) // only one rebalance is running ----------------- @@ -283,7 +285,7 @@ func (reb *Reb) RunRebalance(smap *meta.Smap, id int64, notif *xact.NotifXact, t } reb.changeStage(rebStageFin) - reb.fini(rargs, logHdr, err, tstats) + reb.fini(rargs, err, tstats) tstats.ClrFlag(cos.NodeAlerts, cos.Rebalancing) offGFN() @@ -303,42 +305,34 @@ func (reb *Reb) run(rargs *rebArgs) error { reb.stages.stage.Store(rebStageTraverse) // No EC-enabled buckets - run only 
regular rebalance - xid := xact.RebID2S(rargs.id) if !rargs.ecUsed { - nlog.Infoln("starting", xid) return reb.runNoEC(rargs) } // In all other cases run both rebalances simultaneously group := &errgroup.Group{} group.Go(func() error { - nlog.Infoln("starting non-EC", xid) return reb.runNoEC(rargs) }) group.Go(func() error { - nlog.Infoln("starting EC", xid) return reb.runEC(rargs) }) return group.Wait() } -func (reb *Reb) pingall(rargs *rebArgs, logHdr string) bool { - if rargs.smap.Version == 0 { - rargs.smap = core.T.Sowner().Get() - } +func _pingall(rargs *rebArgs) bool { + debug.Assert(rargs.smap.Version > 0) // validated in _runRe + // whether other targets are up and running - if errCnt := bcast(rargs, reb.pingTarget); errCnt > 0 { - nlog.Errorln(logHdr, "not starting: ping err-s", errCnt) + if errCnt := bcast(rargs, _pingTarget); errCnt > 0 { + nlog.Errorln(rargs.logHdr, "not starting: ping err-s", errCnt) return false } - if rargs.smap.Version == 0 { - rargs.smap = core.T.Sowner().Get() - } rargs.apaths = fs.GetAvail() return true } -func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, haveStreams bool) bool { +func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, haveStreams bool) bool { rns := xreg.RenewRebalance(rargs.id) if rns.Err != nil { return false @@ -347,15 +341,15 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, return false } xctn := rns.Entry.Get() + rargs.xreb = xctn.(*xs.Rebalance) - notif.Xact = xctn - xctn.AddNotif(notif) + notif.Xact = rargs.xreb + rargs.xreb.AddNotif(notif) reb.mu.Lock() reb.stages.stage.Store(rebStageInit) - xreb := xctn.(*xs.Rebalance) - reb.setXact(xreb) + reb.setXact(rargs.xreb) reb.rebID.Store(rargs.id) // prior to opening streams: @@ -363,14 +357,14 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string, smap := core.T.Sowner().Get() if smap.CountActiveTs() != rargs.smap.CountActiveTs() { debug.Assert(smap.Version > rargs.smap.Version) - err := fmt.Errorf("%s post-renew change %s => %s", xreb, rargs.smap.StringEx(), smap.StringEx()) - xctn.Abort(err) + err := fmt.Errorf("%s post-renew change %s => %s", rargs.xreb, rargs.smap.StringEx(), smap.StringEx()) + rargs.xreb.Abort(err) reb.mu.Unlock() nlog.Errorln(err) return false } if smap.Version != rargs.smap.Version { - nlog.Warningln(logHdr, "post-renew change:", rargs.smap.StringEx(), "=>", smap.StringEx(), "- proceeding anyway") + nlog.Warningln(rargs.logHdr, "post-renew change:", rargs.smap.StringEx(), "=>", smap.StringEx(), "- proceeding anyway") } // 3. 
init streams and data structures
@@ -384,7 +378,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
 		if dm := reb.dm.Renew(trname, reb.recvObj, cmn.OwtRebalance, dmExtra); dm != nil {
 			reb.dm = dm
 		}
-		reb.beginStreams(rargs.config)
+		reb.beginStreams(rargs)
 	}
 
 	if reb.awaiting.targets == nil {
@@ -404,7 +398,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
 			err = fatalErr
 		}
 		reb.endStreams(err)
-		xctn.Abort(err)
+		rargs.xreb.Abort(err)
 		reb.mu.Unlock()
 		nlog.Errorln("FATAL:", fatalErr, "WRITE:", writeErr)
 		return false
@@ -416,21 +410,20 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
 
 	reb.mu.Unlock()
 
-	nlog.Infoln(logHdr, "- running", reb.xctn())
+	nlog.Infoln(rargs.logHdr, "- running", rargs.xreb.String())
 	return true
 }
 
-func (reb *Reb) beginStreams(config *cmn.Config) {
+func (reb *Reb) beginStreams(rargs *rebArgs) {
 	debug.Assert(reb.stages.stage.Load() == rebStageInit)
 
-	xreb := reb.xctn()
-	reb.dm.SetXact(xreb)
+	reb.dm.SetXact(rargs.xreb)
 	reb.dm.Open()
 
 	pushArgs := bundle.Args{
 		Net:        reb.dm.NetC(),
 		Trname:     trnamePsh,
-		Multiplier: config.Rebalance.SbundleMult,
-		Extra:      &transport.Extra{SenderID: xreb.ID(), Config: config},
+		Multiplier: rargs.config.Rebalance.SbundleMult,
+		Extra:      &transport.Extra{SenderID: rargs.xreb.ID(), Config: rargs.config},
 	}
 	reb.pushes = bundle.New(transport.NewIntraDataClient(), pushArgs)
 }
@@ -444,48 +437,46 @@
 
 // when at least one bucket has EC enabled
 func (reb *Reb) runEC(rargs *rebArgs) error {
+	nlog.Infoln(rargs.logHdr, "start ec run")
+
 	errCnt := bcast(rargs, reb.rxReady) // ignore timeout
-	xreb := reb.xctn()
-	if err := xreb.AbortErr(); err != nil {
-		logHdr := reb.logHdr(rargs.id, rargs.smap)
-		nlog.Infoln(logHdr, "abort ec rx-ready", err, "num-fail", errCnt)
+	if err := rargs.xreb.AbortErr(); err != nil {
+		nlog.Infoln(rargs.logHdr, "abort ec rx-ready", err, "num-fail", errCnt)
 		return err
 	}
 	if errCnt > 0 {
-		logHdr := reb.logHdr(rargs.id, rargs.smap)
-		nlog.Errorln(logHdr, "ec rx-ready num-fail", errCnt) // unlikely
+		nlog.Errorln(rargs.logHdr, "ec rx-ready num-fail", errCnt) // unlikely
 	}
 
-	reb.runECjoggers()
+	reb.runECjoggers(rargs)
 
-	if err := xreb.AbortErr(); err != nil {
-		logHdr := reb.logHdr(rargs.id, rargs.smap)
-		nlog.Infoln(logHdr, "abort ec-joggers", err)
+	if err := rargs.xreb.AbortErr(); err != nil {
+		nlog.Warningln(rargs.logHdr, "finish ec run, abort ec-joggers: [", err, "]")
 		return err
 	}
-	nlog.Infof("[%s] RebalanceEC done", core.T.SID())
+	nlog.Infoln(rargs.logHdr, "finish ec run")
 	return nil
 }
 
 // when not a single bucket has EC enabled
 func (reb *Reb) runNoEC(rargs *rebArgs) error {
+	nlog.Infoln(rargs.logHdr, "start no-ec run")
+
 	errCnt := bcast(rargs, reb.rxReady) // ignore timeout
-	xreb := reb.xctn()
-	if err := xreb.AbortErr(); err != nil {
-		logHdr := reb.logHdr(rargs.id, rargs.smap)
-		nlog.Infoln(logHdr, "abort rx-ready", err, "num-fail", errCnt)
+	if err := rargs.xreb.AbortErr(); err != nil {
+		nlog.Infoln(rargs.logHdr, "abort rx-ready", err, "num-fail", errCnt)
 		return err
 	}
 	if errCnt > 0 {
-		logHdr := reb.logHdr(rargs.id, rargs.smap)
-		nlog.Errorln(logHdr, "rx-ready num-fail", errCnt) // unlikely
+		nlog.Errorln(rargs.logHdr, "rx-ready num-fail:", errCnt) // unlikely
 	}
-
-	wg := &sync.WaitGroup{}
-	ver := rargs.smap.Version
+	var (
+		wg  = &sync.WaitGroup{}
+		ver = rargs.smap.Version
+	)
 	for _, mi := range rargs.apaths {
 		rl := &rebJogger{
-			joggerBase: joggerBase{m: reb, xreb: reb.xctn(), wg: wg},
+ joggerBase: joggerBase{m: reb, xreb: rargs.xreb, wg: wg}, smap: rargs.smap, ver: ver, } wg.Add(1) @@ -493,14 +484,11 @@ func (reb *Reb) runNoEC(rargs *rebArgs) error { } wg.Wait() - if err := xreb.AbortErr(); err != nil { - logHdr := reb.logHdr(rargs.id, rargs.smap) - nlog.Infoln(logHdr, "abort joggers", err) + if err := rargs.xreb.AbortErr(); err != nil { + nlog.Warningln(rargs.logHdr, "finish no-ec run, abort joggers: [", err, "]") return err } - if cmn.Rom.FastV(4, cos.SmoduleReb) { - nlog.Infof("finished rebalance walk (g%d)", rargs.id) - } + nlog.Infoln(rargs.logHdr, "finish no-ec run") return nil } @@ -509,13 +497,12 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { cnt int sleep = rargs.config.Timeout.CplaneOperation.D() maxwt = rargs.config.Rebalance.DestRetryTime.D() - xreb = reb.xctn() + xreb = rargs.xreb smap = rargs.smap ) maxwt += time.Duration(int64(time.Minute) * int64(rargs.smap.CountTargets()/10)) maxwt = min(maxwt, rargs.config.Rebalance.DestRetryTime.D()*2) reb.changeStage(rebStageWaitAck) - logHdr := reb.logHdr(rargs.id, rargs.smap) for { curwt := time.Duration(0) @@ -532,7 +519,7 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { for _, lom := range lomack.q { tsi, err := smap.HrwHash2T(lom.Digest()) if err == nil { - nlog.Infoln("waiting for", lom.String(), "ACK from", tsi.StringEx()) + nlog.Infoln(rargs.logHdr, "waiting for", lom.String(), "ACK from", tsi.StringEx()) logged = true break } @@ -540,25 +527,26 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { } } lomack.mu.Unlock() + if err := xreb.AbortErr(); err != nil { - nlog.Infoln(logHdr, "abort wait-ack:", err) + nlog.Infoln(rargs.logHdr, "abort wait-ack:", err) return } } if cnt == 0 { - nlog.Infoln(logHdr, "received all ACKs") + nlog.Infoln(rargs.logHdr, "received all ACKs") break } - nlog.Warningln(logHdr, "waiting for", cnt, "ACKs") if err := xreb.AbortedAfter(sleep); err != nil { - nlog.Infoln(logHdr, "abort wait-ack:", err) + nlog.Infoln(rargs.logHdr, "abort wait-ack:", err) return } + nlog.Warningln(rargs.logHdr, "waiting for", cnt, "ACKs") curwt += sleep } if cnt > 0 { - nlog.Warningf("%s: timed out waiting for %d ACK%s", logHdr, cnt, cos.Plural(cnt)) + nlog.Warningf("%s: timed out waiting for %d ACK%s", rargs.logHdr, cnt, cos.Plural(cnt)) } if xreb.IsAborted() { return @@ -567,37 +555,42 @@ func (reb *Reb) rebWaitAck(rargs *rebArgs) (errCnt int) { // NOTE: requires locally migrated objects *not* to be removed at the src aPaths, _ := fs.Get() if len(aPaths) > len(rargs.apaths) { - nlog.Warningf("%s: mountpath changes detected (%d, %d)", logHdr, len(aPaths), len(rargs.apaths)) + nlog.Warningf("%s: mountpath changes detected (%d, %d)", rargs.logHdr, len(aPaths), len(rargs.apaths)) } // 8. synchronize - nlog.Infof("%s: poll targets for: stage=(%s or %s***)", logHdr, stages[rebStageFin], stages[rebStageWaitAck]) + nlog.Infof("%s: poll targets for: stage=(%s or %s***)", rargs.logHdr, stages[rebStageFin], stages[rebStageWaitAck]) errCnt = bcast(rargs, reb.waitAcksExtended) if xreb.IsAborted() { return } // 9. 
retransmit if needed - cnt = reb.retransmit(rargs, xreb) - if cnt == 0 || reb.xctn().IsAborted() { + cnt = reb.retransmit(rargs) + if cnt == 0 || xreb.IsAborted() { break } - nlog.Warningf("%s: retransmitted %d, more wack...", logHdr, cnt) + nlog.Warningln(rargs.logHdr, "retransmitted", cnt, "keeping wack...") } return } -func (reb *Reb) retransmit(rargs *rebArgs, xreb *xs.Rebalance) (cnt int) { - if reb._aborted(rargs) { +func (reb *Reb) retransmit(rargs *rebArgs) (cnt int) { + xreb := rargs.xreb + if xreb.IsAborted() { return } var ( - rj = &rebJogger{joggerBase: joggerBase{ - m: reb, xreb: reb.xctn(), - wg: &sync.WaitGroup{}, - }, smap: rargs.smap} - loghdr = reb.logHdr(rargs.id, rargs.smap) + rj = &rebJogger{ + joggerBase: joggerBase{ + m: reb, + xreb: rargs.xreb, + wg: &sync.WaitGroup{}, + }, + smap: rargs.smap, + } + loghdr = rargs.logHdr ) for _, lomAck := range reb.lomAcks() { lomAck.mu.Lock() @@ -635,43 +628,39 @@ func (reb *Reb) retransmit(rargs *rebArgs, xreb *xs.Rebalance) (cnt int) { } else { if cmn.IsErrStreamTerminated(err) { xreb.Abort(err) - nlog.Errorf("%s: stream term-ed (%v)", loghdr, err) + nlog.Errorln(loghdr, "stream term-ed:", err) } else { err = fmt.Errorf("%s: failed to retransmit %s => %s: %w", loghdr, lom, tsi.StringEx(), err) rj.xreb.AddErr(err) } } - if reb._aborted(rargs) { + + if xreb.IsAborted() { lomAck.mu.Unlock() return 0 } } + lomAck.mu.Unlock() - if reb._aborted(rargs) { + if xreb.IsAborted() { return 0 } } return } -func (reb *Reb) _aborted(rargs *rebArgs) (yes bool) { - yes = reb.xctn().IsAborted() - yes = yes || (rargs.smap.Version != core.T.Sowner().Get().Version) - return -} - -func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error, tstats cos.StatsUpdater) { +func (reb *Reb) fini(rargs *rebArgs, err error, tstats cos.StatsUpdater) { var ( tag string stats core.Stats - qui = &qui{rargs: rargs, reb: reb, logHdr: logHdr} - xreb = reb.xctn() + qui = &qui{rargs: rargs, reb: reb} + xreb = rargs.xreb cnt = xreb.ErrCnt() ) if cnt == 0 { - nlog.Infoln(logHdr, "fini => quiesce") + nlog.Infoln(rargs.logHdr, "fini => quiesce") } else { - nlog.Warningln(logHdr, "fini [", cnt, "] => quiesce") + nlog.Warningln(rargs.logHdr, "fini [", cnt, "] => quiesce") } // prior to closing streams @@ -682,7 +671,7 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error, tstats cos.StatsU if ret != core.QuiAborted && ret != core.QuiTimeout { tag = "qui-aborted" if errM := fs.RemoveMarker(fname.RebalanceMarker, tstats); errM == nil { - nlog.Infoln(logHdr, "removed marker ok") + nlog.Infoln(rargs.logHdr, "removed marker ok") } _ = fs.RemoveMarker(fname.NodeRestartedPrev, tstats) if ret == core.QuiTimeout { @@ -706,9 +695,9 @@ func (reb *Reb) fini(rargs *rebArgs, logHdr string, err error, tstats cos.StatsU xreb.Finish() if ret != core.QuiAborted && ret != core.QuiTimeout && cnt == 0 { - nlog.Infoln(logHdr, "done", xreb.String()) + nlog.Infoln(rargs.logHdr, "done", xreb.String()) } else { - nlog.Infoln(logHdr, "finished with errors [", tag, cnt, "]", xreb.String()) + nlog.Infoln(rargs.logHdr, "finished with errors: [", tag, cnt, "]", xreb.String()) } } @@ -750,14 +739,19 @@ func (rj *rebJogger) objSentCallback(hdr *transport.ObjHdr, _ io.ReadCloser, arg rj.xreb.OutObjsAdd(1, hdr.ObjAttrs.Size) // NOTE: double-counts retransmissions return } - // log err + + // err if cmn.Rom.FastV(4, cos.SmoduleReb) || !cos.IsRetriableConnErr(err) { - if bundle.IsErrDestinationMissing(err) { + switch { + case bundle.IsErrDestinationMissing(err): nlog.Errorf("%s: %v, %s", 
rj.xreb.Name(), err, rj.smap) - } else { + case cmn.IsErrStreamTerminated(err): + rj.xreb.Abort(err) + nlog.Errorln("stream term-ed: [", err, rj.xreb.Name(), "]") + default: lom, ok := arg.(*core.LOM) debug.Assert(ok) - nlog.Errorf("%s: %s failed to send %s: %v", core.T, rj.xreb.Name(), lom, err) + nlog.Errorf("%s: %s failed to send %s: %v (%T)", core.T, rj.xreb.Name(), lom, err, err) // abort??? } } } diff --git a/reb/qui.go b/reb/qui.go index c5ecd5181f..bd26986ec2 100644 --- a/reb/qui.go +++ b/reb/qui.go @@ -20,10 +20,9 @@ import ( const logIval = time.Minute type qui struct { - rargs *rebArgs - reb *Reb - logHdr string - i time.Duration // to log every logIval + rargs *rebArgs + reb *Reb + i time.Duration // to log every logIval } func (q *qui) quicb(total time.Duration) core.QuiRes { @@ -41,7 +40,7 @@ func (q *qui) quicb(total time.Duration) core.QuiRes { if i := total / logIval; i > q.i { q.i = i locStage := q.reb.stages.stage.Load() - nlog.Infoln(q.logHdr, "keep receiving in", stages[locStage], "stage") + nlog.Infoln(q.rargs.logHdr, "keep receiving in", stages[locStage], "stage") } return core.QuiActive } @@ -56,7 +55,7 @@ func (q *qui) quicb(total time.Duration) core.QuiRes { if status != nil && status.Running && status.Stage < rebStageFin { if i := total / logIval; i > q.i { q.i = i - nlog.Infoln(q.logHdr, "in", stages[locStage], "waiting for:", tsi.StringEx(), stages[status.Stage]) + nlog.Infoln(q.rargs.logHdr, "in", stages[locStage], "waiting for:", tsi.StringEx(), stages[status.Stage]) } return core.QuiActiveDontBump } diff --git a/transport/bundle/dmover.go b/transport/bundle/dmover.go index 9e183e0cad..5ab0ef8a23 100644 --- a/transport/bundle/dmover.go +++ b/transport/bundle/dmover.go @@ -1,7 +1,7 @@ // Package bundle provides multi-streaming transport with the functionality // to dynamically (un)register receive endpoints, establish long-lived flows, and more. /* - * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. */ package bundle @@ -159,7 +159,7 @@ func (dm *DataMover) UnregRecv() { if dm.xctn != nil { timeout := dm.config.Transport.QuiesceTime.D() if dm.xctn.IsAborted() { - timeout = min(timeout>>1, dm.config.Timeout.CplaneOperation.D()) + timeout = time.Second } dm.Quiesce(timeout) }
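
--
Note (illustration only, not part of the patch): the common thread of this
refactoring is to compute per-run state once - Smap, config, xaction, log
header - and pass it to every helper via a single `rargs` pointer, instead of
re-deriving it at each call site (`reb.logHdr(...)`, `reb.xctn()`,
`core.T.Sowner().Get()`). A minimal, self-contained Go sketch of that pattern
follows; the types are simplified stand-ins, not aistore's actual
`rebArgs`/`xs.Rebalance`:

	package main

	import "fmt"

	// hypothetical stand-ins for meta.Smap and xs.Rebalance
	type smap struct{ version int64 }
	type rebalance struct{ id int64 }

	func (x *rebalance) name() string { return fmt.Sprintf("g%d", x.id) }

	// consolidated runtime args: captured once per rebalance run
	type rebArgs struct {
		smap   *smap
		xreb   *rebalance
		logHdr string // formatted once; helpers never rebuild it
	}

	// helpers take rargs and stay free of repeated global lookups
	func pingAll(rargs *rebArgs)  { fmt.Println(rargs.logHdr, "ping all targets") }
	func waitAcks(rargs *rebArgs) { fmt.Println(rargs.logHdr, "wait for ACKs") }

	func main() {
		x := &rebalance{id: 47}
		rargs := &rebArgs{
			smap:   &smap{version: 47},
			xreb:   x,
			logHdr: x.name() + ":",
		}
		pingAll(rargs)
		waitAcks(rargs)
	}

Besides removing duplication, passing one pointer keeps all helpers pinned to
the same Smap/xaction snapshot for the lifetime of the run.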