Skip to content

Commit

Permalink
ishard: maintenance
Browse files Browse the repository at this point in the history
    * add make for the binary executable
    * refractor CLI config parsing
    * extend test cases

Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Jan 15, 2025
1 parent 8e56ddb commit 4886998
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 136 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ cli-autocompletions: ## Add CLI autocompletions
@echo "Adding CLI autocomplete..."
@./$(BUILD_DIR)/cli/autocomplete/install.sh

ishard: ## Build ishard CLI binary
@echo "Building ishard..."
@cd $(BUILD_DIR)/ishard && go build -o $(BUILD_DEST)/ishard *.go
@echo "done."

authn: build-authn ## Build AuthN
aisloader: build-aisloader ## Build aisloader
xmeta: build-xmeta ## Build xmeta
Expand Down
45 changes: 22 additions & 23 deletions cmd/ishard/ishard/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ func (*DryRunFlag) IsBoolFlag() bool {
return true
}

// Load configuration for ishard from cli, or spec files (TODO)
func Load() (*Config, error) {
// Load configuration for ishard from CLI
func LoadFromCLI() (*Config, error) {
cfg := DefaultConfig
parseCliParams(&cfg)
return &cfg, nil
err := parseCliParams(&cfg)
return &cfg, err
}

func parseCliParams(cfg *Config) {
func parseCliParams(cfg *Config) error {
flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI.")
flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI.")
flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%06d", "The template used for generating output shards. Default is `\"shard-%06d\"`. Accepts Bash, Fmt, or At formats.\n"+
Expand Down Expand Up @@ -295,17 +295,14 @@ func parseCliParams(cfg *Config) {

var reactions = []string{"ignore", "warn", "abort", "exclude"}
if !cos.StringInSlice(missingExtActStr, reactions) {
fmt.Printf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
flag.Usage()
os.Exit(1)
msg := fmt.Sprintf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
return errWithUsage(errors.New(msg))
}

if sampleExts != "" {
cfg.MExtMgr, err = NewMissingExtManager(missingExtActStr, strings.Split(sampleExts, ","))
if err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
}

Expand All @@ -323,27 +320,29 @@ func parseCliParams(cfg *Config) {
} else {
fmt.Printf("\"sample_key_pattern\" %s is not built-in (\"base_file_name\" | \"full_name\" | \"collapse_all_dir\"), compiled as custom regex\n", sampleKeyPatternStr)
if _, err := regexp.Compile(sampleKeyPatternStr); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
cfg.SampleKeyPattern = SampleKeyPattern{Regex: sampleKeyPatternStr, CaptureGroup: "$1"}
}

if cfg.SrcBck.Name == "" || cfg.DstBck.Name == "" {
fmt.Fprintln(os.Stderr, "Error: src_bck and dst_bck are required parameters.")
flag.Usage()
os.Exit(1)
return errWithUsage(errors.New("Error: src_bck and dst_bck are required parameters.\n%s"))
}

if cfg.SrcBck, cfg.SrcPrefix, err = cmn.ParseBckObjectURI(cfg.SrcBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
if cfg.DstBck, _, err = cmn.ParseBckObjectURI(cfg.DstBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}

return nil
}

func errWithUsage(err error) error {
var buf strings.Builder
flag.CommandLine.SetOutput(&buf)
flag.Usage()
flag.CommandLine.SetOutput(os.Stderr)
return fmt.Errorf("%v\n%s", err, buf.String())
}
13 changes: 9 additions & 4 deletions cmd/ishard/ishard/config/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ var (
CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, CaptureGroup: ""}
)

const (
WarningPrefix = "[Warning]"
ErrorPrefix = "[Error]"
)

// MissingExtManager contains the set of expected extensions for each sample, and corresponding reaction
type MissingExtManager struct {
Name string
Expand Down Expand Up @@ -78,10 +83,10 @@ func (mgr *MissingExtManager) warn(recs *shard.Records) (*shard.Records, error)
mgr.EffectiveObjSize += record.TotalSize()
extra, missing := difference(mgr.extSet, record.Objects)
for ext := range extra {
fmt.Printf("[Warning] sample %s contains extension %s, not specified in `sample_ext` config\n", record.Name, ext)
fmt.Printf("%s sample %s contains extension %s, not specified in `sample_ext` config\n", WarningPrefix, record.Name, ext)
}
for ext := range missing {
fmt.Printf("[Warning] extension %s not found in sample %s\n", ext, record.Name)
fmt.Printf("%s extension %s not found in sample %s\n", WarningPrefix, ext, record.Name)
}
}

Expand All @@ -93,10 +98,10 @@ func (mgr *MissingExtManager) abort(recs *shard.Records) (*shard.Records, error)
mgr.EffectiveObjSize += record.TotalSize()
extra, missing := difference(mgr.extSet, record.Objects)
for ext := range extra {
return nil, fmt.Errorf("sample %s contains extension %s, not specified in `sample_ext` config", record.Name, ext)
return nil, fmt.Errorf("%s sample %s contains extension %s, not specified in `sample_ext` config", ErrorPrefix, record.Name, ext)
}
for ext := range missing {
return nil, fmt.Errorf("missing extension: extension %s not found in sample %s", ext, record.Name)
return nil, fmt.Errorf("%s missing extension: extension %s not found in sample %s", ErrorPrefix, ext, record.Name)
}
}

Expand Down
84 changes: 84 additions & 0 deletions cmd/ishard/ishard/dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package ishard provides utility for shard the initial dataset
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package ishard

import (
"path/filepath"
"strings"

"github.com/NVIDIA/aistore/cmd/ishard/ishard/config"
"github.com/NVIDIA/aistore/ext/dsort/shard"
)

// Represents the hierarchical structure of virtual directories within a bucket
type dirNode struct {
children map[string]*dirNode
records *shard.Records
}

func newDirNode() *dirNode {
return &dirNode{
children: make(map[string]*dirNode),
records: shard.NewRecords(16),
}
}

func (n *dirNode) insert(keyPath, fullPath string, size int64) {
parts := strings.Split(keyPath, "/")
current := n

for i, part := range parts {
if _, exists := current.children[part]; !exists {
if i == len(parts)-1 {
ext := filepath.Ext(fullPath)
base := strings.TrimSuffix(keyPath, ext)
current.records.Insert(&shard.Record{
Key: base,
Name: base,
Objects: []*shard.RecordObj{{
ContentPath: fullPath,
StoreType: shard.SGLStoreType,
Offset: 0,
MetadataSize: 0,
Size: size,
Extension: ext,
}},
})
} else {
current.children[part] = newDirNode()
}
}
current = current.children[part]
}
}

// apply performs a preorder traversal through the tree starting from the node `n`,
// applying the given reaction `act` to the Records of each node. The traversal stops if an error occurs.
func (n *dirNode) apply(act *config.MissingExtManager, recursive bool) error {
if n == nil {
return nil
}

newRecs, err := act.React(n.records)
if err != nil {
return err
}
if newRecs != nil {
n.records.Drain()
n.records = newRecs
}

if !recursive {
return nil
}

for _, child := range n.children {
if err := child.apply(act, recursive); err != nil {
return err
}
}

return nil
}
6 changes: 4 additions & 2 deletions cmd/ishard/ishard/factory/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package factory provides functions to create shards and track their creation progress
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
*/
package factory

Expand Down Expand Up @@ -56,7 +56,9 @@ func NewShardFactory(baseParams api.BaseParams, fromBck, toBck cmn.Bck, ext, sha
ext: ext,
OutShardNames: make([]string, 0),

// block when number of creating shards reaches to archive xacts's workCh size. otherwise xact commit may timeout. see xact/xs/archive.go
// TODO: The channel size is limited to manage concurrent xaction begin-phase requests,
// preventing potential blocks and timeouts during subsequent commit-phase requests.
// Refer to `maxNumInParallel` in `xact/xs/streaming.go` and `xact/xs/archive.go` for details.
pollCh: make(chan *shard.Shard, 512),
dryRunCfg: dryRun,
}
Expand Down
107 changes: 11 additions & 96 deletions cmd/ishard/ishard/ishard.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Package ishard provides utility for shard the initial dataset
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
*/
package ishard

import (
"fmt"
"path/filepath"
"regexp"
"strings"

"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
Expand All @@ -20,85 +18,6 @@ import (
"github.com/NVIDIA/aistore/ext/dsort/shard"
)

/////////////
// dirNode //
/////////////

// Represents the hierarchical structure of virtual directories within a bucket
type dirNode struct {
children map[string]*dirNode
records *shard.Records
}

func newDirNode() *dirNode {
return &dirNode{
children: make(map[string]*dirNode),
records: shard.NewRecords(16),
}
}

func (n *dirNode) insert(keyPath, fullPath string, size int64) {
parts := strings.Split(keyPath, "/")
current := n

for i, part := range parts {
if _, exists := current.children[part]; !exists {
if i == len(parts)-1 {
ext := filepath.Ext(fullPath)
base := strings.TrimSuffix(keyPath, ext)
current.records.Insert(&shard.Record{
Key: base,
Name: base,
Objects: []*shard.RecordObj{{
ContentPath: fullPath,
StoreType: shard.SGLStoreType,
Offset: 0,
MetadataSize: 0,
Size: size,
Extension: ext,
}},
})
} else {
current.children[part] = newDirNode()
}
}
current = current.children[part]
}
}

// apply performs a preorder traversal through the tree starting from the node `n`,
// applying the given reaction `act` to the Records of each node. The traversal stops if an error occurs.
func (n *dirNode) apply(act *config.MissingExtManager, recursive bool) error {
if n == nil {
return nil
}

newRecs, err := act.React(n.records)
if err != nil {
return err
}
if newRecs != nil {
n.records.Drain()
n.records = newRecs
}

if !recursive {
return nil
}

for _, child := range n.children {
if err := child.apply(act, recursive); err != nil {
return err
}
}

return nil
}

//////////////
// ISharder //
//////////////

// ISharder executes an initial sharding job with given configuration
type ISharder struct {
cfg *config.Config
Expand Down Expand Up @@ -220,27 +139,23 @@ func (is *ISharder) incAndCheck(size int64) (accumulatedSize int64) {
}

// NewISharder instantiates an ISharder with the configuration if provided;
// otherwise, it loads from CLI or uses the default config.
func NewISharder(cfgArg *config.Config) (is *ISharder, err error) {
is = &ISharder{}

// Use provided config if given
if cfgArg != nil {
is.cfg = cfgArg
} else {
is.cfg, err = config.Load()
// otherwise, it loads config from CLI parameters.
func NewISharder(cfg *config.Config) (is *ISharder, err error) {
if cfg == nil {
cfg, err = config.LoadFromCLI()
if err != nil {
defaultCfg := config.DefaultConfig
is.cfg = &defaultCfg
return nil, err
}
}

is.baseParams = api.BaseParams{URL: is.cfg.URL}
is = &ISharder{cfg: cfg}

is.baseParams = api.BaseParams{URL: cfg.URL}
is.baseParams.Client = cmn.NewClient(cmn.TransportArgs{UseHTTPProxyEnv: true})

is.sampleKeyRegex = regexp.MustCompile(is.cfg.SampleKeyPattern.Regex)
is.sampleKeyRegex, err = regexp.Compile(cfg.SampleKeyPattern.Regex)

return is, err
return
}

func (is *ISharder) Start() error {
Expand Down
Loading

0 comments on commit 4886998

Please sign in to comment.