Skip to content

Commit

Permalink
CLI: universal command line for multi-object operations (major)
Browse files Browse the repository at this point in the history
* support _embedded_ template: `BUCKET[/template-or-prefix-or-object-name]`
* possibly, different buckets in a single command line
* disambiguate `object-name` vs prefix (i.e., `--prefix`)
* universally support options: `--template`, `--list`, and `--prefix`
* cover verbs:
  - cp
  - prefetch
  - evict
  - archive
  - rm
  - etl
* add `ais object prefetch`
* add `api.PrefetchObject`

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 18, 2023
1 parent d8143f1 commit dd27a1e
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 292 deletions.
6 changes: 5 additions & 1 deletion ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,11 @@ func (h *htrun) logerr(tag string, v any, err error) {
f := filepath.Base(file)
msg += fmt.Sprintf("%s:%d", f, line)
}
nlog.Errorln(msg)
if cos.IsErrBrokenPipe(err) { // client went away
nlog.Infoln("Warning: " + msg)
} else {
nlog.Errorln(msg)
}
h.statsT.IncErr(stats.ErrHTTPWriteCount)
}

Expand Down
1 change: 0 additions & 1 deletion ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,6 @@ func (p *proxy) httpbckput(w http.ResponseWriter, r *http.Request) {
bckToArgs := bckInitArgs{p: p, w: w, r: r, bck: bckTo, msg: msg, perms: apc.AcePUT, query: query}
bckToArgs.createAIS = false
if bckTo, err = bckToArgs.initAndTry(); err != nil {
p.writeErr(w, r, err)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func (t *target) httpobjdelete(w http.ResponseWriter, r *http.Request, apireq *a
}

errCode, err := t.DeleteObject(lom, evict)
if err == nil {
if err == nil && errCode == 0 {
// EC cleanup if EC is enabled
ec.ECM.CleanupObject(lom)
} else {
Expand Down
4 changes: 2 additions & 2 deletions api/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ func EvictRange(bp BaseParams, bck cmn.Bck, rng string) (string, error) {
}

// PrefetchList sends request to prefetch a list of objects from a remote bucket.
func PrefetchList(bp BaseParams, bck cmn.Bck, fileslist []string) (string, error) {
func PrefetchList(bp BaseParams, bck cmn.Bck, objNames []string) (string, error) {
bp.Method = http.MethodPost
q := bck.NewQuery()
msg := apc.ListRange{ObjNames: fileslist}
msg := apc.ListRange{ObjNames: objNames}
return dolr(bp, bck, apc.ActPrefetchObjects, msg, q)
}

Expand Down
41 changes: 23 additions & 18 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (oah *ObjAttrs) RespHeader() http.Header {
// `io.Copy` is used internally to copy response bytes from the request to the writer.
//
// Returns `ObjAttrs` that can be further used to get the size and other object metadata.
func GetObject(bp BaseParams, bck cmn.Bck, object string, args *GetArgs) (oah ObjAttrs, err error) {
func GetObject(bp BaseParams, bck cmn.Bck, objName string, args *GetArgs) (oah ObjAttrs, err error) {
var (
wresp *wrappedResp
w, q, hdr = args.ret()
Expand All @@ -173,7 +173,7 @@ func GetObject(bp BaseParams, bck cmn.Bck, object string, args *GetArgs) (oah Ob
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Query = bck.NewQuery()
reqParams.Header = hdr
}
Expand Down Expand Up @@ -231,14 +231,14 @@ func GetObjectWithValidation(bp BaseParams, bck cmn.Bck, objName string, args *G

// GetObjectReader returns reader of the requested object. It does not read body
// bytes, nor validates a checksum. Caller is responsible for closing the reader.
func GetObjectReader(bp BaseParams, bck cmn.Bck, object string, args *GetArgs) (r io.ReadCloser, err error) {
func GetObjectReader(bp BaseParams, bck cmn.Bck, objName string, args *GetArgs) (r io.ReadCloser, err error) {
_, q, hdr := args.ret()
q = bck.AddToQuery(q)
bp.Method = http.MethodGet
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Query = q
reqParams.Header = hdr
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (args *AppendArgs) _append(reqArgs *cmn.HreqArgs) (*http.Request, error) {
// HeadObject returns object properties; can be conventionally used to establish in-cluster presence.
// - fltPresence: as per QparamFltPresence enum (for values and comments, see api/apc/query.go)
// - silent==true: not to log (not-found) error
func HeadObject(bp BaseParams, bck cmn.Bck, object string, fltPresence int, silent bool) (*cmn.ObjectProps, error) {
func HeadObject(bp BaseParams, bck cmn.Bck, objName string, fltPresence int, silent bool) (*cmn.ObjectProps, error) {
bp.Method = http.MethodHead

q := bck.NewQuery()
Expand All @@ -316,7 +316,7 @@ func HeadObject(bp BaseParams, bck cmn.Bck, object string, fltPresence int, sile
defer FreeRp(reqParams)
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Query = q
}
hdr, _, err := reqParams.doReqHdr()
Expand Down Expand Up @@ -350,7 +350,7 @@ func HeadObject(bp BaseParams, bck cmn.Bck, object string, fltPresence int, sile
// By default, adds new or updates existing custom keys.
// Use `setNewCustomMDFlag` to _replace_ all existing keys with the specified (new) ones.
// See also: HeadObject() and apc.HdrObjCustomMD
func SetObjectCustomProps(bp BaseParams, bck cmn.Bck, object string, custom cos.StrKVs, setNew bool) error {
func SetObjectCustomProps(bp BaseParams, bck cmn.Bck, objName string, custom cos.StrKVs, setNew bool) error {
var (
actMsg = apc.ActMsg{Value: custom}
q url.Values
Expand All @@ -366,7 +366,7 @@ func SetObjectCustomProps(bp BaseParams, bck cmn.Bck, object string, custom cos.
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Body = cos.MustMarshal(actMsg)
reqParams.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
reqParams.Query = q
Expand All @@ -376,28 +376,26 @@ func SetObjectCustomProps(bp BaseParams, bck cmn.Bck, object string, custom cos.
return err
}

// DeleteObject deletes an object specified by bucket/object.
func DeleteObject(bp BaseParams, bck cmn.Bck, object string) error {
func DeleteObject(bp BaseParams, bck cmn.Bck, objName string) error {
bp.Method = http.MethodDelete
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Query = bck.NewQuery()
}
err := reqParams.DoRequest()
FreeRp(reqParams)
return err
}

// EvictObject evicts an object specified by bucket/object.
func EvictObject(bp BaseParams, bck cmn.Bck, object string) error {
func EvictObject(bp BaseParams, bck cmn.Bck, objName string) error {
bp.Method = http.MethodDelete
actMsg := apc.ActMsg{Action: apc.ActEvictObjects, Name: cos.JoinWords(bck.Name, object)}
actMsg := apc.ActMsg{Action: apc.ActEvictObjects, Name: cos.JoinWords(bck.Name, objName)}
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathObjects.Join(bck.Name, object)
reqParams.Path = apc.URLPathObjects.Join(bck.Name, objName)
reqParams.Body = cos.MustMarshal(actMsg)
reqParams.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
reqParams.Query = bck.NewQuery()
Expand All @@ -407,8 +405,15 @@ func EvictObject(bp BaseParams, bck cmn.Bck, object string) error {
return err
}

// PutObject creates an object from the body of the reader (`args.Reader`) and puts
// it in the specified bucket.
// PrefetchObject requests prefetching of a single object.
//   - a convenience wrapper that also mirrors EvictObject for "symmetry"
//   - implemented as a one-element PrefetchList call; the (string, error)
//     pair produced by PrefetchList is returned unchanged
//   - compare with api.PrefetchList and api.PrefetchRange
func PrefetchObject(bp BaseParams, bck cmn.Bck, objName string) (string, error) {
	single := []string{objName}
	return PrefetchList(bp, bck, single)
}

// PutObject PUTs the specified reader (`args.Reader`) as a new object
// (or a new version of the object) in the specified bucket.
//
// Assumes that `args.Reader` is already opened and ready for usage.
// Returns `ObjAttrs` that can be further used to get the size and other object metadata.
Expand Down Expand Up @@ -564,7 +569,7 @@ func Promote(args *PromoteArgs) (xid string, err error) {
_, err = reqParams.doReqStr(&xid)
FreeRp(reqParams)
args.BaseParams.Method = method
return
return xid, err
}

// DoWithRetry executes `http-client.Do` and retries *retriable connection errors*,
Expand Down
34 changes: 25 additions & 9 deletions cmd/cli/cli/arch_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,18 @@ var (
// flags
archCmdsFlags = map[string][]cli.Flag{
commandBucket: {
templateFlag,
listFlag,
dryRunFlag,
inclSrcBucketNameFlag,
archAppendOrPutFlag,
continueOnErrorFlag,
dontHeadBucketFlag,
dryRunFlag,
listFlag,
templateFlag,
verbObjPrefixFlag,
inclSrcBucketNameFlag,
waitFlag,
},
commandPut: append(
listrangeFlags,
listRangeProgressWaitFlags,
archAppendOrPutFlag,
archAppendOnlyFlag,
archpathFlag,
Expand All @@ -82,9 +84,14 @@ var (

// archive bucket
archBucketCmd = cli.Command{
Name: commandBucket,
Usage: "archive multiple objects from " + bucketSrcArgument + " as " + archExts + "-formatted shard",
ArgsUsage: bucketSrcArgument + " " + dstShardArgument,
Name: commandBucket,
Usage: "archive selected or matching objects from " + bucketObjectSrcArgument + " as\n" +
indent1 + archExts + "-formatted object (a.k.a. shard),\n" +
indent1 + "e.g.:\n" +
indent1 + "\t- 'archive bucket ais://src ais://dst/a.tar.lz4 --template \"shard-{001..997}\"'\n" +
indent1 + "\t- 'archive bucket \"ais://src/shard-{001..997}\" ais://dst/a.tar.lz4'\t- same as above (notice double quotes)\n" +
indent1 + "\t- 'archive bucket \"ais://src/shard-{998..999}\" ais://dst/a.tar.lz4 --append-or-put'\t- append (ie., archive) 2 more objects",
ArgsUsage: bucketObjectSrcArgument + " " + dstShardArgument,
Flags: archCmdsFlags[commandBucket],
Action: archMultiObjHandler,
BashComplete: putPromApndCompletions,
Expand Down Expand Up @@ -191,10 +198,11 @@ func archUsageHandler(c *cli.Context) error {
}

func archMultiObjHandler(c *cli.Context) error {
// ditto
// is it an attempt to PUT => archive?
{
a := archput{}
if err := a.parse(c); err == nil {
// Yes, it is
msg := fmt.Sprintf("expecting %s\n(hint: use 'ais archive put' command, %s for details)",
c.Command.ArgsUsage, qflprn(cli.HelpFlag))
return errors.New(msg)
Expand Down Expand Up @@ -226,6 +234,14 @@ func archMultiObjHandler(c *cli.Context) error {
fmt.Fprintf(c.App.Writer, "archive %s/{%s} as %q\n", a.rsrc.bck, what, a.dest())
return nil
}
if !flagIsSet(c, dontHeadBucketFlag) {
if _, err := headBucket(a.rsrc.bck, false /* don't add */); err != nil {
return err
}
if _, err := headBucket(a.dst.bck, false /* don't add */); err != nil {
return err
}
}
// do
_, err := api.ArchiveMultiObj(apiBP, a.rsrc.bck, &msg)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/cli/cli/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func _evictBck(c *cli.Context, bck cmn.Bck) (err error) {
if err = api.EvictRemoteBucket(apiBP, bck, flagIsSet(c, keepMDFlag)); err != nil {
return
}
fmt.Fprintf(c.App.Writer, "%q bucket evicted\n", bck.Cname(""))
actionDone(c, "Evicted bucket "+bck.Cname("")+" from aistore")
return
}

Expand Down Expand Up @@ -436,6 +436,7 @@ func ecEncode(c *cli.Context, bck cmn.Bck, data, parity int) (err error) {
func printObjProps(c *cli.Context, entries cmn.LsoEntries, lstFilter *lstFilter, props string, addCachedCol bool) error {
var (
hideHeader = flagIsSet(c, noHeaderFlag)
hideFooter = flagIsSet(c, noFooterFlag)
matched, other = lstFilter.apply(entries)
units, errU = parseUnitsFlag(c, unitsFlag)
)
Expand All @@ -449,7 +450,7 @@ func printObjProps(c *cli.Context, entries cmn.LsoEntries, lstFilter *lstFilter,
if err := teb.Print(matched, tmpl, opts); err != nil {
return err
}
if len(matched) > 10 {
if !hideFooter && len(matched) > 10 {
listed := fblue("Listed:")
fmt.Fprintln(c.App.Writer, listed, cos.FormatBigNum(len(matched)), "names")
}
Expand Down
72 changes: 17 additions & 55 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ var (
yesFlag,
},
commandCopy: {
listFlag,
templateFlag,
verbObjPrefixFlag,
copyAllObjsFlag,
continueOnErrorFlag,
forceFlag,
copyDryRunFlag,
copyPrependFlag,
copyObjPrefixFlag,
listFlag,
templateFlag,
progressFlag,
refreshFlag,
waitFlag,
Expand All @@ -64,10 +64,12 @@ var (
waitJobXactFinishedFlag,
},
commandEvict: append(
listrangeFlags,
dryRunFlag,
listRangeProgressWaitFlags,
keepMDFlag,
verboseFlag,
verbObjPrefixFlag, // to disambiguate bucket/prefix vs bucket/objName
dryRunFlag,
verboseFlag, // not yet used
nonverboseFlag,
),
cmdSetBprops: {
forceFlag,
Expand Down Expand Up @@ -153,12 +155,15 @@ var (
}
bucketObjCmdEvict = cli.Command{
Name: commandEvict,
Usage: "evict one remote bucket, multiple buckets, or selected objects in a given remote bucket\n" +
indent1 + "(to select, use '--list' or '--template'), e.g.:\n" +
indent1 + "\t* gs://abc\t- evict entire bucket (all gs://abc objects in aistore);\n" +
indent1 + "\t* gs:\t- evict all GCP buckets;\n" +
indent1 + "\t* gs://abc --template images/\t- evict all objects from the virtual subdirectory called \"images\"",
ArgsUsage: optionalObjectsArgument,
Usage: "evict one remote bucket, multiple remote buckets, or\n" +
indent1 + "selected objects in a given remote bucket or buckets, e.g.:\n" +
indent1 + "\t- 'evict gs://abc'\t- evict entire bucket (all gs://abc objects in aistore);\n" +
indent1 + "\t- 'evict gs:'\t- evict all GCP buckets from the cluster;\n" +
indent1 + "\t- 'evict gs://abc --template images/'\t- evict all objects from the virtual subdirectory \"images\";\n" +
indent1 + "\t- 'evict gs://abc/images/'\t- same as above;\n" +
indent1 + "\t- 'evict gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- evict the matching range (prefix + brace expansion);\n" +
indent1 + "\t- 'evict \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)",
ArgsUsage: bucketObjectOrTemplateMultiArg,
Flags: bucketCmdsFlags[commandEvict],
Action: evictHandler,
BashComplete: bucketCompletions(bcmplop{multiple: true}),
Expand Down Expand Up @@ -368,48 +373,6 @@ func removeBucketHandler(c *cli.Context) error {
return destroyBuckets(c, buckets)
}

// evictHandler is the CLI action for the evict command. It dispatches on the
// number of positional arguments:
//   - none: returns a missing-arguments error;
//   - exactly one: the argument is parsed as a bucket[/object] URI, and then
//   - with `--list` or `--template` set: runs the list/range operation on the
//     bucket (an object name in the URI is rejected as incorrect usage);
//   - with no object name: evicts the entire bucket;
//   - otherwise (single object name): continues to the multi-object path;
//   - one object or more: `--list`/`--template` are rejected, and the
//     arguments are handed over to multiobjArg.
//
// When the dry-run flag is set, the dry-run caption is printed first.
func evictHandler(c *cli.Context) error {
	if flagIsSet(c, dryRunFlag) {
		dryRunCptn(c)
	}

	switch c.NArg() {
	case 0:
		return missingArgumentsError(c, c.Command.ArgsUsage)
	case 1:
		// a single user-provided argument: bucket, or bucket/object
		bck, objName, err := parseBckObjURI(c, preparseBckObjURI(c.Args().Get(0)), true /*emptyObjnameOK*/)
		if err != nil {
			return err
		}
		selecting := flagIsSet(c, listFlag) || flagIsSet(c, templateFlag)
		switch {
		case selecting && objName != "":
			return incorrectUsageMsg(c,
				"object name (%q) cannot be used together with %s and/or %s flags",
				objName, qflprn(listFlag), qflprn(templateFlag))
		case selecting:
			// list or range operation on the given bucket
			return listrange(c, bck)
		case objName == "":
			// no object name, no selection flags: evict the entire bucket
			return evictBucket(c, bck)
		}
		// a single object in a remote bucket: handled by multiobjArg below
	}

	// object name argument(s) and list/range flags are mutually exclusive
	if flagIsSet(c, listFlag) || flagIsSet(c, templateFlag) {
		return incorrectUsageMsg(c, "flags %q, %q cannot be used together with object name arguments",
			listFlag.Name, templateFlag.Name)
	}

	// operate on the given object or objects
	return multiobjArg(c, commandEvict)
}

func resetPropsHandler(c *cli.Context) error {
bck, err := parseBckURI(c, c.Args().Get(0), false)
if err != nil {
Expand Down Expand Up @@ -557,7 +520,6 @@ func listAnyHandler(c *cli.Context) error {

if err != nil {
if strings.Contains(err.Error(), cos.OnlyPlus) && strings.Contains(err.Error(), "bucket name") {
// slightly nicer
err = fmt.Errorf("bucket name in %q is invalid: "+cos.OnlyPlus, uri)
}
return err
Expand Down
Loading

0 comments on commit dd27a1e

Please sign in to comment.