Skip to content

Commit

Permalink
[API change] core: xactions (jobs) to provide initiating control msg
Browse files Browse the repository at this point in the history
* observability:
* add `ctlmsg` to all supported x-kinds
* get-xact APIs and friends: return as part of xaction "snap"
* separately, CLI:
  - minor ref around xaction templates

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 15, 2024
1 parent 0634d83 commit 8690876
Show file tree
Hide file tree
Showing 40 changed files with 234 additions and 75 deletions.
21 changes: 17 additions & 4 deletions ais/tgtspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package ais

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -92,11 +93,17 @@ func (t *target) OOS(csRefreshed *fs.CapStatus, config *cmn.Config, tcdf *fs.Tcd
}

func (t *target) runLRU(id string, wg *sync.WaitGroup, force bool, bcks ...cmn.Bck) {
regToIC := id == ""
var (
ctlmsg string
regToIC = id == ""
)
if regToIC {
id = cos.GenUUID()
}
rns := xreg.RenewLRU(id)
if len(bcks) > 0 {
ctlmsg = fmt.Sprintf("%v", bcks)
}
rns := xreg.RenewLRU(id, ctlmsg)
if rns.Err != nil || rns.IsRunning() {
debug.Assert(rns.Err == nil || cmn.IsErrXactUsePrev(rns.Err))
if wg != nil {
Expand Down Expand Up @@ -129,11 +136,17 @@ func (t *target) runLRU(id string, wg *sync.WaitGroup, force bool, bcks ...cmn.B
}

func (t *target) runSpaceCleanup(xargs *xact.ArgsMsg, wg *sync.WaitGroup) fs.CapStatus {
regToIC := xargs.ID != ""
var (
ctlmsg string
regToIC = xargs.ID != ""
)
if !regToIC {
xargs.ID = cos.GenUUID()
}
rns := xreg.RenewStoreCleanup(xargs.ID)
if len(xargs.Buckets) > 0 {
ctlmsg = fmt.Sprintf("%v", xargs.Buckets)
}
rns := xreg.RenewStoreCleanup(xargs.ID, ctlmsg)
if rns.Err != nil || rns.IsRunning() {
debug.Assert(rns.Err == nil || cmn.IsErrXactUsePrev(rns.Err))
if wg != nil {
Expand Down
18 changes: 18 additions & 0 deletions api/apc/bsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package apc

import "strings"

type (
// to generate bucket summary (or summaries)
BsummCtrlMsg struct {
Expand Down Expand Up @@ -35,3 +37,19 @@ type (
IsBckPresent bool `json:"is_present"` // in BMD
}
)

func (msg *BsummCtrlMsg) Str(cname string) string {
var sb strings.Builder
sb.Grow(64)
sb.WriteString(cname)
if msg.ObjCached {
sb.WriteString(", cached")
}
if msg.BckPresent {
sb.WriteString(", bck-present")
}
if msg.DontAddRemote {
sb.WriteString(", don't-add")
}
return sb.String()
}
39 changes: 39 additions & 0 deletions api/apc/lsmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,45 @@ func (lsmsg *LsoMsg) PropsSet() (s cos.StrSet) {
return s
}

func (lsmsg *LsoMsg) Str(cname string) string {
var sb strings.Builder
sb.Grow(80)

sb.WriteString(cname)
if lsmsg.Props != "" {
sb.WriteString(", props:")
sb.WriteString(lsmsg.Props)
}
if lsmsg.Flags == 0 {
return sb.String()
}

sb.WriteString(", flags:")
if lsmsg.IsFlagSet(LsObjCached) {
sb.WriteString("cached,")
}
if lsmsg.IsFlagSet(LsMissing) {
sb.WriteString("missing,")
}
if lsmsg.IsFlagSet(LsArchDir) {
sb.WriteString("arch,")
}
if lsmsg.IsFlagSet(LsBckPresent) {
sb.WriteString("bck-present,")
}
if lsmsg.IsFlagSet(LsDontAddRemote) {
sb.WriteString("skip-lookup,")
}
if lsmsg.IsFlagSet(LsNoRecursion) {
sb.WriteString("no-recurs,")
}
if lsmsg.IsFlagSet(LsVerChanged) {
sb.WriteString("version-changed,")
}
s := sb.String()
return s[:len(s)-1]
}

// LsoMsg flags enum: LsObjCached, ...
func (lsmsg *LsoMsg) SetFlag(flag uint64) { lsmsg.Flags |= flag }
func (lsmsg *LsoMsg) ClearFlag(flag uint64) { lsmsg.Flags &= ^flag }
Expand Down
34 changes: 34 additions & 0 deletions api/apc/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
*/
package apc

import (
"fmt"
"strconv"
"strings"

"github.com/NVIDIA/aistore/cmn/cos"
)

// (common for all multi-object operations)
type (
// List of object names _or_ a template specifying { optional Prefix, zero or more Ranges }
Expand All @@ -20,6 +28,17 @@ type (
func (lrm *ListRange) IsList() bool { return len(lrm.ObjNames) > 0 }
func (lrm *ListRange) HasTemplate() bool { return lrm.Template != "" }

func (lrm *ListRange) Str(isPrefix bool) string {
switch {
case isPrefix:
return "prefix: " + lrm.Template
case lrm.IsList():
return fmt.Sprintf("list: %v", lrm.ObjNames)
default:
return "template: " + lrm.Template
}
}

// prefetch
type PrefetchMsg struct {
ListRange
Expand All @@ -29,6 +48,21 @@ type PrefetchMsg struct {
LatestVer bool `json:"latest-ver"` // when true & in-cluster: check with remote whether (deleted | version-changed)
}

func (msg *PrefetchMsg) Str(isPrefix bool) string {
var sb strings.Builder
sb.Grow(80)
sb.WriteString(msg.ListRange.Str(isPrefix))
if msg.BlobThreshold > 0 {
sb.WriteString(", blob-threshold: ")
sb.WriteString(cos.ToSizeIEC(msg.BlobThreshold, 0))
}
if msg.NumWorkers > 0 {
sb.WriteString(", workers: ")
sb.WriteString(strconv.Itoa(msg.NumWorkers))
}
return sb.String()
}

// ArchiveMsg contains the parameters (all except the destination bucket)
// for archiving mutiple objects as one of the supported archive.FileExtensions types
// at the specified (bucket) destination.
Expand Down
32 changes: 31 additions & 1 deletion api/apc/promote.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Package apc: API control messages and constants
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package apc

import "strings"

// common part that's used in `api.PromoteArgs` and `PromoteParams`(server side), both
type PromoteArgs struct {
DaemonID string `json:"tid,omitempty"` // target ID
Expand All @@ -18,3 +20,31 @@ type PromoteArgs struct {
// (auto-detection takes time, etc.)
SrcIsNotFshare bool `json:"notshr,omitempty"` // the source is not a file share equally accessible by all targets
}

func (msg *PromoteArgs) String() string {
var sb strings.Builder
sb.Grow(160)
sb.WriteString("src: ")
sb.WriteString(msg.SrcFQN)
sb.WriteString(", dst: ")
sb.WriteString(msg.ObjName)
if msg.DaemonID != "" {
sb.WriteString(", node: ")
sb.WriteString(msg.DaemonID)
}
if msg.Recursive {
sb.WriteString(", recurs")
} else {
sb.WriteString(", non-recurs")
}
if msg.OverwriteDst {
sb.WriteString(", overwrite")
}
if msg.DeleteSrc {
sb.WriteString(", delete-src")
}
if msg.SrcIsNotFshare {
sb.WriteString(", not-file-share")
}
return sb.String()
}
6 changes: 3 additions & 3 deletions cmd/cli/cli/show_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const showJobUsage = "show running and/or finished jobs\n" +
indent1 + tabHelpOpt + "."

type (
daemonTemplateXactSnaps struct {
nodeSnaps struct {
DaemonID string
XactSnaps []*core.Snap
}
Expand Down Expand Up @@ -491,7 +491,7 @@ func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.Mu
// second, filteredXs => dts templates
var (
fromToBck, haveBck bool
dts = make([]daemonTemplateXactSnaps, 0, len(filteredXs))
dts = make([]nodeSnaps, 0, len(filteredXs))
)
for tid, snaps := range filteredXs {
if len(snaps) == 0 {
Expand All @@ -506,7 +506,7 @@ func xlistByKindID(c *cli.Context, xargs *xact.ArgsMsg, caption bool, xs xact.Mu
} else if !snaps[0].Bck.IsEmpty() {
haveBck = true
}
dts = append(dts, daemonTemplateXactSnaps{DaemonID: tid, XactSnaps: snaps})
dts = append(dts, nodeSnaps{DaemonID: tid, XactSnaps: snaps})
}
sort.Slice(dts, func(i, j int) bool {
return dts[i].DaemonID < dts[j].DaemonID // ascending by node id/name
Expand Down
12 changes: 6 additions & 6 deletions cmd/cli/teb/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ const (
// all other xactions
//
XactBucketTmpl = xactBucketHdr + XactNoHdrBucketTmpl
XactNoHdrBucketTmpl = "{{range $daemon := . }}" + xactBucketBodyAll + "{{end}}"
XactNoHdrBucketTmpl = "{{range $nodeSnaps := . }}" + xactBucketBodyAll + "{{end}}"

xactBucketHdr = "NODE\t ID\t KIND\t BUCKET\t OBJECTS\t BYTES\t START\t END\t STATE\n"
xactBucketBodyAll = "{{range $key, $xctn := $daemon.XactSnaps}}" + xactBucketBodyOne + "{{end}}"
xactBucketBodyOne = "{{ $daemon.DaemonID }}\t " +
xactBucketBodyAll = "{{range $key, $xctn := $nodeSnaps.XactSnaps}}" + xactBucketBodyOne + "{{end}}"
xactBucketBodyOne = "{{ $nodeSnaps.DaemonID }}\t " +
"{{if $xctn.ID}}{{$xctn.ID}}{{else}}-{{end}}\t " +
"{{$xctn.Kind}}\t " +
"{{FormatBckName $xctn.Bck}}\t " +
Expand All @@ -132,11 +132,11 @@ const (

// same as above except for: src-bck, dst-bck columns
XactFromToTmpl = xactFromToHdr + XactNoHdrFromToTmpl
XactNoHdrFromToTmpl = "{{range $daemon := . }}" + xactFromToBodyAll + "{{end}}"
XactNoHdrFromToTmpl = "{{range $nodeSnaps := . }}" + xactFromToBodyAll + "{{end}}"

xactFromToHdr = "NODE\t ID\t KIND\t SRC BUCKET\t DST BUCKET\t OBJECTS\t BYTES\t START\t END\t STATE\n"
xactFromToBodyAll = "{{range $key, $xctn := $daemon.XactSnaps}}" + xactFromToBodyOne + "{{end}}"
xactFromToBodyOne = "{{ $daemon.DaemonID }}\t " +
xactFromToBodyAll = "{{range $key, $xctn := $nodeSnaps.XactSnaps}}" + xactFromToBodyOne + "{{end}}"
xactFromToBodyOne = "{{ $nodeSnaps.DaemonID }}\t " +
"{{if $xctn.ID}}{{$xctn.ID}}{{else}}-{{end}}\t " +
"{{$xctn.Kind}}\t " +
"{{FormatBckName $xctn.SrcBck}}\t " +
Expand Down
2 changes: 1 addition & 1 deletion core/mock/xact_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (*XactMock) Run(*sync.WaitGroup) {

func NewXact(kind string) *XactMock {
xctn := &XactMock{}
xctn.InitBase(cos.GenUUID(), kind, nil)
xctn.InitBase(cos.GenUUID(), kind, "", nil)
return xctn
}

Expand Down
1 change: 1 addition & 0 deletions core/xaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type (
DstBck cmn.Bck `json:"dst-bck"`
ID string `json:"id"`
Kind string `json:"kind"`
CtlMsg string `json:"ctlmsg,omitempty"` // added v3.26

// extended error info
AbortErr string `json:"abort-err"`
Expand Down
4 changes: 3 additions & 1 deletion ec/bencodex.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,18 @@ func (p *encFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR,
///////////////////

func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *XactBckEncode, err error) {
var ctlmsg string
r = &XactBckEncode{
bck: bck,
wg: &sync.WaitGroup{},
smap: core.T.Sowner().Get(),
checkAndRecover: checkAndRecover,
}
if checkAndRecover {
ctlmsg = "recover"
r.probFilter = prob.NewDefaultFilter()
}
r.InitBase(uuid, apc.ActECEncode, bck)
r.InitBase(uuid, apc.ActECEncode, ctlmsg, bck)

if err = bck.Init(core.T.Bowner()); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion ec/getx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (*getFactory) New(_ xreg.Args, bck *meta.Bck) xreg.Renewable {

func (p *getFactory) Start() error {
xec := ECM.NewGetXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/)
p.xctn = xec

xact.GoRunW(xec)
Expand Down
2 changes: 1 addition & 1 deletion ec/putx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (*putFactory) New(_ xreg.Args, bck *meta.Bck) xreg.Renewable {

func (p *putFactory) Start() error {
xec := ECM.NewPutXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/)
p.xctn = xec

xact.GoRunW(xec)
Expand Down
2 changes: 1 addition & 1 deletion ec/respondx.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *rspFactory) WhenPrevIsRunning(xprev xreg.Renewable) (xreg.WPR, error) {

func (p *rspFactory) Start() error {
xec := ECM.NewRespondXact(p.Bck.Bucket())
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), p.Bck, 0 /*use default*/)
xec.DemandBase.Init(cos.GenUUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, 0 /*use default*/)
p.xctn = xec

xact.GoRunW(xec)
Expand Down
2 changes: 1 addition & 1 deletion ext/dload/xact.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (*factory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
func newXact(p *factory) (xld *Xact) {
xld = &Xact{p: p}
xld.dispatcher = newDispatcher(xld)
xld.DemandBase.Init(p.UUID(), apc.Download, p.bck, 0 /*use default*/)
xld.DemandBase.Init(p.UUID(), apc.Download, "" /*ctlmsg*/, p.bck, 0 /*use default*/)
return
}

Expand Down
2 changes: 1 addition & 1 deletion ext/dsort/xact.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *factory) Start() error {
args, ok := custom.(*xreg.DsortArgs)
debug.Assert(ok)
p.xctn = &xaction{args: args}
p.xctn.InitBase(p.UUID(), apc.ActDsort, args.BckTo /*compare w/ tcb and tco*/)
p.xctn.InitBase(p.UUID(), apc.ActDsort, "" /*ctlmsg*/, args.BckTo /*compare w/ tcb and tco*/)

g.once.Do(func() {
hk.Reg(apc.ActDsort+hk.NameSuffix, g.mg.housekeep, hk.DayInterval)
Expand Down
8 changes: 4 additions & 4 deletions mirror/makencopies.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ func newMNC(p *mncFactory, slab *memsys.Slab) (r *mncXact) {
Throttle: true,
}
mpopts.Bck.Copy(p.Bck.Bucket())
r.BckJog.Init(p.UUID(), apc.ActMakeNCopies, p.Bck, mpopts, cmn.GCO.Get())
s := fmt.Sprintf("%s-copies-%d", r.p.args.Tag, r.p.args.Copies)
r.BckJog.Init(p.UUID(), apc.ActMakeNCopies, s /*ctlmsg*/, p.Bck, mpopts, cmn.GCO.Get())

// name
s := fmt.Sprintf("-%s-copies-%d", r.p.args.Tag, r.p.args.Copies)
r._nam = r.Base.Name() + s
r._str = r.Base.String() + s
r._nam = r.Base.Name() + "-" + s
r._str = r.Base.String() + "-" + s
return r
}

Expand Down
2 changes: 1 addition & 1 deletion mirror/put_copies.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *putFactory) Start() error {
// is Ok (compare with x-archive, x-tco)
beid = cos.GenUUID()
}
r.DemandBase.Init(beid, p.Kind(), bck, xact.IdleDefault)
r.DemandBase.Init(beid, p.Kind(), "" /*ctlmsg*/, bck, xact.IdleDefault)

// joggers
r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{
Expand Down
Loading

0 comments on commit 8690876

Please sign in to comment.