Skip to content

Commit

Permalink
scrub (feature)
Browse files Browse the repository at this point in the history
* part ten, prev. commit: 5417ebe

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 2, 2025
1 parent dca0b76 commit a7ce27f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
# NOTE: See `lint-update-ci` target in Makefile.
version: v1.63.0
version: v1.63.2
args: --timeout=30m

- name: Lint
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ lint-update:
## See also: .github/workflows/lint.yml
lint-update-ci:
@rm -f $(GOPATH)/bin/golangci-lint
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.63.0
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.63.2

lint:
@([[ -x "$(command -v golangci-lint)" ]] && echo "Cannot find golangci-lint, run 'make lint-update' to install" && exit 1) || true
Expand Down
128 changes: 77 additions & 51 deletions cmd/cli/cli/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -26,14 +27,15 @@ import (
"github.com/urfave/cli"
)

// [TODO] in order of priority:
// - hide zero columns
// - emit "100% healthy" when all counters are zero
// [TODO]
// - when waiting for a long time, show log filenames (to tail -f)
// - version-changed
// - version-deleted
// - async execution
// - pretty print
// - async execution, with --wait option
// - add options:
// --mountpath-misplaced
// --checksum
// --fix (***)
// - speed-up `ls` via multiple workers (***)
// - reuse strings.Builder buf, here and elsewhere

type (
_log struct {
Expand All @@ -44,13 +46,13 @@ type (
mu sync.Mutex
}
// fields exported => teb/template
scrubOne teb.ScrubOne
scrBp teb.ScrBp
)

type (
scrubCtx struct {
scrCtx struct {
c *cli.Context
scrubs []*scrubOne
scrubs []*scrBp
qbck cmn.QueryBcks
pref string
units string
Expand All @@ -69,7 +71,7 @@ type (

func scrubHandler(c *cli.Context) (err error) {
var (
ctx = scrubCtx{c: c}
ctx = scrCtx{c: c}
uri = preparseBckObjURI(c.Args().Get(0))
)
ctx.qbck, ctx.pref, err = parseQueryBckURI(uri)
Expand Down Expand Up @@ -141,10 +143,10 @@ func scrubHandler(c *cli.Context) (err error) {
}

//////////////
// scrubCtx //
// scrCtx //
//////////////

func (ctx *scrubCtx) createLogs() error {
func (ctx *scrCtx) createLogs() error {
pid := os.Getpid()
for i := 1; i < len(ctx.logs); i++ { // skipping listed objects
log := &ctx.logs[i]
Expand All @@ -161,14 +163,14 @@ func (ctx *scrubCtx) createLogs() error {
return nil
}

func (*scrubCtx) _create(log *_log, pid int) (err error) {
// _create assigns the log its file name, `.ais-scrub-<tag>.<pid>.log` under
// the OS temporary directory, and creates that file via cos.CreateFile.
// On return log.fn holds the full path and log.fh the result of the create
// call; the create error, if any, is returned as is.
func (*scrCtx) _create(log *_log, pid int) (err error) {
	log.fn = filepath.Join(
		os.TempDir(),
		fmt.Sprintf(".ais-scrub-%s.%d.log", log.tag, pid),
	)
	log.fh, err = cos.CreateFile(log.fn)
	return err
}

func (ctx *scrubCtx) closeLogs(c *cli.Context) {
func (ctx *scrCtx) closeLogs(c *cli.Context) {
var titled bool
for i := 1; i < len(ctx.logs); i++ { // skipping listed objects
log := &ctx.logs[i]
Expand All @@ -188,7 +190,7 @@ func (ctx *scrubCtx) closeLogs(c *cli.Context) {
}
}

func (ctx *scrubCtx) many() error {
func (ctx *scrCtx) many() error {
bcks, err := api.ListBuckets(apiBP, ctx.qbck, apc.FltPresent)
if err != nil {
return V(err)
Expand All @@ -206,7 +208,7 @@ func (ctx *scrubCtx) many() error {
wg = cos.NewLimitedWaitGroup(sys.NumCPU(), num)
mu = &sync.Mutex{}
)
ctx.scrubs = make([]*scrubOne, 0, num)
ctx.scrubs = make([]*scrBp, 0, num)
for i := range bcks {
bck := bcks[i]
wg.Add(1)
Expand All @@ -218,18 +220,18 @@ func (ctx *scrubCtx) many() error {
}

// prnt prints the accumulated per-bucket scrub results (ctx.scrubs) as a table
// and returns whatever teb.Print returns.
//
// NOTE(review): this span appears to come from a rendered commit diff - each
// line mentioning scrubCtx/ScrubOne is followed by its scrCtx/ScrBp
// replacement; confirm before treating both as live code.
func (ctx *scrubCtx) prnt() error {
out := make([]*teb.ScrubOne, len(ctx.scrubs))
func (ctx *scrCtx) prnt() error {
out := make([]*teb.ScrBp, len(ctx.scrubs))
// convert each local result to the exported teb type (pointer conversion, no copy)
for i, scr := range ctx.scrubs {
out[i] = (*teb.ScrubOne)(scr)
out[i] = (*teb.ScrBp)(scr)
}
all := teb.ScrubHelper{All: out}
// the table is built from the display units and whether any remote bucket was scrubbed
tab := all.MakeTab(ctx.units, ctx.haveRemote.Load())

// the template receives the state of noHeaderFlag (presumably to suppress the header - confirm in teb)
return teb.Print(out, tab.Template(flagIsSet(ctx.c, noHeaderFlag)))
}

func (ctx *scrubCtx) gols(bck cmn.Bck, wg cos.WG, mu *sync.Mutex) {
func (ctx *scrCtx) gols(bck cmn.Bck, wg cos.WG, mu *sync.Mutex) {
defer wg.Done()
scr, err := ctx.ls(bck)
if err != nil {
Expand All @@ -242,37 +244,38 @@ func (ctx *scrubCtx) gols(bck cmn.Bck, wg cos.WG, mu *sync.Mutex) {
mu.Unlock()
}

// one scrubs a single bucket: it lists the bucket identified by ctx.qbck via
// ctx.ls, stores the result as the only element of ctx.scrubs, and prints it.
// A listing error is returned unchanged, with nothing printed.
//
// NOTE(review): this span appears to come from a rendered commit diff - the
// scrubCtx/scrubOne lines look like the pre-change version of the
// scrCtx/scrBp lines that follow them; confirm.
func (ctx *scrubCtx) one() error {
func (ctx *scrCtx) one() error {
scr, err := ctx.ls(cmn.Bck(ctx.qbck))
if err != nil {
return err
}

ctx.scrubs = []*scrubOne{scr}
ctx.scrubs = []*scrBp{scr}
return ctx.prnt()
}

func (ctx *scrubCtx) ls(bck cmn.Bck) (*scrubOne, error) {
func (ctx *scrCtx) ls(bck cmn.Bck) (*scrBp, error) {
bprops, errV := headBucket(bck, true /* don't add */)
if errV != nil {
return nil, errV
}
bck.Props = bprops

var (
lsargs api.ListArgs
scr = &scrubOne{Bck: bck, Prefix: ctx.pref}
scr = &scrBp{Bck: bck, Prefix: ctx.pref}
lsmsg = &apc.LsoMsg{
Prefix: ctx.pref,
Flags: apc.LsMissing,
}
)
bck.Props = bprops
propNames := []string{apc.GetPropsName, apc.GetPropsSize, apc.GetPropsAtime, apc.GetPropsLocation, apc.GetPropsCustom}
if bck.IsRemote() {
lsmsg.Flags |= apc.LsObjCached | apc.LsVerChanged
lsmsg.AddProps(apc.GetPropsName, apc.GetPropsSize, apc.GetPropsCustom)
ctx.haveRemote.Store(true)
lsmsg.AddProps(propNames...)
ctx.haveRemote.Store(true) // incl. version-changed etc. columns
} else {
lsmsg.AddProps(apc.GetPropsName, apc.GetPropsSize)
lsmsg.AddProps(propNames[:len(propNames)-1]...) // minus apc.GetPropsCustom
}

pageSize, maxPages, limit, err := _setPage(ctx.c, bck)
Expand All @@ -287,7 +290,7 @@ func (ctx *scrubCtx) ls(bck cmn.Bck) (*scrubOne, error) {
listed int
yelped bool
)
// pages
// main loop: pages
for {
lst, err := api.ListObjectsPage(apiBP, bck, lsmsg, lsargs)
if err != nil {
Expand All @@ -299,7 +302,7 @@ func (ctx *scrubCtx) ls(bck cmn.Bck) (*scrubOne, error) {
continue
}
debug.Assert(en.IsPresent(), bck.Cname(en.Name), " must be present") // (LsObjCached)
scr.upd(ctx, en, bprops)
scr.upd(ctx, en, &bck)
}

if lsmsg.ContinuationToken == "" {
Expand Down Expand Up @@ -354,64 +357,87 @@ func (ctx *scrubCtx) ls(bck cmn.Bck) (*scrubOne, error) {
}

//////////////
// scrubOne //
// scrBp //
//////////////

// upd folds one listed entry `en` into this bucket's counters (scr.Stats) and
// records it in the log of every category it falls into.
//
// Every entry is counted under teb.ScrObjects. An entry whose status is not OK
// is additionally counted as misplaced and is not examined any further.
// Otherwise the checks below are applied independently: missing copies,
// small-or-large size, and version changed-or-removed.
//
// NOTE(review): this span appears to come from a rendered commit diff - lines
// using scrubOne/scrubCtx/bprops and the 3-argument scr.log look like the
// pre-change version of the scrBp/scrCtx/bck lines that follow them; confirm.
func (scr *scrubOne) upd(parent *scrubCtx, en *cmn.LsoEnt, bprops *cmn.Bprops) {
func (scr *scrBp) upd(parent *scrCtx, en *cmn.LsoEnt, bck *cmn.Bck) {
scr.Stats[teb.ScrObjects].Cnt++
scr.Stats[teb.ScrObjects].Siz += en.Size

if !en.IsStatusOK() {
scr.Stats[teb.ScrMisplaced].Cnt++
scr.Stats[teb.ScrMisplaced].Siz += en.Size
scr.log(&parent.logs[teb.ScrMisplaced], scr.Bck.Cname(en.Name), parent._many)
return // no further checking
scr.log(parent, en, bck, teb.ScrMisplaced)
// no further checking
return
}

// fewer copies than the bucket's mirror configuration asks for;
// only the count is tracked for this category (no size)
if bprops.Mirror.Enabled && en.Copies < int16(bprops.Mirror.Copies) {
if bck.Props.Mirror.Enabled && en.Copies < int16(bck.Props.Mirror.Copies) {
scr.Stats[teb.ScrMissingCp].Cnt++
scr.log(&parent.logs[teb.ScrMissingCp], scr.Bck.Cname(en.Name), parent._many)
scr.log(parent, en, bck, teb.ScrMissingCp)
}

// size thresholds: the small check is tested first, so an entry is never counted as both
if en.Size <= parent.small {
scr.Stats[teb.ScrSmallSz].Cnt++
scr.Stats[teb.ScrSmallSz].Siz += en.Size
scr.log(&parent.logs[teb.ScrSmallSz], scr.Bck.Cname(en.Name), parent._many)
scr.log(parent, en, bck, teb.ScrSmallSz)
} else if en.Size >= parent.large {
scr.Stats[teb.ScrLargeSz].Cnt++
scr.Stats[teb.ScrLargeSz].Siz += en.Size
scr.log(&parent.logs[teb.ScrLargeSz], scr.Bck.Cname(en.Name), parent._many)
scr.log(parent, en, bck, teb.ScrLargeSz)
}

// version state: "changed" takes precedence over "removed"
if en.IsVerChanged() {
scr.Stats[teb.ScrVchanged].Cnt++
scr.Stats[teb.ScrVchanged].Siz += en.Size
scr.log(&parent.logs[teb.ScrVchanged], scr.Bck.Cname(en.Name), parent._many)
scr.log(parent, en, bck, teb.ScrVchanged)
} else if en.IsVerRemoved() {
scr.Stats[teb.ScrVremoved].Cnt++
scr.Stats[teb.ScrVremoved].Siz += en.Size
scr.log(&parent.logs[teb.ScrVremoved], scr.Bck.Cname(en.Name), parent._many)
scr.log(parent, en, bck, teb.ScrVremoved)
}
}

func (*scrubOne) log(to *_log, s string, lock bool) {
if lock {
to.mu.Lock()
}
fmt.Fprintln(to.fh, s)
to.cnt++
if lock {
to.mu.Unlock()
// logTitle is the heading line written (and then underlined with '=') before
// the first record of a scrub log.
const logTitle = "Name,Size,Atime,Location"

// delim separates adjacent fields of a log record: it closes one
// double-quoted field and opens the next.
const delim = `","`

// log appends one record describing `en` to the i-th scrub log
// (parent.logs[i]) and increments that log's entry counter.
//
// The record has the form `"<bucket/name>","<size>","<atime>","<location>"`.
// Before the very first record (log counter still zero) the title line and a
// matching '=' underline are written. When parent._many is set the log's
// mutex is held for the duration of the write; otherwise no locking is done.
// Write errors are not checked.
func (*scrBp) log(parent *scrCtx, en *cmn.LsoEnt, bck *cmn.Bck, i int) {
	lg := &parent.logs[i]
	if parent._many {
		lg.mu.Lock()
	}

	// first record for this log: emit the title and its underline
	if lg.cnt == 0 {
		fmt.Fprintln(lg.fh, logTitle)
		fmt.Fprintln(lg.fh, strings.Repeat("=", len(logTitle)))
	}

	// every field is double-quoted; `delim` closes one field and opens the next
	record := `"` + bck.Cname(en.Name) + delim +
		strconv.FormatInt(en.Size, 10) + delim +
		en.Atime + delim +
		en.Location + `"`
	fmt.Fprintln(lg.fh, record)
	lg.cnt++

	if parent._many {
		lg.mu.Unlock()
	}
}

func (scr *scrubOne) toSB(sb *strings.Builder, total int) {
func (scr *scrBp) toSB(sb *strings.Builder, total int) {
sb.WriteString(scr.Bck.Cname(""))
sb.WriteString(": scrubbed ")
sb.WriteString(cos.FormatBigNum(total))
sb.WriteString(" names")

var scr0 scrubOne
var scr0 scrBp
if scr.Stats == scr0.Stats {
return
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/cli/teb/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ type (
Cnt int64
Siz int64
}
ScrubOne struct {
ScrBp struct {
Bck cmn.Bck
Prefix string
Stats [ScrNumStats]CntSiz
}
ScrubHelper struct {
All []*ScrubOne
All []*ScrBp
}
)

Expand Down Expand Up @@ -124,7 +124,7 @@ func (*ScrubHelper) _hideCol(cols []*header, name string) {
// format values
const zeroCnt = "-"

func (*ScrubOne) fmtVal(v CntSiz, units string) string {
func (*ScrBp) fmtVal(v CntSiz, units string) string {
if v.Cnt == 0 {
return zeroCnt
}
Expand Down

0 comments on commit a7ce27f

Please sign in to comment.