global rebalance: continued refactoring
* consolidate runtime args (`rargs`); remove associated duplication
* remove `_abort()` on Smap version change
* data mover: further reduce quiescence time for aborted
  - in re: 5dffbe9

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Dec 5, 2024
1 parent 0a307ed commit fe71dc8
Showing 6 changed files with 162 additions and 175 deletions.
8 changes: 4 additions & 4 deletions .gitlab-ci.yml
@@ -57,7 +57,7 @@ include:
stage: test-short
tags:
- $RUNNER_TAG
timeout: 32m
timeout: 35m
<<: *default_only_def
except:
variables:
@@ -69,7 +69,7 @@ include:
stage: test-short
tags:
- $RUNNER_TAG
timeout: 32m
timeout: 35m
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
when: manual
@@ -81,7 +81,7 @@ include:
stage: test-short
tags:
- $RUNNER_TAG
timeout: 32m
timeout: 35m
rules:
- if: '$CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
@@ -505,7 +505,7 @@ test:short:python-etl:
# e.g. RE: "ETLBucket|ETLConnectionError|ETLInitCode" (or any other regex to select tests)
test:short:assorted:k8s:
extends: .test_k8s_short_template
timeout: 32m
timeout: 35m
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" || $CI_COMMIT_BRANCH == "main"'
when: manual
59 changes: 27 additions & 32 deletions reb/bcast.go
@@ -60,40 +60,37 @@ func bcast(rargs *rebArgs, cb syncCallback) (errCnt int) {
}(tsi)
}
wg.Wait()
errCnt = int(cnt.Load())
return
return int(cnt.Load())
}

// pingTarget checks if target is running (type syncCallback)
// TODO: reuse keepalive
func (reb *Reb) pingTarget(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
func _pingTarget(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
const retries = 4
var (
ver = rargs.smap.Version
sleep = cmn.Rom.CplaneOperation()
logHdr = reb.logHdr(rargs.id, rargs.smap)
tname = tsi.StringEx()
ver = rargs.smap.Version
sleep = cmn.Rom.CplaneOperation()
tname = tsi.StringEx()
)
for i := range retries {
_, code, err := core.T.Health(tsi, cmn.Rom.MaxKeepalive(), nil)
if err == nil {
if i > 0 {
nlog.Infoln(logHdr+":", tname, "is now reachable")
nlog.Infoln(rargs.logHdr, tname, "is now reachable")
}
return true
}
if !cos.IsUnreachable(err, code) {
nlog.Errorf("%s: health(%s) returned %v(%d) - aborting", logHdr, tname, err, code)
nlog.Errorf("%s: health(%s) returned %v(%d) - aborting", rargs.logHdr, tname, err, code)
return
}
nlog.Warningf("%s: waiting for %s, err %v(%d)", logHdr, tname, err, code)
nlog.Warningln(rargs.logHdr, "waiting for:", tname, "[", err, code, "]")
time.Sleep(sleep)
nver := core.T.Sowner().Get().Version
if nver > ver {
return
}
}
nlog.Errorf("%s: timed out waiting for %s", logHdr, tname)
nlog.Errorln(rargs.logHdr, "timed out waiting for:", tname)
return
}

@@ -103,7 +100,7 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) bool /*ready*/ {
curwt time.Duration
sleep = cmn.Rom.CplaneOperation() * 2
maxwt = rargs.config.Rebalance.DestRetryTime.D() + rargs.config.Rebalance.DestRetryTime.D()/2
xreb = reb.xctn()
xreb = rargs.xreb
)
for curwt < maxwt {
if reb.stages.isInStage(tsi, rebStageTraverse) {
@@ -124,8 +121,8 @@ func (reb *Reb) rxReady(tsi *meta.Snode, rargs *rebArgs) bool /*ready*/ {
}
curwt += sleep
}
logHdr, tname := reb.logHdr(rargs.id, rargs.smap), tsi.StringEx()
nlog.Errorln(logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage")
tname := tsi.StringEx()
nlog.Errorln(rargs.logHdr, "timed out waiting for", tname, "to reach", stages[rebStageTraverse], "stage")
return false
}

@@ -139,13 +136,12 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
sleep = rargs.config.Timeout.CplaneOperation.D()
maxwt = rargs.config.Rebalance.DestRetryTime.D()
sleepRetry = cmn.KeepaliveRetryDuration(rargs.config)
logHdr = reb.logHdr(rargs.id, rargs.smap)
xreb = reb.xctn()
xreb = rargs.xreb
)
debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", logHdr, reb.RebID(), xreb)
debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", rargs.logHdr, reb.RebID(), xreb)
for curwt < maxwt {
if err := xreb.AbortedAfter(sleep); err != nil {
nlog.Infof("%s: abort wack (%v)", logHdr, err)
nlog.Infof("%s: abort wack (%v)", rargs.logHdr, err)
return
}
if reb.stages.isInStage(tsi, rebStageFin) {
@@ -157,7 +153,7 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
return
}
if err := xreb.AbortErr(); err != nil {
nlog.Infof("%s: abort wack (%v)", logHdr, err)
nlog.Infof("%s: abort wack (%v)", rargs.logHdr, err)
return
}
//
@@ -166,20 +162,20 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
var w4me bool // true: this target is waiting for ACKs from me
for _, si := range status.Targets {
if si.ID() == core.T.SID() {
nlog.Infof("%s: keep wack <= %s[%s]", logHdr, tsi.StringEx(), stages[status.Stage])
nlog.Infof("%s: keep wack <= %s[%s]", rargs.logHdr, tsi.StringEx(), stages[status.Stage])
w4me = true
break
}
}
if !w4me {
nlog.Infof("%s: %s[%s] ok (not waiting for me)", logHdr, tsi.StringEx(), stages[status.Stage])
nlog.Infof("%s: %s[%s] ok (not waiting for me)", rargs.logHdr, tsi.StringEx(), stages[status.Stage])
ok = true
return
}
time.Sleep(sleepRetry)
curwt += sleepRetry
}
nlog.Errorf("%s: timed out waiting for %s to reach %s", logHdr, tsi.StringEx(), stages[rebStageFin])
nlog.Errorf("%s: timed out waiting for %s to reach %s", rargs.logHdr, tsi.StringEx(), stages[rebStageFin])
return
}

@@ -190,19 +186,18 @@ func (reb *Reb) waitAcksExtended(tsi *meta.Snode, rargs *rebArgs) (ok bool) {
func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32) (status *Status, ok bool) {
var (
sleepRetry = cmn.KeepaliveRetryDuration(rargs.config)
logHdr = reb.logHdr(rargs.id, rargs.smap)
query = url.Values{apc.QparamRebStatus: []string{"true"}}
xreb = reb.xctn()
xreb = rargs.xreb
tname = tsi.StringEx()
)
if xreb == nil || xreb.IsAborted() {
return
}
debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", logHdr, reb.RebID(), xreb)
debug.Assertf(reb.RebID() == xreb.RebID(), "%s (rebID=%d) vs %s", rargs.logHdr, reb.RebID(), xreb)
body, code, err := core.T.Health(tsi, apc.DefaultTimeout, query)
if err != nil {
if errAborted := xreb.AbortedAfter(sleepRetry); errAborted != nil {
nlog.Infoln(logHdr, "abort check status", errAborted)
nlog.Infoln(rargs.logHdr, "abort check status", errAborted)
return
}
body, code, err = core.T.Health(tsi, apc.DefaultTimeout, query) // retry once
@@ -217,7 +212,7 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32)
status = &Status{}
err = jsoniter.Unmarshal(body, status)
if err != nil {
err = fmt.Errorf(cmn.FmtErrUnmarshal, logHdr, "reb status from "+tname, cos.BHead(body), err)
err = fmt.Errorf(cmn.FmtErrUnmarshal, rargs.logHdr, "reb status from "+tname, cos.BHead(body), err)
reb.abortAndBroadcast(err)
return
}
@@ -226,7 +221,7 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32)
//
otherXid := xact.RebID2S(status.RebID)
if status.RebID > reb.rebID.Load() {
err := cmn.NewErrAborted(xreb.Name(), logHdr, errors.New(tname+" runs newer "+otherXid))
err := cmn.NewErrAborted(xreb.Name(), rargs.logHdr, errors.New(tname+" runs newer "+otherXid))
reb.abortAndBroadcast(err)
return
}
@@ -239,12 +234,12 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32)
if !status.Running {
what = "transitioning(?) from"
}
nlog.Warningf("%s: %s[%s, v%d] %s older rebalance - keep waiting", logHdr, tname, otherXid, status.RebVersion, what)
nlog.Warningf("%s: %s[%s, v%d] %s older rebalance - keep waiting", rargs.logHdr, tname, otherXid, status.RebVersion, what)
return
}
// other target aborted same ID (do not call `reb.abortAndBroadcast` - no need)
if status.RebID == reb.RebID() && status.Aborted {
err := cmn.NewErrAborted(xreb.Name(), logHdr, fmt.Errorf("status 'aborted' from %s", tname))
err := cmn.NewErrAborted(xreb.Name(), rargs.logHdr, fmt.Errorf("status 'aborted' from %s", tname))
xreb.Abort(err)
return
}
@@ -253,6 +248,6 @@ func (reb *Reb) checkStage(tsi *meta.Snode, rargs *rebArgs, desiredStage uint32)
return // Ok
}

nlog.Infof("%s: %s[%s, v%d] not yet at the right stage %s", logHdr, tname, stages[status.Stage], status.RebVersion, stages[desiredStage])
nlog.Infof("%s: %s[%s, v%d] not yet at the right stage %s", rargs.logHdr, tname, stages[status.Stage], status.RebVersion, stages[desiredStage])
return
}
55 changes: 27 additions & 28 deletions reb/ec.go
@@ -45,36 +45,34 @@ import (
// update their metafiles. Targets do not overwrite their metafiles with a new
// one. They update only `Daemons` and `FullReplica` fields.

func (reb *Reb) runECjoggers() {
func (reb *Reb) runECjoggers(rargs *rebArgs) {
var (
wg = &sync.WaitGroup{}
avail = fs.GetAvail()
cfg = cmn.GCO.Get()
b = reb.xctn().Bck()
wg = &sync.WaitGroup{}
b = rargs.xreb.Bck()
)
for _, mi := range avail {
for _, mi := range rargs.apaths {
bck := cmn.Bck{Provider: apc.AIS}
if b != nil {
bck = cmn.Bck{Name: b.Name, Provider: apc.AIS, Ns: b.Ns}
}
wg.Add(1)
go reb.jogEC(mi, &bck, wg)
go reb.jogEC(mi, &bck, wg, rargs)
}
for _, provider := range cfg.Backend.Providers {
for _, mi := range avail {
for _, provider := range rargs.config.Backend.Providers {
for _, mi := range rargs.apaths {
bck := cmn.Bck{Provider: provider.Name}
if b != nil {
bck = cmn.Bck{Name: bck.Name, Provider: provider.Name, Ns: bck.Ns}
}
wg.Add(1)
go reb.jogEC(mi, &bck, wg)
go reb.jogEC(mi, &bck, wg, rargs)
}
}
wg.Wait()
}

// mountpath walker - walks through files in /meta/ directory
func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup) {
func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup, rargs *rebArgs) {
defer wg.Done()
opts := &fs.WalkOpts{
Mi: mi,
@@ -84,11 +82,11 @@ func (reb *Reb) jogEC(mi *fs.Mountpath, bck *cmn.Bck, wg *sync.WaitGroup, rargs *rebArgs) {
}
opts.Bck.Copy(bck)
if err := fs.Walk(opts); err != nil {
xreb := reb.xctn()
xreb := rargs.xreb
if xreb.IsAborted() || xreb.Finished() {
nlog.Infof("aborting traversal")
nlog.Infoln(rargs.logHdr, "aborting traversal")
} else {
nlog.Warningf("failed to traverse, err: %v", err)
nlog.Warningln(rargs.logHdr, "failed to traverse, err:", err)
}
}
}
@@ -130,7 +128,7 @@ func (reb *Reb) sendFromDisk(ct *core.CT, meta *ec.Metadata, target *meta.Snode,
)
if lom != nil {
defer core.FreeLOM(lom)
roc, errReader = lom.NewDeferROC()
roc, errReader = lom.NewDeferROC() // + unlock
} else {
roc, errReader = cos.NewFileHandle(fqn)
}
@@ -155,37 +153,38 @@ func (reb *Reb) sendFromDisk(ct *core.CT, meta *ec.Metadata, target *meta.Snode,
return fmt.Errorf("failed to send slices to nodes [%s..]: %v", target.ID(), err)
}

xreb := reb.xctn()
xreb.OutObjsAdd(1, o.Hdr.ObjAttrs.Size)
xctn := reb.xctn()
xctn.OutObjsAdd(1, o.Hdr.ObjAttrs.Size)
return nil
}

// Saves received CT to a local drive if needed:
// 1. Full object/replica is received
// 2. A CT is received and this target is not the default target (it
// means that the CTs came from default target after EC had been rebuilt)
func (reb *Reb) saveCTToDisk(ntfn *stageNtfn, hdr *transport.ObjHdr, data io.Reader) error {
cos.Assert(ntfn.md != nil)
func (reb *Reb) saveCTToDisk(ntfn *stageNtfn, hdr *transport.ObjHdr, data io.Reader) (err error) {
var (
err error
bck = meta.CloneBck(&hdr.Bck)
bck = meta.CloneBck(&hdr.Bck)
xctn = reb.xctn()
)
if err := bck.Init(core.T.Bowner()); err != nil {
return err
if ern := bck.Init(core.T.Bowner()); ern != nil {
return ern
}

md := ntfn.md.NewPack()
if ntfn.md.SliceID != 0 {
args := &ec.WriteArgs{Reader: data, MD: md, Xact: reb.xctn()}
args := &ec.WriteArgs{Reader: data, MD: md, Xact: xctn}
err = ec.WriteSliceAndMeta(hdr, args)
} else {
var lom *core.LOM
lom, err = ec.AllocLomFromHdr(hdr)
if err == nil {
args := &ec.WriteArgs{Reader: data, MD: md, Cksum: hdr.ObjAttrs.Cksum, Xact: reb.xctn()}
args := &ec.WriteArgs{Reader: data, MD: md, Cksum: hdr.ObjAttrs.Cksum, Xact: xctn}
err = ec.WriteReplicaAndMeta(lom, args)
}
core.FreeLOM(lom)
}

return err
}

@@ -283,10 +282,10 @@ func (reb *Reb) renameLocalCT(req *stageNtfn, ct *core.CT, md *ec.Metadata) (
}

func (reb *Reb) walkEC(fqn string, de fs.DirEntry) error {
xreb := reb.xctn()
if err := xreb.AbortErr(); err != nil {
xctn := reb.xctn()
if err := xctn.AbortErr(); err != nil {
// notify `dir.Walk` to stop iterations
nlog.Infoln(xreb.Name(), "walk-ec aborted", err)
nlog.Infoln(xctn.Name(), "walk-ec aborted", err)
return err
}
