Skip to content

Commit

Permalink
scrub (feature; new)
Browse files Browse the repository at this point in the history
* detailed logs
* part five; prev. commit: f339823

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 14, 2024
1 parent f339823 commit 0634d83
Showing 1 changed file with 129 additions and 40 deletions.
169 changes: 129 additions & 40 deletions cmd/cli/cli/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package cli
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
Expand All @@ -25,6 +26,15 @@ import (
"github.com/urfave/cli"
)

// [TODO] in order of priority:
// - hide zero columns
// - emit "100% healthy" when all counters are zero
// - when waiting for a long time, show log filenames (to tail -f)
// - version-changed
// - version-deleted
// - async execution
// - pretty print

type (
scrubCtx struct {
c *cli.Context
Expand All @@ -38,6 +48,21 @@ type (
// timing
ival time.Duration
last atomic.Int64
// detailed log
log struct {
misplaced _log
missing _log
small _log
large _log
}
_many bool
}
_log struct {
fh *os.File
tag string
fn string
cnt int
mu sync.Mutex
}
// fields exported => teb/template
scrubOne struct {
Expand Down Expand Up @@ -111,70 +136,83 @@ func scrubHandler(c *cli.Context) (err error) {
qflprn(smallSizeFlag), cos.ToSizeIEC(ctx.small, 0))
}

// TODO -- FIXME: support async execution
if err := ctx.createLogs(); err != nil {
return err
}

if ctx.qbck.IsBucket() {
return waitForFunc(ctx.one, ctx.ival)
err = waitForFunc(ctx.one, ctx.ival)
} else {
err = waitForFunc(ctx.many, ctx.ival)
}
return waitForFunc(ctx.many, ctx.ival)

ctx.closeLogs(c)
return err
}

//////////////
// scrubOne //
// scrubCtx //
//////////////

func (scr *scrubOne) upd(en *cmn.LsoEnt, bprops *cmn.Bprops) {
scr.Listed++
if !en.IsStatusOK() {
scr.Stats.Misplaced++
return
}
if bprops.Mirror.Enabled && en.Copies < int16(bprops.Mirror.Copies) {
scr.Stats.MissingCp++
}
if en.Size <= scr.parent.small {
scr.Stats.SmallSz++
} else if en.Size >= scr.parent.large {
scr.Stats.LargeSz++
func (ctx *scrubCtx) createLogs() error {
ctx.log.misplaced.tag, ctx.log.missing.tag, ctx.log.small.tag, ctx.log.large.tag =
"misplaced", "missing", "small", "large"

for _, log := range []*_log{&ctx.log.misplaced, &ctx.log.missing, &ctx.log.small, &ctx.log.large} {
if err := ctx._create(log); err != nil {
return err
}
}
return nil
}

func (scr *scrubOne) toSB(sb *strings.Builder, total int) {
sb.WriteString(scr.Bck.Cname(""))
sb.WriteString(": scrubbed ")
sb.WriteString(cos.FormatBigNum(total))
sb.WriteString(" names")

var scr0 scrubOne
if scr.Stats == scr0.Stats {
return
func (ctx *scrubCtx) closeLogs(c *cli.Context) {
var titled bool
for _, log := range []*_log{&ctx.log.misplaced, &ctx.log.missing, &ctx.log.small, &ctx.log.large} {
cos.Close(log.fh)
if log.cnt == 0 {
cos.RemoveFile(log.fn)
continue
}
if !titled {
const title = "Detailed Logs"
fmt.Fprintln(c.App.Writer)
fmt.Fprintln(c.App.Writer, fcyan(title))
fmt.Fprintln(c.App.Writer, strings.Repeat("-", len(title)))
titled = true
}
fmt.Fprintf(c.App.Writer, "* %s objects: %s (%d record%s)\n", log.tag, log.fn, log.cnt, cos.Plural(log.cnt))
}

sb.WriteByte(' ')
s := fmt.Sprintf("%+v", scr.Stats)
sb.WriteString(s)
}

//////////////
// scrubCtx //
//////////////
func (*scrubCtx) _create(log *_log) (err error) {
fn := fmt.Sprintf(".ais-scrub-%s.%d.log", log.tag, os.Getpid())
log.fn = filepath.Join(os.TempDir(), fn)
log.fh, err = cos.CreateFile(log.fn)
return err
}

func (ctx *scrubCtx) many() error {
bcks, err := api.ListBuckets(apiBP, ctx.qbck, apc.FltPresent)
if err != nil {
return V(err)
}
num := len(bcks)
if num == 1 {
ctx.qbck = cmn.QueryBcks(bcks[0])
return ctx.one()
}
debug.Assert(num > 1)

// many
ctx._many = true
var (
num = len(bcks)
wg = cos.NewLimitedWaitGroup(sys.NumCPU(), num)
mu = &sync.Mutex{}
wg = cos.NewLimitedWaitGroup(sys.NumCPU(), num)
mu = &sync.Mutex{}
)
ctx.scrubs = make([]*scrubOne, 0, num)
for i := range bcks {
bck := bcks[i]
if ctx.qbck.Name != "" && !ctx.qbck.Equal(&bck) {
continue
}

wg.Add(1)
go ctx.gols(bck, wg, mu)
}
Expand Down Expand Up @@ -294,3 +332,54 @@ func (ctx *scrubCtx) ls(bck cmn.Bck) (*scrubOne, error) {

return scr, nil
}

//////////////
// scrubOne //
//////////////

func (scr *scrubOne) upd(en *cmn.LsoEnt, bprops *cmn.Bprops) {
scr.Listed++
if !en.IsStatusOK() {
scr.Stats.Misplaced++
scr.log(&scr.parent.log.misplaced, scr.Bck.Cname(en.Name))
return
}
if bprops.Mirror.Enabled && en.Copies < int16(bprops.Mirror.Copies) {
scr.Stats.MissingCp++
scr.log(&scr.parent.log.missing, scr.Bck.Cname(en.Name))
}
if en.Size <= scr.parent.small {
scr.Stats.SmallSz++
scr.log(&scr.parent.log.small, scr.Bck.Cname(en.Name))
} else if en.Size >= scr.parent.large {
scr.Stats.LargeSz++
scr.log(&scr.parent.log.large, scr.Bck.Cname(en.Name))
}
}

func (scr *scrubOne) log(to *_log, s string) {
if scr.parent._many {
to.mu.Lock()
}
fmt.Fprintln(to.fh, s)
to.cnt++
if scr.parent._many {
to.mu.Unlock()
}
}

func (scr *scrubOne) toSB(sb *strings.Builder, total int) {
sb.WriteString(scr.Bck.Cname(""))
sb.WriteString(": scrubbed ")
sb.WriteString(cos.FormatBigNum(total))
sb.WriteString(" names")

var scr0 scrubOne
if scr.Stats == scr0.Stats {
return
}

sb.WriteByte(' ')
s := fmt.Sprintf("%+v", scr.Stats)
sb.WriteString(s)
}

0 comments on commit 0634d83

Please sign in to comment.