Skip to content

Commit

Permalink
ishard: maintenance
Browse files Browse the repository at this point in the history
* add make for the binary executable and lint
* refactor CLI config parsing
* extend test cases
* update docs

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
Nahemah1022 authored and alex-aizman committed Jan 15, 2025
1 parent 37271ae commit bccf9c6
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 136 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ cli-autocompletions: ## Add CLI autocompletions
@echo "Adding CLI autocomplete..."
@./$(BUILD_DIR)/cli/autocomplete/install.sh

ishard: ## Build ishard CLI binary
@echo "Building ishard..."
@cd $(BUILD_DIR)/ishard && go build -o $(BUILD_DEST)/ishard *.go
@echo "done."

authn: build-authn ## Build AuthN
aisloader: build-aisloader ## Build aisloader
xmeta: build-xmeta ## Build xmeta
Expand Down Expand Up @@ -268,6 +273,7 @@ lint:
@([[ -x "$(command -v golangci-lint)" ]] && echo "Cannot find golangci-lint, run 'make lint-update' to install" && exit 1) || true
@$(SHELL) "$(SCRIPTS_DIR)/bootstrap.sh" lint
@$(MAKE) -C $(BUILD_DIR)/cli lint
@$(MAKE) -C $(BUILD_DIR)/ishard lint

install-python-deps:
@pip3 install -r ./python/aistore/common_requirements
Expand Down
26 changes: 26 additions & 0 deletions cmd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
| `cmd/aisnodeprofile` | `aisnode` | ... with profiling enabled | |
| `cmd/authn` | `authn` | Standalone server providing token-based secure access to AIS clusters | [AuthN](/docs/authn.md) |
| `cmd/xmeta` | `xmeta` | Low-level tool to format (or extract in plain text) assorted AIS metadata and control structures | [xmeta](/cmd/xmeta/README.md) |
| `cmd/ishard` | `ishard` | AIS integrated utility to create well-formed shards from the original dataset | [ishard](/cmd/ishard/README.md) |

**NOTE**: installed CLI executable is named `ais`.

Expand Down Expand Up @@ -117,3 +118,28 @@ OR, same:
$ cd cmd/xmeta
$ go install
```

## ishard

`ishard` is an AIS integrated utility to create well-formed shards from the original dataset - see [usage](/cmd/ishard/README.md).

For command line options and usage examples, simply run `ishard` with no arguments:

```console
$ ishard
Usage of ishard:
...
...
```

To install, run:

```console
$ make ishard
```

You could also use `go install`:

```console
$ go install github.com/NVIDIA/aistore/cmd/ishard@latest
```
45 changes: 22 additions & 23 deletions cmd/ishard/ishard/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ func (*DryRunFlag) IsBoolFlag() bool {
return true
}

// Load configuration for ishard from cli, or spec files (TODO)
func Load() (*Config, error) {
// Load configuration for ishard from CLI
func LoadFromCLI() (*Config, error) {
cfg := DefaultConfig
parseCliParams(&cfg)
return &cfg, nil
err := parseCliParams(&cfg)
return &cfg, err
}

func parseCliParams(cfg *Config) {
func parseCliParams(cfg *Config) error {
flag.StringVar(&cfg.SrcBck.Name, "src_bck", "", "Source bucket name or URI.")
flag.StringVar(&cfg.DstBck.Name, "dst_bck", "", "Destination bucket name or URI.")
flag.StringVar(&cfg.ShardTemplate, "shard_template", "shard-%06d", "The template used for generating output shards. Default is `\"shard-%06d\"`. Accepts Bash, Fmt, or At formats.\n"+
Expand Down Expand Up @@ -295,17 +295,14 @@ func parseCliParams(cfg *Config) {

var reactions = []string{"ignore", "warn", "abort", "exclude"}
if !cos.StringInSlice(missingExtActStr, reactions) {
fmt.Printf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
flag.Usage()
os.Exit(1)
msg := fmt.Sprintf("Invalid action: %s. Accepted values are: abort, warn, ignore, exclude\n", missingExtActStr)
return errWithUsage(errors.New(msg))
}

if sampleExts != "" {
cfg.MExtMgr, err = NewMissingExtManager(missingExtActStr, strings.Split(sampleExts, ","))
if err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
}

Expand All @@ -323,27 +320,29 @@ func parseCliParams(cfg *Config) {
} else {
fmt.Printf("\"sample_key_pattern\" %s is not built-in (\"base_file_name\" | \"full_name\" | \"collapse_all_dir\"), compiled as custom regex\n", sampleKeyPatternStr)
if _, err := regexp.Compile(sampleKeyPatternStr); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
cfg.SampleKeyPattern = SampleKeyPattern{Regex: sampleKeyPatternStr, CaptureGroup: "$1"}
}

if cfg.SrcBck.Name == "" || cfg.DstBck.Name == "" {
fmt.Fprintln(os.Stderr, "Error: src_bck and dst_bck are required parameters.")
flag.Usage()
os.Exit(1)
return errWithUsage(errors.New("Error: src_bck and dst_bck are required parameters.\n%s"))
}

if cfg.SrcBck, cfg.SrcPrefix, err = cmn.ParseBckObjectURI(cfg.SrcBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}
if cfg.DstBck, _, err = cmn.ParseBckObjectURI(cfg.DstBck.Name, cmn.ParseURIOpts{DefaultProvider: apc.AIS}); err != nil {
fmt.Fprintln(os.Stderr, err)
flag.Usage()
os.Exit(1)
return errWithUsage(err)
}

return nil
}

// errWithUsage returns an error that wraps `err` and carries the command-line
// usage text after the original message, separated by a newline.
//
// The usage text is captured by temporarily pointing the default flag set's
// output at an in-memory buffer while flag.Usage runs. The cause is wrapped
// with %w so that callers can still match it with errors.Is / errors.As.
func errWithUsage(err error) error {
	var usage strings.Builder

	flag.CommandLine.SetOutput(&usage)
	// Deferred so that the output is switched back to stderr even if a
	// custom flag.Usage panics.
	defer flag.CommandLine.SetOutput(os.Stderr)

	flag.Usage()
	return fmt.Errorf("%w\n%s", err, usage.String())
}
13 changes: 9 additions & 4 deletions cmd/ishard/ishard/config/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ var (
CollapseAllDirPattern = SampleKeyPattern{Regex: `/`, CaptureGroup: ""}
)

// Prefixes put in front of diagnostic messages about samples whose extensions
// are unexpected or missing.
const (
	// WarningPrefix starts the warning lines printed by the `warn` reaction.
	WarningPrefix = "[Warning]"
	// ErrorPrefix starts the error messages returned by the `abort` reaction.
	ErrorPrefix = "[Error]"
)

// MissingExtManager contains the set of expected extensions for each sample, and corresponding reaction
type MissingExtManager struct {
Name string
Expand Down Expand Up @@ -78,10 +83,10 @@ func (mgr *MissingExtManager) warn(recs *shard.Records) (*shard.Records, error)
mgr.EffectiveObjSize += record.TotalSize()
extra, missing := difference(mgr.extSet, record.Objects)
for ext := range extra {
fmt.Printf("[Warning] sample %s contains extension %s, not specified in `sample_ext` config\n", record.Name, ext)
fmt.Printf("%s sample %s contains extension %s, not specified in `sample_ext` config\n", WarningPrefix, record.Name, ext)
}
for ext := range missing {
fmt.Printf("[Warning] extension %s not found in sample %s\n", ext, record.Name)
fmt.Printf("%s extension %s not found in sample %s\n", WarningPrefix, ext, record.Name)
}
}

Expand All @@ -93,10 +98,10 @@ func (mgr *MissingExtManager) abort(recs *shard.Records) (*shard.Records, error)
mgr.EffectiveObjSize += record.TotalSize()
extra, missing := difference(mgr.extSet, record.Objects)
for ext := range extra {
return nil, fmt.Errorf("sample %s contains extension %s, not specified in `sample_ext` config", record.Name, ext)
return nil, fmt.Errorf("%s sample %s contains extension %s, not specified in `sample_ext` config", ErrorPrefix, record.Name, ext)
}
for ext := range missing {
return nil, fmt.Errorf("missing extension: extension %s not found in sample %s", ext, record.Name)
return nil, fmt.Errorf("%s missing extension: extension %s not found in sample %s", ErrorPrefix, ext, record.Name)
}
}

Expand Down
84 changes: 84 additions & 0 deletions cmd/ishard/ishard/dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package ishard provides utility for sharding the initial dataset
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package ishard

import (
"path/filepath"
"strings"

"github.com/NVIDIA/aistore/cmd/ishard/ishard/config"
"github.com/NVIDIA/aistore/ext/dsort/shard"
)

// dirNode represents the hierarchical structure of virtual directories within a bucket.
// Each node stands for one directory level.
type dirNode struct {
	// sub-directories of this node, keyed by their path component
	children map[string]*dirNode
	// records inserted at this directory level
	records *shard.Records
}

// newDirNode returns an empty node: it has no sub-directories and a fresh
// record set obtained from shard.NewRecords(16).
func newDirNode() *dirNode {
	node := new(dirNode)
	node.children = make(map[string]*dirNode)
	node.records = shard.NewRecords(16)
	return node
}

// insert registers the object `fullPath` of the given `size` in the tree
// rooted at `n`, at the location described by `keyPath`.
//
// `keyPath` is split on "/": every component except the last names a virtual
// directory, which is created on demand; the object itself is recorded in the
// directory reached by walking those components. The record's key and name are
// `keyPath` with the extension of `fullPath` trimmed (when it is a suffix).
//
// The record is inserted even when a sub-directory named like the last
// component already exists, so no object is silently dropped and the result
// does not depend on the order in which paths are inserted.
func (n *dirNode) insert(keyPath, fullPath string, size int64) {
	// strings.Split with a non-empty separator always yields at least one
	// element, so slicing off the last one is safe.
	parts := strings.Split(keyPath, "/")
	dirs := parts[:len(parts)-1]

	// Walk (and create, where missing) the directory components.
	current := n
	for _, dir := range dirs {
		child, exists := current.children[dir]
		if !exists {
			child = newDirNode()
			current.children[dir] = child
		}
		current = child
	}

	// Record the object in the directory that contains it.
	ext := filepath.Ext(fullPath)
	base := strings.TrimSuffix(keyPath, ext)
	current.records.Insert(&shard.Record{
		Key:  base,
		Name: base,
		Objects: []*shard.RecordObj{{
			ContentPath:  fullPath,
			StoreType:    shard.SGLStoreType,
			Offset:       0,
			MetadataSize: 0,
			Size:         size,
			Extension:    ext,
		}},
	})
}

// apply walks the tree in preorder starting at `n` and lets `act` react to the
// records of each visited node. When React hands back a non-nil record set, the
// node's current records are drained and replaced by it. Child nodes are
// visited only when `recursive` is set. The walk stops at the first error,
// which is returned to the caller.
func (n *dirNode) apply(act *config.MissingExtManager, recursive bool) error {
	if n == nil {
		return nil
	}

	switch reacted, err := act.React(n.records); {
	case err != nil:
		return err
	case reacted != nil:
		n.records.Drain()
		n.records = reacted
	}

	if recursive {
		for _, sub := range n.children {
			if err := sub.apply(act, recursive); err != nil {
				return err
			}
		}
	}
	return nil
}
6 changes: 4 additions & 2 deletions cmd/ishard/ishard/factory/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package factory provides functions to create shards and track their creation progress
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
*/
package factory

Expand Down Expand Up @@ -56,7 +56,9 @@ func NewShardFactory(baseParams api.BaseParams, fromBck, toBck cmn.Bck, ext, sha
ext: ext,
OutShardNames: make([]string, 0),

// block when number of creating shards reaches to archive xacts's workCh size. otherwise xact commit may timeout. see xact/xs/archive.go
// TODO: The channel size is limited to manage concurrent xaction begin-phase requests,
// preventing potential blocks and timeouts during subsequent commit-phase requests.
// Refer to `maxNumInParallel` in `xact/xs/streaming.go` and `xact/xs/archive.go` for details.
pollCh: make(chan *shard.Shard, 512),
dryRunCfg: dryRun,
}
Expand Down
Loading

0 comments on commit bccf9c6

Please sign in to comment.