Skip to content

Commit

Permalink
[API change] get latest version (feature)
Browse files Browse the repository at this point in the history
* (motivation: fine-grained control)
* prev. commit: b732d06

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 27, 2023
1 parent 78be13f commit d8806b3
Show file tree
Hide file tree
Showing 20 changed files with 205 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ With a little effort, they all could be extracted and used outside.
- [Downloader](/docs/downloader.md)
- [On-disk layout](/docs/on_disk_layout.md)
- [Buckets: definition, operations, properties](https://github.com/NVIDIA/aistore/blob/main/docs/bucket.md#bucket)
- [Validate Warm GET: a quick synopsys](/docs/validate_warm_get.md)
- [Out of band updates](/docs/validate_warm_get.md)

## License

Expand Down
3 changes: 3 additions & 0 deletions ais/dpq.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type dpq struct {
bsummRemote string // QparamBsummRemote
etlName string // QparamETLName
silent string // QparamSilent
latestVer string // QparamLatestVer
}

var (
Expand Down Expand Up @@ -119,6 +120,8 @@ func (dpq *dpq) parse(rawQuery string) (err error) {
dpq.etlName = value
case apc.QparamSilent:
dpq.silent = value
case apc.QparamLatestVer:
dpq.latestVer = value

case s3.QparamMptUploadID, s3.QparamMptUploads, s3.QparamMptPartNo:
// TODO: ignore for now
Expand Down
4 changes: 2 additions & 2 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck
filename: filename,
mime: dpq.archmime, // apc.QparamArchmime
}
goi.isGFN = cos.IsParseBool(dpq.isGFN) // query.Get(apc.QparamIsGFNRequest)
// goi.chunked = config.Net.HTTP.Chunked NOTE: disabled - no need
goi.isGFN = cos.IsParseBool(dpq.isGFN) // query.Get(apc.QparamIsGFNRequest)
goi.latestVer = goi.lom.ValidateWarmGet(dpq.latestVer) // apc.QparamLatestVer or versioning.validate_warm_get
}
if bck.IsHTTP() {
originalURL := dpq.origURL // query.Get(apc.QparamOrigURL)
Expand Down
3 changes: 2 additions & 1 deletion ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type (
verchanged bool // version changed
retry bool // once
cold bool // true if executed backend.Get
latestVer bool // a.k.a. versioning.validate_warm_get
}

// textbook append: (packed) handle and control structure (see also `putA2I` arch below)
Expand Down Expand Up @@ -557,7 +558,7 @@ do:
}
goto fin
}
} else if goi.lom.Bck().IsRemote() && goi.lom.VersionConf().ValidateWarmGet { // check remote version
} else if goi.latestVer { // apc.QparamLatestVer or versioning.validate_warm_get
var equal bool
goi.lom.Unlock(false)
if equal, errCode, err = goi.t.CompareObjects(goi.ctx, goi.lom); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions api/apc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ const (
// - shutdown the primary and the entire cluster
// - attach invalid mountpath
QparamForce = "frc"

// same as `Versioning.ValidateWarmGet` (cluster config and bucket props)
// - usage: GET and (copy|transform) x (bucket|multi-object) operations
// - implies remote backend
QparamLatestVer = "latest-ver"
)

// QparamFltPresence enum.
Expand Down
1 change: 1 addition & 0 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
// 1. `apc.QparamETLName`: named ETL to transform the object (i.e., perform "inline transformation")
// 2. `apc.QparamOrigURL`: GET from a vanilla http(s) location (`ht://` bucket with the corresponding `OrigURLBck`)
// 3. `apc.QparamSilent`: do not log errors
// 4. `apc.QparamLatestVer`: get latest version from the associated Cloud bucket; see also: `ValidateWarmGet`
Query url.Values

// The field is exclusively used to facilitate Range Read.
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/arch_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
indent4 + "\t- ais archive get ais://abc/trunk-0123.tar.lz4 --archpath file456 /tmp/out - same as above\n" +
indent4 + "\t- ais archive get ais://abc/trunk-0123.tar.lz4/file456 /tmp/out/file456.new - same as above w/ rename",
ArgsUsage: getShardArgument,
Flags: rmFlags(objectCmdGet.Flags, checkObjCachedFlag, lengthFlag, offsetFlag),
Flags: rmFlags(objectCmdGet.Flags, headObjPresentFlag, lengthFlag, offsetFlag),
Action: getArchHandler,
BashComplete: objectCmdGet.BashComplete,
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,19 @@ func setPropsHandler(c *cli.Context) (err error) {
return updateBckProps(c, bck, currProps, newProps)
}

// TODO: more validation; e.g. `validate_warm_get = true` is only supported for buckets with Cloud and remais backends
func updateBckProps(c *cli.Context, bck cmn.Bck, currProps *cmn.Bprops, updateProps *cmn.BpropsToSet) (err error) {
// Apply updated props and check for change
// apply updated props
allNewProps := currProps.Clone()
allNewProps.Apply(updateProps)

// check for changes
if allNewProps.Equal(currProps) {
displayPropsEqMsg(c, bck)
return nil
}

// do
if _, err = api.SetBucketProps(apiBP, bck, updateProps); err != nil {
if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound {
return herr
Expand Down
16 changes: 12 additions & 4 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,13 @@ var (
Usage: "server-side flag, an indication for aistore _not_ to log assorted errors (e.g., HEAD(object) failures)",
}

latestVersionFlag = cli.BoolFlag{
Name: "latest",
Usage: "GET or copy the latest object version from the associated remote bucket:\n" +
indent1 + "\t- allows fine-grained (operation-level) control without changing bucket configuration\n" +
indent1 + "\t- see also: 'ais bucket props ... versioning.validate_warm_get'",
}

averageSizeFlag = cli.BoolFlag{Name: "average-size", Usage: "show average GET, PUT, etc. request size"}

ignoreErrorFlag = cli.BoolFlag{
Expand Down Expand Up @@ -666,17 +673,18 @@ var (
// speaking, not true for AIStore where LRU eviction is per-bucket configurable with default
// settings inherited from the cluster config, etc. etc.
// See also: apc.Flt* enum.
checkObjCachedFlag = cli.BoolFlag{
Name: "check-cached",
Usage: "check if a given object from a remote bucket is present (\"cached\") in AIS",
headObjPresentFlag = cli.BoolFlag{
Name: "check-cached",
Usage: "instead of GET execute HEAD(object) to check if the object is present in aistore\n" +
indent1 + "\t(applies only to buckets with remote backend)",
}
listObjCachedFlag = cli.BoolFlag{
Name: "cached",
Usage: "list only those objects from a remote bucket that are present (\"cached\")",
}
getObjCachedFlag = cli.BoolFlag{
Name: "cached",
Usage: "get only those objects from a remote bucket that are present (\"cached\") in AIS",
Usage: "get only those objects from a remote bucket that are present (\"cached\") in aistore",
}
// when '--all' is used for/by another flag
objNotCachedPropsFlag = cli.BoolFlag{
Expand Down
82 changes: 51 additions & 31 deletions cmd/cli/cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,32 @@ func getHandler(c *cli.Context) error {
if flagIsSet(c, lengthFlag) != flagIsSet(c, offsetFlag) {
return fmt.Errorf("%s and %s must be both present (or not)", qflprn(lengthFlag), qflprn(offsetFlag))
}
if flagIsSet(c, latestVersionFlag) {
if flagIsSet(c, headObjPresentFlag) {
return fmt.Errorf(errFmtExclusive, qflprn(latestVersionFlag), qflprn(headObjPresentFlag))
}
if flagIsSet(c, getObjCachedFlag) {
return fmt.Errorf(errFmtExclusive, qflprn(latestVersionFlag), qflprn(getObjCachedFlag))
}
}

// source
uri := c.Args().Get(0)
bck, objName, err := parseBckObjURI(c, uri, flagIsSet(c, getObjPrefixFlag))
if err != nil {
return err
}
if !bck.IsHTTP() {
if bck.Props, err = headBucket(bck, false /* don't add */); err != nil {
return err
}
}
if flagIsSet(c, latestVersionFlag) && !bck.IsCloud() && !bck.IsRemoteAIS() {
return fmt.Errorf("option %s is incompatible with the specified bucket %s\n"+
"(tip: can only GET latest object's version from a bucket with Cloud or remote AIS backend)",
qflprn(latestVersionFlag), bck.String())
}

// destination (empty "" implies using source `basename`)
outFile := c.Args().Get(1)

Expand Down Expand Up @@ -82,8 +101,8 @@ func getHandler(c *cli.Context) error {

// validate extract and archpath
if extract {
if flagIsSet(c, checkObjCachedFlag) {
return fmt.Errorf(errFmtExclusive, extractVia, qflprn(checkObjCachedFlag))
if flagIsSet(c, headObjPresentFlag) {
return fmt.Errorf(errFmtExclusive, extractVia, qflprn(headObjPresentFlag))
}
if flagIsSet(c, lengthFlag) {
return fmt.Errorf("read range (%s, %s) of archived files (%s) is not implemented yet",
Expand All @@ -94,9 +113,9 @@ func getHandler(c *cli.Context) error {
if flagIsSet(c, getObjPrefixFlag) {
return fmt.Errorf(errFmtExclusive, qflprn(getObjPrefixFlag), qflprn(archpathGetFlag))
}
if flagIsSet(c, checkObjCachedFlag) {
if flagIsSet(c, headObjPresentFlag) {
return fmt.Errorf("checking presence (%s) of archived files (%s) is not implemented yet",
qflprn(checkObjCachedFlag), qflprn(archpathGetFlag))
qflprn(headObjPresentFlag), qflprn(archpathGetFlag))
}
if flagIsSet(c, lengthFlag) {
return fmt.Errorf("read range (%s, %s) of archived files (%s) is not implemented yet",
Expand All @@ -118,11 +137,6 @@ func getHandler(c *cli.Context) error {
}

// GET
if !bck.IsHTTP() {
if _, err = headBucket(bck, false /* don't add */); err != nil {
return err
}
}
return getObject(c, bck, objName, archpath, outFile, false /*quiet*/, extract)
}

Expand Down Expand Up @@ -150,7 +164,7 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, archpath, outFile string, extract
if flagIsSet(c, listArchFlag) || extract || archpath != "" {
msg.SetFlag(apc.LsArchDir)
}
if flagIsSet(c, checkObjCachedFlag) {
if flagIsSet(c, getObjCachedFlag) {
msg.SetFlag(apc.LsObjCached)
}
pageSize, limit, err := _setPage(c, bck)
Expand Down Expand Up @@ -339,7 +353,7 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
}

// just check if a remote object is present (do not GET)
if flagIsSet(c, checkObjCachedFlag) {
if flagIsSet(c, headObjPresentFlag) {
return isObjPresent(c, bck, objName)
}

Expand All @@ -349,10 +363,10 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
return err
}
if offset, err = parseSizeFlag(c, offsetFlag, units); err != nil {
return
return err
}
if length, err = parseSizeFlag(c, lengthFlag, units); err != nil {
return
return err
}

// where to
Expand Down Expand Up @@ -394,7 +408,7 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
} else {
var file *os.File
if file, err = os.Create(outFile); err != nil {
return
return err
}
defer func() {
file.Close()
Expand All @@ -405,24 +419,12 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
getArgs = api.GetArgs{Writer: file, Header: hdr}
}

if bck.IsHTTP() {
uri := c.Args().Get(0)
getArgs.Query = make(url.Values, 2)
getArgs.Query.Set(apc.QparamOrigURL, uri)
}
if archpath != "" {
if getArgs.Query == nil {
getArgs.Query = make(url.Values, 1)
}
getArgs.Query.Set(apc.QparamArchpath, archpath)
}
if flagIsSet(c, silentFlag) {
if getArgs.Query == nil {
getArgs.Query = make(url.Values, 1)
}
getArgs.Query.Set(apc.QparamSilent, "true")
// finally, http query
if bck.IsHTTP() || archpath != "" || flagIsSet(c, silentFlag) || flagIsSet(c, latestVersionFlag) {
getArgs.Query = _getQparams(c, &bck, archpath)
}

// do
if flagIsSet(c, cksumFlag) {
oah, err = api.GetObjectWithValidation(apiBP, bck, objName, &getArgs)
} else {
Expand All @@ -432,7 +434,7 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
if cmn.IsStatusNotFound(err) && archpath == "" {
err = &errDoesNotExist{what: "object", name: bck.Cname(objName)}
}
return
return err
}

var (
Expand Down Expand Up @@ -492,6 +494,24 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q
return
}

func _getQparams(c *cli.Context, bck *cmn.Bck, archpath string) (q url.Values) {
q = make(url.Values, 2)
if bck.IsHTTP() {
uri := c.Args().Get(0)
q.Set(apc.QparamOrigURL, uri)
}
if archpath != "" {
q.Set(apc.QparamArchpath, archpath)
}
if flagIsSet(c, silentFlag) {
q.Set(apc.QparamSilent, "true")
}
if flagIsSet(c, latestVersionFlag) {
q.Set(apc.QparamLatestVer, "true")
}
return q
}

//
// post-GET extraction
//
Expand Down
9 changes: 5 additions & 4 deletions cmd/cli/cli/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,18 @@ func concatObject(c *cli.Context, bck cmn.Bck, objName string, fileNames []strin
return nil
}

func isObjPresent(c *cli.Context, bck cmn.Bck, object string) error {
_, err := api.HeadObject(apiBP, bck, object, apc.FltPresentNoProps, true)
func isObjPresent(c *cli.Context, bck cmn.Bck, objName string) error {
name := bck.Cname(objName)
_, err := api.HeadObject(apiBP, bck, objName, apc.FltPresentNoProps, true)
if err != nil {
if cmn.IsStatusNotFound(err) {
fmt.Fprintf(c.App.Writer, "Cached: %v\n", false)
fmt.Fprintf(c.App.Writer, "%s is not present (\"not cached\")\n", name)
return nil
}
return V(err)
}

fmt.Fprintf(c.App.Writer, "Cached: %v\n", true)
fmt.Fprintf(c.App.Writer, "%s is present (is cached)\n", name)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/cli/object_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ var (
lengthFlag,
cksumFlag,
yesFlag,
checkObjCachedFlag,
headObjPresentFlag,
latestVersionFlag,
refreshFlag,
progressFlag,
// archive
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20231226164558-674a39f74ed0
github.com/NVIDIA/aistore v1.3.22-0.20231227014413-fb1c571ff085
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20231226164558-674a39f74ed0 h1:7OjWH5l+smCqB8jTzagcLMBAfgaLQyCZZLxjZkBjaAM=
github.com/NVIDIA/aistore v1.3.22-0.20231226164558-674a39f74ed0/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20231227014413-fb1c571ff085 h1:5Cw7+VXpsb0faz6cTmX4cr3VysmYCYLjoz+qguiyoM4=
github.com/NVIDIA/aistore v1.3.22-0.20231227014413-fb1c571ff085/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
5 changes: 3 additions & 2 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,11 @@ type (
}

VersionConf struct {
// Determines if the versioning is enabled.
// Determines if versioning is enabled
Enabled bool `json:"enabled"`

// Validate object version upon warm GET.
// Validate object version upon warm GET
// See also: apc.QparamLatestVer
ValidateWarmGet bool `json:"validate_warm_get"`
}
VersionConfToSet struct {
Expand Down
13 changes: 13 additions & 0 deletions core/lom.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ func (lom *LOM) Version(special ...bool) string {
return lom.md.Ver
}

func (lom *LOM) ValidateWarmGet(qparam string /*apc.QparamLatestVer*/) bool {
switch {
case !lom.Bck().IsCloud() && !lom.Bck().IsRemoteAIS():
return false
case qparam == "":
return lom.VersionConf().ValidateWarmGet // bucket prop
case qparam == "true":
return true
default:
return cos.IsParseBool(qparam)
}
}

func (lom *LOM) Uname() string { return lom.md.uname }
func (lom *LOM) Digest() uint64 { return lom.digest }

Expand Down
Loading

0 comments on commit d8806b3

Please sign in to comment.