From 86908763da397b13d722f7769697da5d283587de Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Sun, 15 Dec 2024 13:43:05 -0500 Subject: [PATCH] [API change] core: xactions (jobs) to provide initiating control msg * observability: * add `ctlmsg` to all supported x-kinds * get-xact APIs and friends: return as part of xaction "snap" * separately, CLI: - minor ref around xaction templates Signed-off-by: Alex Aizman --- ais/tgtspace.go | 21 +++++++++++++++++---- api/apc/bsummary.go | 18 ++++++++++++++++++ api/apc/lsmsg.go | 39 +++++++++++++++++++++++++++++++++++++++ api/apc/multiobj.go | 34 ++++++++++++++++++++++++++++++++++ api/apc/promote.go | 32 +++++++++++++++++++++++++++++++- cmd/cli/cli/show_hdlr.go | 6 +++--- cmd/cli/teb/templates.go | 12 ++++++------ core/mock/xact_mock.go | 2 +- core/xaction.go | 1 + ec/bencodex.go | 4 +++- ec/getx.go | 2 +- ec/putx.go | 2 +- ec/respondx.go | 2 +- ext/dload/xact.go | 2 +- ext/dsort/xact.go | 2 +- mirror/makencopies.go | 8 ++++---- mirror/put_copies.go | 2 +- reb/globrun.go | 6 +++++- space/cleanup.go | 9 +++++---- space/lru.go | 3 ++- space/space_test.go | 4 ++-- xact/base.go | 17 +++++++++++------ xact/bckjog.go | 4 ++-- xact/demand.go | 4 ++-- xact/xreg/nonbck.go | 12 ++++++------ xact/xs/archive.go | 2 +- xact/xs/blob_download.go | 2 +- xact/xs/brename.go | 2 +- xact/xs/dpromote.go | 7 +++++-- xact/xs/election.go | 2 +- xact/xs/etl.go | 11 ++++++++--- xact/xs/evict.go | 2 +- xact/xs/lom_warmup.go | 2 +- xact/xs/lso.go | 2 +- xact/xs/nsumm.go | 8 ++++---- xact/xs/prefetch.go | 2 +- xact/xs/rebres.go | 6 ++++-- xact/xs/tcb.go | 3 ++- xact/xs/tcobjs.go | 2 +- xact/xs/xaction_test.go | 8 ++++---- 40 files changed, 234 insertions(+), 75 deletions(-) diff --git a/ais/tgtspace.go b/ais/tgtspace.go index 4aa5112b73d..f5a25d796f2 100644 --- a/ais/tgtspace.go +++ b/ais/tgtspace.go @@ -5,6 +5,7 @@ package ais import ( + "fmt" "sync" "time" @@ -92,11 +93,17 @@ func (t *target) OOS(csRefreshed *fs.CapStatus, config *cmn.Config, tcdf *fs.Tcd } func (t *target) runLRU(id string, wg *sync.WaitGroup, force bool, bcks ...cmn.Bck) { - regToIC := id == "" + var ( + ctlmsg string + regToIC = id == "" + ) if regToIC { id = cos.GenUUID() } - rns := xreg.RenewLRU(id) + if len(bcks) > 0 { + ctlmsg = fmt.Sprintf("%v", bcks) + } + rns := xreg.RenewLRU(id, ctlmsg) if rns.Err != nil || rns.IsRunning() { debug.Assert(rns.Err == nil || cmn.IsErrXactUsePrev(rns.Err)) if wg != nil { @@ -129,11 +136,17 @@ func (t *target) runLRU(id string, wg *sync.WaitGroup, force bool, bcks ...cmn.B } func (t *target) runSpaceCleanup(xargs *xact.ArgsMsg, wg *sync.WaitGroup) fs.CapStatus { - regToIC := xargs.ID != "" + var ( + ctlmsg string + regToIC = xargs.ID != "" + ) if !regToIC { xargs.ID = cos.GenUUID() } - rns := xreg.RenewStoreCleanup(xargs.ID) + if len(xargs.Buckets) > 0 { + ctlmsg = fmt.Sprintf("%v", xargs.Buckets) + } + rns := xreg.RenewStoreCleanup(xargs.ID, ctlmsg) if rns.Err != nil || rns.IsRunning() { debug.Assert(rns.Err == nil || cmn.IsErrXactUsePrev(rns.Err)) if wg != nil { diff --git a/api/apc/bsummary.go b/api/apc/bsummary.go index ac9e4fb6363..c47e3cd9444 100644 --- a/api/apc/bsummary.go +++ b/api/apc/bsummary.go @@ -4,6 +4,8 @@ */ package apc +import "strings" + type ( // to generate bucket summary (or summaries) BsummCtrlMsg struct { @@ -35,3 +37,19 @@ type ( IsBckPresent bool `json:"is_present"` // in BMD } ) + +func (msg *BsummCtrlMsg) Str(cname string) string { + var sb strings.Builder + sb.Grow(64) + sb.WriteString(cname) + if msg.ObjCached { + sb.WriteString(", cached") + } + if msg.BckPresent { + sb.WriteString(", bck-present") + } + if msg.DontAddRemote { + sb.WriteString(", don't-add") + } + return sb.String() +} diff --git a/api/apc/lsmsg.go b/api/apc/lsmsg.go index 4c46c7a96d9..38cac111191 100644 --- a/api/apc/lsmsg.go +++ b/api/apc/lsmsg.go @@ -206,6 +206,45 @@ func (lsmsg *LsoMsg) PropsSet() (s cos.StrSet) { return s } +func (lsmsg *LsoMsg) Str(cname string) string { + var sb strings.Builder + sb.Grow(80) + + sb.WriteString(cname) + if lsmsg.Props != "" { + sb.WriteString(", props:") + sb.WriteString(lsmsg.Props) + } + if lsmsg.Flags == 0 { + return sb.String() + } + + sb.WriteString(", flags:") + if lsmsg.IsFlagSet(LsObjCached) { + sb.WriteString("cached,") + } + if lsmsg.IsFlagSet(LsMissing) { + sb.WriteString("missing,") + } + if lsmsg.IsFlagSet(LsArchDir) { + sb.WriteString("arch,") + } + if lsmsg.IsFlagSet(LsBckPresent) { + sb.WriteString("bck-present,") + } + if lsmsg.IsFlagSet(LsDontAddRemote) { + sb.WriteString("skip-lookup,") + } + if lsmsg.IsFlagSet(LsNoRecursion) { + sb.WriteString("no-recurs,") + } + if lsmsg.IsFlagSet(LsVerChanged) { + sb.WriteString("version-changed,") + } + s := sb.String() + return s[:len(s)-1] +} + // LsoMsg flags enum: LsObjCached, ... func (lsmsg *LsoMsg) SetFlag(flag uint64) { lsmsg.Flags |= flag } func (lsmsg *LsoMsg) ClearFlag(flag uint64) { lsmsg.Flags &= ^flag } diff --git a/api/apc/multiobj.go b/api/apc/multiobj.go index f350606600a..21b28249865 100644 --- a/api/apc/multiobj.go +++ b/api/apc/multiobj.go @@ -4,6 +4,14 @@ */ package apc +import ( + "fmt" + "strconv" + "strings" + + "github.com/NVIDIA/aistore/cmn/cos" +) + // (common for all multi-object operations) type ( // List of object names _or_ a template specifying { optional Prefix, zero or more Ranges } @@ -20,6 +28,17 @@ type ( func (lrm *ListRange) IsList() bool { return len(lrm.ObjNames) > 0 } func (lrm *ListRange) HasTemplate() bool { return lrm.Template != "" } +func (lrm *ListRange) Str(isPrefix bool) string { + switch { + case isPrefix: + return "prefix: " + lrm.Template + case lrm.IsList(): + return fmt.Sprintf("list: %v", lrm.ObjNames) + default: + return "template: " + lrm.Template + } +} + // prefetch type PrefetchMsg struct { ListRange @@ -29,6 +48,21 @@ type PrefetchMsg struct { LatestVer bool `json:"latest-ver"` // when true & in-cluster: check with remote whether (deleted | version-changed) } +func (msg *PrefetchMsg) Str(isPrefix bool) string { + var sb strings.Builder + sb.Grow(80) + sb.WriteString(msg.ListRange.Str(isPrefix)) + if msg.BlobThreshold > 0 { + sb.WriteString(", blob-threshold: ") + sb.WriteString(cos.ToSizeIEC(msg.BlobThreshold, 0)) + } + if msg.NumWorkers > 0 { + sb.WriteString(", workers: ") + sb.WriteString(strconv.Itoa(msg.NumWorkers)) + } + return sb.String() +} + // ArchiveMsg contains the parameters (all except the destination bucket) // for archiving mutiple objects as one of the supported archive.FileExtensions types // at the specified (bucket) destination. diff --git a/api/apc/promote.go b/api/apc/promote.go index c6ade9f7006..453dd826092 100644 --- a/api/apc/promote.go +++ b/api/apc/promote.go @@ -1,9 +1,11 @@ // Package apc: API control messages and constants /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. */ package apc +import "strings" + // common part that's used in `api.PromoteArgs` and `PromoteParams`(server side), both type PromoteArgs struct { DaemonID string `json:"tid,omitempty"` // target ID @@ -18,3 +20,31 @@ type PromoteArgs struct { // (auto-detection takes time, etc.) SrcIsNotFshare bool `json:"notshr,omitempty"` // the source is not a file share equally accessible by all targets } + +func (msg *PromoteArgs) String() string { + var sb strings.Builder + sb.Grow(160) + sb.WriteString("src: ") + sb.WriteString(msg.SrcFQN) + sb.WriteString(", dst: ") + sb.WriteString(msg.ObjName) + if msg.DaemonID != "" { + sb.WriteString(", node: ") + sb.WriteString(msg.DaemonID) + } + if msg.Recursive { + sb.WriteString(", recurs") + } else { + sb.WriteString(", non-recurs") + } + if msg.OverwriteDst { + sb.WriteString(", overwrite") + } + if msg.DeleteSrc { + sb.WriteString(", delete-src") + } + if msg.SrcIsNotFshare { + sb.WriteString(", not-file-share") + } + return sb.String() +} diff --git a/cmd/cli/cli/show_hdlr.go b/cmd/cli/cli/show_hdlr.go index 17ff3911ee0..e14b57069f2 100644 --- a/cmd/cli/cli/show_hdlr.go +++ b/cmd/cli/cli/show_hdlr.go @@ -44,7 +44,7 @@ const showJobUsage = "show running and/or finished jobs\n" + indent1 + tabHelpOpt + "." type ( - daemonTemplateXactSnaps struct { + nodeSnaps struct { DaemonID string XactSnaps []*core.Snap } @@ -491,7 +491,7 @@ func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.Mu // second, filteredXs => dts templates var ( fromToBck, haveBck bool - dts = make([]daemonTemplateXactSnaps, 0, len(filteredXs)) + dts = make([]nodeSnaps, 0, len(filteredXs)) ) for tid, snaps := range filteredXs { if len(snaps) == 0 { @@ -506,7 +506,7 @@ func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.Mu } else if !snaps[0].Bck.IsEmpty() { haveBck = true } - dts = append(dts, daemonTemplateXactSnaps{DaemonID: tid, XactSnaps: snaps}) + dts = append(dts, nodeSnaps{DaemonID: tid, XactSnaps: snaps}) } sort.Slice(dts, func(i, j int) bool { return dts[i].DaemonID < dts[j].DaemonID // ascending by node id/name diff --git a/cmd/cli/teb/templates.go b/cmd/cli/teb/templates.go index c0ca1ce587d..492953a4e5b 100644 --- a/cmd/cli/teb/templates.go +++ b/cmd/cli/teb/templates.go @@ -116,11 +116,11 @@ const ( // all other xactions // XactBucketTmpl = xactBucketHdr + XactNoHdrBucketTmpl - XactNoHdrBucketTmpl = "{{range $daemon := . }}" + xactBucketBodyAll + "{{end}}" + XactNoHdrBucketTmpl = "{{range $nodeSnaps := . }}" + xactBucketBodyAll + "{{end}}" xactBucketHdr = "NODE\t ID\t KIND\t BUCKET\t OBJECTS\t BYTES\t START\t END\t STATE\n" - xactBucketBodyAll = "{{range $key, $xctn := $daemon.XactSnaps}}" + xactBucketBodyOne + "{{end}}" - xactBucketBodyOne = "{{ $daemon.DaemonID }}\t " + + xactBucketBodyAll = "{{range $key, $xctn := $nodeSnaps.XactSnaps}}" + xactBucketBodyOne + "{{end}}" + xactBucketBodyOne = "{{ $nodeSnaps.DaemonID }}\t " + "{{if $xctn.ID}}{{$xctn.ID}}{{else}}-{{end}}\t " + "{{$xctn.Kind}}\t " + "{{FormatBckName $xctn.Bck}}\t " + @@ -132,11 +132,11 @@ const ( // same as above except for: src-bck, dst-bck columns XactFromToTmpl = xactFromToHdr + XactNoHdrFromToTmpl - XactNoHdrFromToTmpl = "{{range $daemon := . }}" + xactFromToBodyAll + "{{end}}" + XactNoHdrFromToTmpl = "{{range $nodeSnaps := . }}" + xactFromToBodyAll + "{{end}}" xactFromToHdr = "NODE\t ID\t KIND\t SRC BUCKET\t DST BUCKET\t OBJECTS\t BYTES\t START\t END\t STATE\n" - xactFromToBodyAll = "{{range $key, $xctn := $daemon.XactSnaps}}" + xactFromToBodyOne + "{{end}}" - xactFromToBodyOne = "{{ $daemon.DaemonID }}\t " + + xactFromToBodyAll = "{{range $key, $xctn := $nodeSnaps.XactSnaps}}" + xactFromToBodyOne + "{{end}}" + xactFromToBodyOne = "{{ $nodeSnaps.DaemonID }}\t " + "{{if $xctn.ID}}{{$xctn.ID}}{{else}}-{{end}}\t " + "{{$xctn.Kind}}\t " + "{{FormatBckName $xctn.SrcBck}}\t " + diff --git a/core/mock/xact_mock.go b/core/mock/xact_mock.go index 0804277fe96..6f9374bc7cd 100644 --- a/core/mock/xact_mock.go +++ b/core/mock/xact_mock.go @@ -30,7 +30,7 @@ func (*XactMock) Run(*sync.WaitGroup) { func NewXact(kind string) *XactMock { xctn := &XactMock{} - xctn.InitBase(cos.GenUUID(), kind, nil) + xctn.InitBase(cos.GenUUID(), kind, "", nil) return xctn } diff --git a/core/xaction.go b/core/xaction.go index 0250f2e2216..309822ddf52 100644 --- a/core/xaction.go +++ b/core/xaction.go @@ -91,6 +91,7 @@ type ( DstBck cmn.Bck `json:"dst-bck"` ID string `json:"id"` Kind string `json:"kind"` + CtlMsg string `json:"ctlmsg,omitempty"` // added v3.26 // extended error info AbortErr string `json:"abort-err"` diff --git a/ec/bencodex.go b/ec/bencodex.go index 5bb8cd81e43..262335360b5 100644 --- a/ec/bencodex.go +++ b/ec/bencodex.go @@ -98,6 +98,7 @@ func (p *encFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR, /////////////////// func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *XactBckEncode, err error) { + var ctlmsg string r = &XactBckEncode{ bck: bck, wg: &sync.WaitGroup{}, @@ -105,9 +106,10 @@ func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *Xact checkAndRecover: checkAndRecover, } if checkAndRecover { + ctlmsg = "recover" r.probFilter = prob.NewDefaultFilter() } - r.InitBase(uuid, apc.ActECEncode, bck) + r.InitBase(uuid, apc.ActECEncode, ctlmsg, bck) if err = bck.Init(core.T.Bowner()); err != nil { return nil, err diff --git a/ec/getx.go b/ec/getx.go index 9572ce3f16b..651ab8e3f9e 100644 --- a/ec/getx.go +++ b/ec/getx.go @@ -66,7 +66,7 @@ func (*getFactory) New(_ xreg.Args, bck *meta.Bck) xreg.Renewable { func (p *getFactory) Start() error { xec := ECM.NewGetXact(p.Bck.Bucket()) - xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/) + xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/) p.xctn = xec xact.GoRunW(xec) diff --git a/ec/putx.go b/ec/putx.go index b5fad98507f..fe2b40c2e66 100644 --- a/ec/putx.go +++ b/ec/putx.go @@ -66,7 +66,7 @@ func (*putFactory) New(_ xreg.Args, bck *meta.Bck) xreg.Renewable { func (p *putFactory) Start() error { xec := ECM.NewPutXact(p.Bck.Bucket()) - xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/) + xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/) p.xctn = xec xact.GoRunW(xec) diff --git a/ec/respondx.go b/ec/respondx.go index 622cd47426f..4e2cfcf8dc7 100644 --- a/ec/respondx.go +++ b/ec/respondx.go @@ -61,7 +61,7 @@ func (p *rspFactory) WhenPrevIsRunning(xprev xreg.Renewable) (xreg.WPR, error) { func (p *rspFactory) Start() error { xec := ECM.NewRespondXact(p.Bck.Bucket()) - xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/) + xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/) p.xctn = xec xact.GoRunW(xec) diff --git a/ext/dload/xact.go b/ext/dload/xact.go index 4c85aac798a..5fbdb22ad69 100644 --- a/ext/dload/xact.go +++ b/ext/dload/xact.go @@ -192,7 +192,7 @@ func (*factory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) { func newXact(p *factory) (xld *Xact) { xld = &Xact{p: p} xld.dispatcher = newDispatcher(xld) - xld.DemandBase.Init(p.UUID(), apc.Download, p.bck, 0 /*use default*/) + xld.DemandBase.Init(p.UUID(), apc.Download, "" /*ctlmsg*/, p.bck, 0 /*use default*/) return } diff --git a/ext/dsort/xact.go b/ext/dsort/xact.go index ca1288d45f7..6aab79947fc 100644 --- a/ext/dsort/xact.go +++ b/ext/dsort/xact.go @@ -40,7 +40,7 @@ func (p *factory) Start() error { args, ok := custom.(*xreg.DsortArgs) debug.Assert(ok) p.xctn = &xaction{args: args} - p.xctn.InitBase(p.UUID(), apc.ActDsort, args.BckTo /*compare w/ tcb and tco*/) + p.xctn.InitBase(p.UUID(), apc.ActDsort, "" /*ctlmsg*/, args.BckTo /*compare w/ tcb and tco*/) g.once.Do(func() { hk.Reg(apc.ActDsort+hk.NameSuffix, g.mg.housekeep, hk.DayInterval) diff --git a/mirror/makencopies.go b/mirror/makencopies.go index 182834ad9eb..1517fb761ab 100644 --- a/mirror/makencopies.go +++ b/mirror/makencopies.go @@ -84,12 +84,12 @@ func newMNC(p *mncFactory, slab *memsys.Slab) (r *mncXact) { Throttle: true, } mpopts.Bck.Copy(p.Bck.Bucket()) - r.BckJog.Init(p.UUID(), apc.ActMakeNCopies, p.Bck, mpopts, cmn.GCO.Get()) + s := fmt.Sprintf("%s-copies-%d", r.p.args.Tag, r.p.args.Copies) + r.BckJog.Init(p.UUID(), apc.ActMakeNCopies, s /*ctlmsg*/, p.Bck, mpopts, cmn.GCO.Get()) // name - s := fmt.Sprintf("-%s-copies-%d", r.p.args.Tag, r.p.args.Copies) - r._nam = r.Base.Name() + s - r._str = r.Base.String() + s + r._nam = r.Base.Name() + "-" + s + r._str = r.Base.String() + "-" + s return r } diff --git a/mirror/put_copies.go b/mirror/put_copies.go index 9fcfd5ae4fc..f286c0d395c 100644 --- a/mirror/put_copies.go +++ b/mirror/put_copies.go @@ -95,7 +95,7 @@ func (p *putFactory) Start() error { // is Ok (compare with x-archive, x-tco) beid = cos.GenUUID() } - r.DemandBase.Init(beid, p.Kind(), bck, xact.IdleDefault) + r.DemandBase.Init(beid, p.Kind(), "" /*ctlmsg*/, bck, xact.IdleDefault) // joggers r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{ diff --git a/reb/globrun.go b/reb/globrun.go index fd67520dfb7..a5b42b7ffb5 100644 --- a/reb/globrun.go +++ b/reb/globrun.go @@ -371,7 +371,11 @@ func _pingall(rargs *rebArgs) bool { } func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, haveStreams bool) bool { - rns := xreg.RenewRebalance(rargs.id) + var ctlmsg string + if rargs.bck != nil { + ctlmsg = rargs.bck.Cname(rargs.prefix) + } + rns := xreg.RenewRebalance(rargs.id, ctlmsg) if rns.Err != nil { return false } diff --git a/space/cleanup.go b/space/cleanup.go index 8cabe5a888f..41b0e9ca3e8 100644 --- a/space/cleanup.go +++ b/space/cleanup.go @@ -33,6 +33,9 @@ import ( // stats counters "cleanup.store.n" & "cleanup.store.size" (not to confuse with generic ""loc-objs", "in-objs", etc.) type ( + XactCln struct { + xact.Base + } IniCln struct { StatsT stats.Tracker Config *cmn.Config @@ -40,9 +43,6 @@ type ( WG *sync.WaitGroup Args *xact.ArgsMsg } - XactCln struct { - xact.Base - } ) // private @@ -110,7 +110,8 @@ func (*clnFactory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable { func (p *clnFactory) Start() error { p.xctn = &XactCln{} - p.xctn.InitBase(p.UUID(), apc.ActStoreCleanup, nil) + ctlmsg := p.Args.Custom.(string) + p.xctn.InitBase(p.UUID(), apc.ActStoreCleanup, ctlmsg, nil) return nil } diff --git a/space/lru.go b/space/lru.go index 98c333ad86e..b27d08f6357 100644 --- a/space/lru.go +++ b/space/lru.go @@ -120,7 +120,8 @@ func (*lruFactory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable { func (p *lruFactory) Start() error { p.xctn = &XactLRU{} - p.xctn.InitBase(p.UUID(), apc.ActLRU, nil) + ctlmsg := p.Args.Custom.(string) + p.xctn.InitBase(p.UUID(), apc.ActLRU, ctlmsg, nil) return nil } diff --git a/space/space_test.go b/space/space_test.go index ce2d679c0a1..e64ed539898 100644 --- a/space/space_test.go +++ b/space/space_test.go @@ -344,7 +344,7 @@ func newTargetLRUMock() *mock.TargetMock { func newIniLRU() *space.IniLRU { xlru := &space.XactLRU{} - xlru.InitBase(cos.GenUUID(), apc.ActLRU, nil) + xlru.InitBase(cos.GenUUID(), apc.ActLRU, "" /*ctlmsg*/, nil) return &space.IniLRU{ Xaction: xlru, Config: cmn.GCO.Get(), @@ -356,7 +356,7 @@ func newIniLRU() *space.IniLRU { func newInitStoreCln() *space.IniCln { xcln := &space.XactCln{} - xcln.InitBase(cos.GenUUID(), apc.ActStoreCleanup, nil) + xcln.InitBase(cos.GenUUID(), apc.ActStoreCleanup, "" /*ctlmsg*/, nil) return &space.IniCln{ Xaction: xcln, Config: cmn.GCO.Get(), diff --git a/xact/base.go b/xact/base.go index 6a79d0caa54..5c03960f565 100644 --- a/xact/base.go +++ b/xact/base.go @@ -33,11 +33,12 @@ type ( err ratomic.Pointer[error] done atomic.Bool } - id string - kind string - _nam string - err cos.Errs - stats struct { + id string + kind string + _nam string + ctlmsg string + err cos.Errs + stats struct { objs atomic.Int64 // locally processed bytes atomic.Int64 outobjs atomic.Int64 // transmit @@ -69,10 +70,13 @@ func GoRunW(xctn core.Xact) { // Base - partially implements `core.Xact` interface ////////////// -func (xctn *Base) InitBase(id, kind string, bck *meta.Bck) { +func (xctn *Base) InitBase(id, kind, ctlmsg string, bck *meta.Bck) { debug.Assert(kind == apc.ActETLInline || cos.IsValidUUID(id) || IsValidRebID(id), id) debug.Assert(IsValidKind(kind), kind) + xctn.id, xctn.kind = id, kind + xctn.ctlmsg = ctlmsg + xctn.abort.ch = make(chan error, 1) if bck != nil { xctn.bck = *bck @@ -382,6 +386,7 @@ func (xctn *Base) InObjsAdd(cnt int, size int64) { func (xctn *Base) ToSnap(snap *core.Snap) { snap.ID = xctn.ID() snap.Kind = xctn.Kind() + snap.CtlMsg = xctn.ctlmsg snap.StartTime = xctn.StartTime() snap.EndTime = xctn.EndTime() if err := xctn.AbortErr(); err != nil { diff --git a/xact/bckjog.go b/xact/bckjog.go index c3cd88d0ee5..cb1653232fe 100644 --- a/xact/bckjog.go +++ b/xact/bckjog.go @@ -16,8 +16,8 @@ type BckJog struct { Base } -func (r *BckJog) Init(id, kind string, bck *meta.Bck, opts *mpather.JgroupOpts, config *cmn.Config) { - r.InitBase(id, kind, bck) +func (r *BckJog) Init(id, kind, ctlmsg string, bck *meta.Bck, opts *mpather.JgroupOpts, config *cmn.Config) { + r.InitBase(id, kind, ctlmsg, bck) r.joggers = mpather.NewJoggerGroup(opts, config, nil) r.Config = config } diff --git a/xact/demand.go b/xact/demand.go index 16f160085f1..fcae357803f 100644 --- a/xact/demand.go +++ b/xact/demand.go @@ -56,7 +56,7 @@ func (r *DemandBase) IsIdle() bool { return last != 0 && mono.Since(last) >= max(cmn.Rom.MaxKeepalive(), 2*time.Second) } -func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idleDur time.Duration) { +func (r *DemandBase) Init(uuid, kind, ctlmsg string, bck *meta.Bck, idleDur time.Duration) { r.hkName = kind + "/" + uuid if idleDur > 0 { r.idle.d.Store(int64(idleDur)) @@ -64,7 +64,7 @@ func (r *DemandBase) Init(uuid, kind string, bck *meta.Bck, idleDur time.Duratio r.idle.d.Store(int64(IdleDefault)) } r.idle.ticks.Init() - r.InitBase(uuid, kind, bck) + r.InitBase(uuid, kind, ctlmsg, bck) r.idle.last.Store(mono.NanoTime()) r.hkReg.Store(true) diff --git a/xact/xreg/nonbck.go b/xact/xreg/nonbck.go index d973fd0763f..69772c88dcc 100644 --- a/xact/xreg/nonbck.go +++ b/xact/xreg/nonbck.go @@ -17,8 +17,8 @@ func RegNonBckXact(entry Renewable) { dreg.nonbckXacts[entry.Kind()] = entry // no locking: all reg-s are done at init time } -func RenewRebalance(id int64) RenewRes { - e := dreg.nonbckXacts[apc.ActRebalance].New(Args{UUID: xact.RebID2S(id)}, nil) +func RenewRebalance(id int64, ctlmsg string) RenewRes { + e := dreg.nonbckXacts[apc.ActRebalance].New(Args{UUID: xact.RebID2S(id), Custom: ctlmsg}, nil) return dreg.renew(e, nil) } @@ -34,13 +34,13 @@ func RenewElection() RenewRes { return dreg.renew(e, nil) } -func RenewLRU(id string) RenewRes { - e := dreg.nonbckXacts[apc.ActLRU].New(Args{UUID: id}, nil) +func RenewLRU(id, ctlmsg string) RenewRes { + e := dreg.nonbckXacts[apc.ActLRU].New(Args{UUID: id, Custom: ctlmsg}, nil) return dreg.renew(e, nil) } -func RenewStoreCleanup(id string) RenewRes { - e := dreg.nonbckXacts[apc.ActStoreCleanup].New(Args{UUID: id}, nil) +func RenewStoreCleanup(id, ctlmsg string) RenewRes { + e := dreg.nonbckXacts[apc.ActStoreCleanup].New(Args{UUID: id, Custom: ctlmsg}, nil) return dreg.renew(e, nil) } diff --git a/xact/xs/archive.go b/xact/xs/archive.go index 419499ddb92..126bda49a4b 100644 --- a/xact/xs/archive.go +++ b/xact/xs/archive.go @@ -117,7 +117,7 @@ func (p *archFactory) Start() (err error) { avail := fs.GetAvail() r.joggers.m = make(map[string]*jogger, len(avail)) p.xctn = r - r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, p.kind, p.Bck /*from*/, xact.IdleDefault) + r.DemandBase.Init(p.UUID(), p.kind, "" /*ctlmsg*/, p.Bck /*from*/, xact.IdleDefault) // TODO ctlmsg: arch, tco if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, cmn.OwtPut, 0 /*pdu*/); err != nil { return err diff --git a/xact/xs/blob_download.go b/xact/xs/blob_download.go index 0a0b9d02467..f9f02aceb96 100644 --- a/xact/xs/blob_download.go +++ b/xact/xs/blob_download.go @@ -176,7 +176,7 @@ func (*blobFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable { func (p *blobFactory) Start() error { // reuse the same args-carrying structure and keep filling-in r := p.pre - r.InitBase(p.Args.UUID, p.Kind(), r.args.Lom.Bck()) + r.InitBase(p.Args.UUID, p.Kind(), r.args.Lom.Cname(), r.args.Lom.Bck()) // 2nd (just in time) tune-up var ( diff --git a/xact/xs/brename.go b/xact/xs/brename.go index e4b75365420..e27b6ea7ccc 100644 --- a/xact/xs/brename.go +++ b/xact/xs/brename.go @@ -97,7 +97,7 @@ func newBckRename(uuid, kind, rebID string, bck, bckFrom, bckTo *meta.Bck) (x *b debug.Assert(xact.IsValidRebID(rebID), rebID) x = &bckRename{bckFrom: bckFrom, bckTo: bckTo, rebID: rebID} - x.InitBase(uuid, kind, bck) + x.InitBase(uuid, kind, "via "+rebID /*ctlmsg*/, bck) return } diff --git a/xact/xs/dpromote.go b/xact/xs/dpromote.go index a5f3aa824ce..4b34d5dea89 100644 --- a/xact/xs/dpromote.go +++ b/xact/xs/dpromote.go @@ -55,8 +55,11 @@ func (*proFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable { } func (p *proFactory) Start() error { - xctn := &XactDirPromote{p: p} - xctn.BckJog.Init(p.Args.UUID /*global xID*/, apc.ActPromote, p.Bck, &mpather.JgroupOpts{}, cmn.GCO.Get()) + var ( + xctn = &XactDirPromote{p: p} + ctlmsg = p.args.String() + ) + xctn.BckJog.Init(p.Args.UUID /*global xID*/, apc.ActPromote, ctlmsg, p.Bck, &mpather.JgroupOpts{}, cmn.GCO.Get()) p.xctn = xctn return nil } diff --git a/xact/xs/election.go b/xact/xs/election.go index 18505dcda12..25a3344fb9c 100644 --- a/xact/xs/election.go +++ b/xact/xs/election.go @@ -37,7 +37,7 @@ func (*eleFactory) New(xreg.Args, *meta.Bck) xreg.Renewable { return &eleFactory func (p *eleFactory) Start() error { p.xctn = &Election{} - p.xctn.InitBase(cos.GenUUID(), apc.ActElection, nil) + p.xctn.InitBase(cos.GenUUID(), apc.ActElection, "", nil) return nil } diff --git a/xact/xs/etl.go b/xact/xs/etl.go index 85cff69b3fe..7f9ce7675dd 100644 --- a/xact/xs/etl.go +++ b/xact/xs/etl.go @@ -6,6 +6,7 @@ package xs import ( + "fmt" "sync" "github.com/NVIDIA/aistore/api/apc" @@ -39,7 +40,7 @@ func (*etlFactory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable { func (p *etlFactory) Start() error { debug.Assert(cos.IsValidUUID(p.Args.UUID), p.Args.UUID) - p.xctn = newETL(p.Args.UUID, p.Kind()) + p.xctn = newETL(p) return nil } @@ -52,9 +53,13 @@ func (*etlFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) { // (tests only) -func newETL(id, kind string) (xctn *xactETL) { +func newETL(p *etlFactory) (xctn *xactETL) { + var ( + s = p.Args.Custom.(fmt.Stringer) + ctlmsg = s.String() + ) xctn = &xactETL{} - xctn.InitBase(id, kind, nil) + xctn.InitBase(p.Args.UUID, p.Kind(), ctlmsg, nil) return } diff --git a/xact/xs/evict.go b/xact/xs/evict.go index 828644d914a..07fd3742a1a 100644 --- a/xact/xs/evict.go +++ b/xact/xs/evict.go @@ -60,7 +60,7 @@ func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListR if err = ed.lrit.init(ed, msg, bck, lrpWorkersDflt); err != nil { return nil, err } - ed.InitBase(xargs.UUID, kind, bck) + ed.InitBase(xargs.UUID, kind, msg.Str(ed.lrp == lrpPrefix), bck) return ed, nil } diff --git a/xact/xs/lom_warmup.go b/xact/xs/lom_warmup.go index eeaf9e20947..d4a6ae96900 100644 --- a/xact/xs/lom_warmup.go +++ b/xact/xs/lom_warmup.go @@ -69,7 +69,7 @@ func newXactLLC(uuid string, bck *meta.Bck) (r *xactLLC) { DoLoad: mpather.Load, } mpopts.Bck.Copy(bck.Bucket()) - r.BckJog.Init(uuid, apc.ActLoadLomCache, bck, mpopts, cmn.GCO.Get()) + r.BckJog.Init(uuid, apc.ActLoadLomCache, "" /*ctlmsg*/, bck, mpopts, cmn.GCO.Get()) return } diff --git a/xact/xs/lso.go b/xact/xs/lso.go index 9a711b2f426..9ce618474af 100644 --- a/xact/xs/lso.go +++ b/xact/xs/lso.go @@ -113,7 +113,7 @@ func (p *lsoFactory) Start() error { // idle timeout vs delayed next-page request // see also: resetIdle() - r.DemandBase.Init(p.UUID(), apc.ActList, p.Bck, r.config.Timeout.MaxHostBusy.D()) + r.DemandBase.Init(p.UUID(), apc.ActList, p.msg.Str(p.Bck.Cname(p.msg.Prefix)) /*ctlmsg*/, p.Bck, r.config.Timeout.MaxHostBusy.D()) // NOTE: is set by the first message, never changes r.walk.wor = r.msg.WantOnlyRemoteProps() diff --git a/xact/xs/nsumm.go b/xact/xs/nsumm.go index 1815fb8f438..4bd256bb1ac 100644 --- a/xact/xs/nsumm.go +++ b/xact/xs/nsumm.go @@ -140,11 +140,11 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) { } } - r.BckJog.Init(p.UUID(), p.Kind(), p.Bck, opts, cmn.GCO.Get()) + ctlmsg := p.msg.Str(p.Bck.Cname(p.msg.Prefix)) + r.BckJog.Init(p.UUID(), p.Kind(), ctlmsg, p.Bck, opts, cmn.GCO.Get()) - s := fmt.Sprintf("-msg-%+v", r.p.msg) - r._nam = r.Base.Name() + s - r._str = r.Base.String() + s + r._nam = r.Base.Name() + "-" + ctlmsg + r._str = r.Base.String() + "-" + ctlmsg return r, nil } diff --git a/xact/xs/prefetch.go b/xact/xs/prefetch.go index a77529c1de4..c3522d4899e 100644 --- a/xact/xs/prefetch.go +++ b/xact/xs/prefetch.go @@ -93,7 +93,7 @@ func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.Prefetch if err != nil { return nil, err } - r.InitBase(xargs.UUID, kind, bck) + r.InitBase(xargs.UUID, kind, msg.Str(r.lrp == lrpPrefix), bck) r.latestVer = bck.VersionConf().ValidateWarmGet || msg.LatestVer if r.msg.BlobThreshold > 0 { diff --git a/xact/xs/rebres.go b/xact/xs/rebres.go index dee87118dfa..a51d26a4e07 100644 --- a/xact/xs/rebres.go +++ b/xact/xs/rebres.go @@ -95,7 +95,9 @@ func (p *rebFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR, func newRebalance(p *rebFactory) (xreb *Rebalance, err error) { xreb = &Rebalance{} - xreb.InitBase(p.Args.UUID, p.Kind(), nil) + ctlmsg, ok := p.Args.Custom.(string) + debug.Assert(ok) + xreb.InitBase(p.Args.UUID, p.Kind(), ctlmsg, nil) id, err := xact.S2RebID(p.Args.UUID) if err != nil { @@ -154,7 +156,7 @@ func (*resFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) { return func NewResilver(id, kind string) (xres *Resilver) { xres = &Resilver{} - xres.InitBase(id, kind, nil) + xres.InitBase(id, kind, "" /*ctlmsg*/, nil) return } diff --git a/xact/xs/tcb.go b/xact/xs/tcb.go index 1cb5bdf5e02..b359264b632 100644 --- a/xact/xs/tcb.go +++ b/xact/xs/tcb.go @@ -175,7 +175,8 @@ func newTCB(p *tcbFactory, slab *memsys.Slab, config *cmn.Config, smap *meta.Sma Throttle: true, // always trottling } mpopts.Bck.Copy(p.args.BckFrom.Bucket()) - r.BckJog.Init(p.UUID(), p.kind, p.args.BckTo, mpopts, config) + + r.BckJog.Init(p.UUID(), p.kind, r.nam /*ctlmsg*/, p.args.BckTo, mpopts, config) if p.args.Msg.Sync { debug.Assert(p.args.Msg.Prepend == "", p.args.Msg.Prepend) // validated (cli, P) diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go index 256a7f57833..0b7bd037c4e 100644 --- a/xact/xs/tcobjs.go +++ b/xact/xs/tcobjs.go @@ -89,7 +89,7 @@ func (p *tcoFactory) Start() error { r.owt = cmn.OwtTransform } p.xctn = r - r.DemandBase.Init(p.UUID(), p.Kind(), p.Bck, xact.IdleDefault) + r.DemandBase.Init(p.UUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, xact.IdleDefault) // TODO ctlmsg: arch, tco var sizePDU int32 if p.kind == apc.ActETLObjects { diff --git a/xact/xs/xaction_test.go b/xact/xs/xaction_test.go index b31cd30894d..cf7ad852ba1 100644 --- a/xact/xs/xaction_test.go +++ b/xact/xs/xaction_test.go @@ -57,7 +57,7 @@ func TestXactionRenewLRU(t *testing.T) { wg.Add(num) for range num { go func() { - xactCh <- xreg.RenewLRU(cos.GenUUID()) + xactCh <- xreg.RenewLRU(cos.GenUUID(), "") wg.Done() }() } @@ -135,7 +135,7 @@ func TestXactionAbortAll(t *testing.T) { xreg.RegBckXact(&xs.TestBmvFactory{}) cos.InitShortID(0) - rnsLRU := xreg.RenewLRU(cos.GenUUID()) + rnsLRU := xreg.RenewLRU(cos.GenUUID(), "") tassert.Errorf(t, !rnsLRU.IsRunning(), "new LRU must be created") rnsRen := xreg.RenewBckRename(bckFrom, bckTo, cos.GenUUID(), 123, "phase") xactBck := rnsRen.Entry.Get() @@ -166,7 +166,7 @@ func TestXactionAbortAllGlobal(t *testing.T) { xreg.RegBckXact(&xs.TestBmvFactory{}) cos.InitShortID(0) - rnsLRU := xreg.RenewLRU(cos.GenUUID()) + rnsLRU := xreg.RenewLRU(cos.GenUUID(), "") tassert.Errorf(t, !rnsLRU.IsRunning(), "new LRU must be created") rnsRen := xreg.RenewBckRename(bckFrom, bckTo, cos.GenUUID(), 123, "phase") xactBck := rnsRen.Entry.Get() @@ -197,7 +197,7 @@ func TestXactionAbortBuckets(t *testing.T) { xreg.RegBckXact(&xs.TestBmvFactory{}) cos.InitShortID(0) - rnsLRU := xreg.RenewLRU(cos.GenUUID()) + rnsLRU := xreg.RenewLRU(cos.GenUUID(), "") tassert.Errorf(t, !rnsLRU.IsRunning(), "new LRU must be created") rns := xreg.RenewBckRename(bckFrom, bckTo, cos.GenUUID(), 123, "phase") xactBck := rns.Entry.Get()