From 48869985429992403ca1fdf12e90db483c723625 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Tue, 14 Jan 2025 14:07:16 -0800 Subject: [PATCH] ishard: maintenance * add make for the binary executable * refactor CLI config parsing * extend test cases Signed-off-by: Tony Chen --- Makefile | 5 ++ cmd/ishard/ishard/config/config.go | 45 ++++++----- cmd/ishard/ishard/config/sample.go | 13 +++- cmd/ishard/ishard/dir.go | 84 +++++++++++++++++++++ cmd/ishard/ishard/factory/factory.go | 6 +- cmd/ishard/ishard/ishard.go | 107 +++------------------------ cmd/ishard/ishard/ishard_test.go | 77 ++++++++++++++++--- 7 files changed, 201 insertions(+), 136 deletions(-) create mode 100644 cmd/ishard/ishard/dir.go diff --git a/Makefile b/Makefile index 17e48f9592..fcc98311b8 100644 --- a/Makefile +++ b/Makefile @@ -129,6 +129,11 @@ cli-autocompletions: ## Add CLI autocompletions @echo "Adding CLI autocomplete..." @./$(BUILD_DIR)/cli/autocomplete/install.sh +ishard: ## Build ishard CLI binary + @echo "Building ishard..." + @cd $(BUILD_DIR)/ishard && go build -o $(BUILD_DEST)/ishard *.go + @echo "done." 
+ authn: build-authn ## Build AuthN aisloader: build-aisloader ## Build aisloader xmeta: build-xmeta ## Build xmeta diff --git a/cmd/ishard/ishard/config/config.go b/cmd/ishard/ishard/config/config.go index d69cba7dda..02a5496e04 100644 --- a/cmd/ishard/ishard/config/config.go +++ b/cmd/ishard/ishard/config/config.go @@ -240,14 +240,14 @@ func (*DryRunFlag) IsBoolFlag() bool { return true } -// Load configuration for ishard from cli, or spec files (TODO) -func Load() (*Config, error) { +// Load configuration for ishard from CLI +func LoadFromCLI() (*Config, error) { cfg := DefaultConfig - parseCliParams(&cfg) - return &cfg, nil + err := parseCliParams(&cfg) + return &cfg, err } -func parseCliParams(cfg *Config) { +func parseCliParams(cfg *Config) error { flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI.") flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI.") flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%06d", "The template used for generating output shards. Default is `\"shard-%06d\"`. Accepts Bash, Fmt, or At formats.\n"+ @@ -295,17 +295,14 @@ func parseCliParams(cfg *Config) { var reactions = []string{"ignore", "warn", "abort", "exclude"} if !cos.StringInSlice(missingExtActStr, reactions) { - fmt.Printf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr) - flag.Usage() - os.Exit(1) + msg := fmt.Sprintf("Invalid action: %s. 
Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr) + return errWithUsage(errors.New(msg)) } if sampleExts != "" { cfg.MExtMgr, err = NewMissingExtManager(missingExtActStr, strings.Split(sampleExts, ",")) if err != nil { - fmt.Fprintln(os.Stderr, err) - flag.Usage() - os.Exit(1) + return errWithUsage(err) } } @@ -323,27 +320,29 @@ func parseCliParams(cfg *Config) { } else { fmt.Printf("\"sample_key_pattern\" %s is not built-in (\"base_file_name\" | \"full_name\" | \"collapse_all_dir\"), compiled as custom regex\n", sampleKeyPatternStr) if _, err := regexp.Compile(sampleKeyPatternStr); err != nil { - fmt.Fprintln(os.Stderr, err) - flag.Usage() - os.Exit(1) + return errWithUsage(err) } cfg.SampleKeyPattern = SampleKeyPattern{Regex: sampleKeyPatternStr, CaptureGroup: "$1"} } if cfg.SrcBck.Name == "" || cfg.DstBck.Name == "" { - fmt.Fprintln(os.Stderr, "Error: src_bck and dst_bck are required parameters.") - flag.Usage() - os.Exit(1) + return errWithUsage(errors.New("Error: src_bck and dst_bck are required parameters.\n%s")) } if cfg.SrcBck, cfg.SrcPrefix, err = cmn.ParseBckObjectURI(cfg.SrcBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil { - fmt.Fprintln(os.Stderr, err) - flag.Usage() - os.Exit(1) + return errWithUsage(err) } if cfg.DstBck, _, err = cmn.ParseBckObjectURI(cfg.DstBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil { - fmt.Fprintln(os.Stderr, err) - flag.Usage() - os.Exit(1) + return errWithUsage(err) } + + return nil +} + +func errWithUsage(err error) error { + var buf strings.Builder + flag.CommandLine.SetOutput(&buf) + flag.Usage() + flag.CommandLine.SetOutput(os.Stderr) + return fmt.Errorf("%v\n%s", err, buf.String()) } diff --git a/cmd/ishard/ishard/config/sample.go b/cmd/ishard/ishard/config/sample.go index aa78a267c3..9041ce3a3d 100644 --- a/cmd/ishard/ishard/config/sample.go +++ b/cmd/ishard/ishard/config/sample.go @@ -25,6 +25,11 @@ var ( CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, 
CaptureGroup: ""} ) +const ( + WarningPrefix = "[Warning]" + ErrorPrefix = "[Error]" +) + // MissingExtManager contains the set of expected extensions for each sample, and corresponding reaction type MissingExtManager struct { Name string @@ -78,10 +83,10 @@ func (mgr *MissingExtManager) warn(recs *shard.Records) (*shard.Records, error) mgr.EffectiveObjSize += record.TotalSize() extra, missing := difference(mgr.extSet, record.Objects) for ext := range extra { - fmt.Printf("[Warning] sample %s contains extension %s, not specified in `sample_ext` config\n", record.Name, ext) + fmt.Printf("%s sample %s contains extension %s, not specified in `sample_ext` config\n", WarningPrefix, record.Name, ext) } for ext := range missing { - fmt.Printf("[Warning] extension %s not found in sample %s\n", ext, record.Name) + fmt.Printf("%s extension %s not found in sample %s\n", WarningPrefix, ext, record.Name) } } @@ -93,10 +98,10 @@ func (mgr *MissingExtManager) abort(recs *shard.Records) (*shard.Records, error) mgr.EffectiveObjSize += record.TotalSize() extra, missing := difference(mgr.extSet, record.Objects) for ext := range extra { - return nil, fmt.Errorf("sample %s contains extension %s, not specified in `sample_ext` config", record.Name, ext) + return nil, fmt.Errorf("%s sample %s contains extension %s, not specified in `sample_ext` config", ErrorPrefix, record.Name, ext) } for ext := range missing { - return nil, fmt.Errorf("missing extension: extension %s not found in sample %s", ext, record.Name) + return nil, fmt.Errorf("%s missing extension: extension %s not found in sample %s", ErrorPrefix, ext, record.Name) } } diff --git a/cmd/ishard/ishard/dir.go b/cmd/ishard/ishard/dir.go new file mode 100644 index 0000000000..35d028dadd --- /dev/null +++ b/cmd/ishard/ishard/dir.go @@ -0,0 +1,84 @@ +// Package ishard provides utility for shard the initial dataset +/* + * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. 
+ */ +package ishard + +import ( + "path/filepath" + "strings" + + "github.com/NVIDIA/aistore/cmd/ishard/ishard/config" + "github.com/NVIDIA/aistore/ext/dsort/shard" +) + +// Represents the hierarchical structure of virtual directories within a bucket +type dirNode struct { + children map[string]*dirNode + records *shard.Records +} + +func newDirNode() *dirNode { + return &dirNode{ + children: make(map[string]*dirNode), + records: shard.NewRecords(16), + } +} + +func (n *dirNode) insert(keyPath, fullPath string, size int64) { + parts := strings.Split(keyPath, "/") + current := n + + for i, part := range parts { + if _, exists := current.children[part]; !exists { + if i == len(parts)-1 { + ext := filepath.Ext(fullPath) + base := strings.TrimSuffix(keyPath, ext) + current.records.Insert(&shard.Record{ + Key: base, + Name: base, + Objects: []*shard.RecordObj{{ + ContentPath: fullPath, + StoreType: shard.SGLStoreType, + Offset: 0, + MetadataSize: 0, + Size: size, + Extension: ext, + }}, + }) + } else { + current.children[part] = newDirNode() + } + } + current = current.children[part] + } +} + +// apply performs a preorder traversal through the tree starting from the node `n`, +// applying the given reaction `act` to the Records of each node. The traversal stops if an error occurs. 
+func (n *dirNode) apply(act *config.MissingExtManager, recursive bool) error { + if n == nil { + return nil + } + + newRecs, err := act.React(n.records) + if err != nil { + return err + } + if newRecs != nil { + n.records.Drain() + n.records = newRecs + } + + if !recursive { + return nil + } + + for _, child := range n.children { + if err := child.apply(act, recursive); err != nil { + return err + } + } + + return nil +} diff --git a/cmd/ishard/ishard/factory/factory.go b/cmd/ishard/ishard/factory/factory.go index 940c7c3d85..6731446339 100644 --- a/cmd/ishard/ishard/factory/factory.go +++ b/cmd/ishard/ishard/factory/factory.go @@ -1,6 +1,6 @@ // Package factory provides functions to create shards and track their creation progress /* - * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. */ package factory @@ -56,7 +56,9 @@ func NewShardFactory(baseParams api.BaseParams, fromBck, toBck cmn.Bck, ext, sha ext: ext, OutShardNames: make([]string, 0), - // block when number of creating shards reaches to archive xacts's workCh size. otherwise xact commit may timeout. see xact/xs/archive.go + // TODO: The channel size is limited to manage concurrent xaction begin-phase requests, + // preventing potential blocks and timeouts during subsequent commit-phase requests. + // Refer to `maxNumInParallel` in `xact/xs/streaming.go` and `xact/xs/archive.go` for details. pollCh: make(chan *shard.Shard, 512), dryRunCfg: dryRun, } diff --git a/cmd/ishard/ishard/ishard.go b/cmd/ishard/ishard/ishard.go index 8860f3d011..1d42c10fd8 100644 --- a/cmd/ishard/ishard/ishard.go +++ b/cmd/ishard/ishard/ishard.go @@ -1,14 +1,12 @@ // Package ishard provides utility for shard the initial dataset /* - * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. 
*/ package ishard import ( "fmt" - "path/filepath" "regexp" - "strings" "github.com/NVIDIA/aistore/api" "github.com/NVIDIA/aistore/api/apc" @@ -20,85 +18,6 @@ import ( "github.com/NVIDIA/aistore/ext/dsort/shard" ) -///////////// -// dirNode // -///////////// - -// Represents the hierarchical structure of virtual directories within a bucket -type dirNode struct { - children map[string]*dirNode - records *shard.Records -} - -func newDirNode() *dirNode { - return &dirNode{ - children: make(map[string]*dirNode), - records: shard.NewRecords(16), - } -} - -func (n *dirNode) insert(keyPath, fullPath string, size int64) { - parts := strings.Split(keyPath, "/") - current := n - - for i, part := range parts { - if _, exists := current.children[part]; !exists { - if i == len(parts)-1 { - ext := filepath.Ext(fullPath) - base := strings.TrimSuffix(keyPath, ext) - current.records.Insert(&shard.Record{ - Key: base, - Name: base, - Objects: []*shard.RecordObj{{ - ContentPath: fullPath, - StoreType: shard.SGLStoreType, - Offset: 0, - MetadataSize: 0, - Size: size, - Extension: ext, - }}, - }) - } else { - current.children[part] = newDirNode() - } - } - current = current.children[part] - } -} - -// apply performs a preorder traversal through the tree starting from the node `n`, -// applying the given reaction `act` to the Records of each node. The traversal stops if an error occurs. 
-func (n *dirNode) apply(act *config.MissingExtManager, recursive bool) error { - if n == nil { - return nil - } - - newRecs, err := act.React(n.records) - if err != nil { - return err - } - if newRecs != nil { - n.records.Drain() - n.records = newRecs - } - - if !recursive { - return nil - } - - for _, child := range n.children { - if err := child.apply(act, recursive); err != nil { - return err - } - } - - return nil -} - -////////////// -// ISharder // -////////////// - // ISharder executes an initial sharding job with given configuration type ISharder struct { cfg *config.Config @@ -220,27 +139,23 @@ func (is *ISharder) incAndCheck(size int64) (accumulatedSize int64) { } // NewISharder instantiates an ISharder with the configuration if provided; -// otherwise, it loads from CLI or uses the default config. -func NewISharder(cfgArg *config.Config) (is *ISharder, err error) { - is = &ISharder{} - - // Use provided config if given - if cfgArg != nil { - is.cfg = cfgArg - } else { - is.cfg, err = config.Load() +// otherwise, it loads config from CLI parameters. 
+func NewISharder(cfg *config.Config) (is *ISharder, err error) { + if cfg == nil { + cfg, err = config.LoadFromCLI() if err != nil { - defaultCfg := config.DefaultConfig - is.cfg = &defaultCfg + return nil, err } } - is.baseParams = api.BaseParams{URL: is.cfg.URL} + is = &ISharder{cfg: cfg} + + is.baseParams = api.BaseParams{URL: cfg.URL} is.baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true}) - is.sampleKeyRegex = regexp.MustCompile(is.cfg.SampleKeyPattern.Regex) + is.sampleKeyRegex, err = regexp.Compile(cfg.SampleKeyPattern.Regex) - return is, err + return } func (is *ISharder) Start() error { diff --git a/cmd/ishard/ishard/ishard_test.go b/cmd/ishard/ishard/ishard_test.go index 9e0f72ebd0..5ccee36535 100644 --- a/cmd/ishard/ishard/ishard_test.go +++ b/cmd/ishard/ishard/ishard_test.go @@ -8,6 +8,7 @@ import ( "bytes" "fmt" "math/rand/v2" + "os" "path/filepath" "regexp" "strings" @@ -29,7 +30,7 @@ import ( ) func runIshardTest(t *testing.T, cfg *config.Config, baseParams api.BaseParams, numRecords, numExtensions int, - fileSize int64, sampleKeyPattern config.SampleKeyPattern, randomize, dropout bool) { //nolint:unparam // dropout to be implemented + fileSize int64, sampleKeyPattern config.SampleKeyPattern, randomize bool) { tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) @@ -49,7 +50,7 @@ func runIshardTest(t *testing.T, cfg *config.Config, baseParams api.BaseParams, err = isharder.Start() tassert.CheckFatal(t, err) - checkOutputShards(t, baseParams, cfg.DstBck, numRecords*numExtensions, totalSize, sampleKeyPattern, dropout) + checkOutputShards(t, baseParams, cfg.DstBck, numRecords*numExtensions, totalSize, sampleKeyPattern, false) } func TestIshardNoRecordsSplit(t *testing.T) { @@ -96,7 +97,7 @@ func TestIshardNoRecordsSplit(t *testing.T) { tc.numRecords /= 10 } - runIshardTest(t, cfg, baseParams, tc.numRecords, tc.numExtensions, tc.fileSize, 
config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, tc.numRecords, tc.numExtensions, tc.fileSize, config.BaseFileNamePattern, false /*randomize*/) }) } } @@ -158,7 +159,7 @@ func TestIshardShardSize(t *testing.T) { tc.numRecords /= 10 } - runIshardTest(t, cfg, baseParams, tc.numRecords, numExtensions, tc.fileSize, config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, tc.numRecords, numExtensions, tc.fileSize, config.BaseFileNamePattern, false /*randomize*/) lsmsg := &apc.LsoMsg{} lsmsg.SetFlag(apc.LsNameSize) @@ -306,7 +307,7 @@ func TestIshardTemplate(t *testing.T) { numExtensions = 3 ) - runIshardTest(t, cfg, baseParams, tc.numRecords, numExtensions, tc.fileSize, config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, tc.numRecords, numExtensions, tc.fileSize, config.BaseFileNamePattern, false /*randomize*/) tarballs, err := api.ListObjects(baseParams, cfg.DstBck, &apc.LsoMsg{}, api.ListArgs{}) tassert.CheckFatal(t, err) @@ -381,7 +382,7 @@ func TestIshardSampleKeyPattern(t *testing.T) { fileSize = 32 * cos.KiB ) - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), tc.pattern, true /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), tc.pattern, true /*randomize*/) }) } } @@ -437,7 +438,7 @@ func TestIshardMissingExtension(t *testing.T) { fmt.Printf("starting ishard, from %s to %s\n", cfg.SrcBck.String(), cfg.DstBck.String()) err = isharder.Start() // error is expected to occur since `dropout` is set, ishard should abort - if err == nil || !strings.HasPrefix(err.Error(), "missing extension: ") { + if err == nil || !strings.HasPrefix(err.Error(), config.ErrorPrefix) { tassert.Fatalf(t, false, "expected error with 'missing extension:', but got: %v", err) } }) @@ -498,9 +499,56 @@ func TestIshardMissingExtension(t *testing.T) { } 
tassert.CheckFatal(t, err) }) + + t.Run("TestIshardMissingExtension/action=warn", func(t *testing.T) { + cfg.SrcBck = cmn.Bck{ + Name: trand.String(15), + Provider: apc.AIS, + } + cfg.DstBck = cmn.Bck{ + Name: trand.String(15), + Provider: apc.AIS, + } + tools.CreateBucket(t, cfg.URL, cfg.SrcBck, nil, true /*cleanup*/) + tools.CreateBucket(t, cfg.URL, cfg.DstBck, nil, true /*cleanup*/) + + expectedExts := []string{".jpeg", ".cls", ".json"} + var err error + cfg.MExtMgr, err = config.NewMissingExtManager("warn", expectedExts) + tassert.CheckFatal(t, err) + + _, err = generateNestedStructure(baseParams, cfg.SrcBck, numRecords, "", expectedExts, int64(fileSize), true /*randomize*/, true /*dropout*/) + tassert.CheckFatal(t, err) + + isharder, err := ishard.NewISharder(cfg) + tassert.CheckFatal(t, err) + + fmt.Printf("starting ishard, from %s to %s\n", cfg.SrcBck.String(), cfg.DstBck.String()) + + // Capture stdout output to verify warning messages + r, w, _ := os.Pipe() + stdout := os.Stdout + defer func() { os.Stdout = stdout }() // Restore original stdout at the end + os.Stdout = w + + err = isharder.Start() + w.Close() + + var outputBuffer bytes.Buffer + _, _ = outputBuffer.ReadFrom(r) + output := outputBuffer.String() + + if !strings.HasPrefix(output, config.WarningPrefix) { + t.Errorf("Expected warning message not found in output: %s", output) + } + + tassert.CheckFatal(t, err) + }) } func TestIshardEKM(t *testing.T) { + tools.CheckSkip(t, &tools.SkipTestArgs{Long: true}) + var ( cfg = &config.Config{ SrcBck: cmn.Bck{ @@ -617,7 +665,7 @@ func TestIshardParallel(t *testing.T) { wg.Add(1) go func(cfg config.Config, baseParams api.BaseParams) { defer wg.Done() - runIshardTest(t, &cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, &cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/) }(*cfg, baseParams) } wg.Wait() 
@@ -658,7 +706,7 @@ func TestIshardChain(t *testing.T) { fileSize = 32 * cos.KiB ) - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/) } } @@ -693,7 +741,7 @@ func TestIshardLargeBucket(t *testing.T) { fileSize = 32 * cos.KiB ) - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.BaseFileNamePattern, false /*randomize*/) } func TestIshardLargeFiles(t *testing.T) { @@ -727,7 +775,7 @@ func TestIshardLargeFiles(t *testing.T) { fileSize = 2 * cos.GiB ) - runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.FullNamePattern, false /*randomize*/, false /*dropout*/) + runIshardTest(t, cfg, baseParams, numRecords, numExtensions, int64(fileSize), config.FullNamePattern, false /*randomize*/) } // Helper function to generate a nested directory structure @@ -743,6 +791,7 @@ func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecor // Queue to hold objects temporarily to random insertion randomizeQueue := make([]string, 0) + dropOnce := dropout basePath := randomFilePath() for range numRecords { @@ -774,6 +823,12 @@ func generateNestedStructure(baseParams api.BaseParams, bucket cmn.Bck, numRecor baseName := trand.String(5) for _, ext := range extensions { + // Ensure to drop an extension at least once + if dropOnce { + dropOnce = false + continue + } + // Randomly skip an extension if dropout is set if dropout && rand.IntN(10) == 0 { continue