From 07e1e0bc41e9504de3061dd3b7eb9f2303ea0617 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Mon, 11 Dec 2023 10:07:30 -0500 Subject: [PATCH] APPEND if exists; amend metadata comparison (major update) * APPEND will now perform PUT iff the destination does not exist; * otherwise, it'll always append to existing content - refactor and rewrite the previous (target) implementation * CLI: add 'ais put --append' option * - in addition to 'concat' subcommand * - add docs/cli, add inline help * separately, amend object metadata comparison * up cli mod Signed-off-by: Alex Aizman --- ais/proxy.go | 14 ++- ais/target.go | 50 +++----- ais/tgtobj.go | 220 ++++++++++++++++++--------------- ais/tgtobj_internal_test.go | 9 +- ais/utils.go | 14 +++ cmd/cli/cli/const.go | 5 + cmd/cli/cli/object_hdlr.go | 32 +++-- cmd/cli/cli/verbfobj.go | 38 ++++-- cmd/cli/go.mod | 2 +- cmd/cli/go.sum | 4 +- cmd/cli/test/put_object.stdout | 4 +- cmn/objattrs.go | 40 +++--- docs/cli/object.md | 51 ++++++++ 13 files changed, 296 insertions(+), 187 deletions(-) diff --git a/ais/proxy.go b/ais/proxy.go index 0004d6d932..c790f81b08 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -679,13 +679,15 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe if !appendTyProvided { perms = apc.AcePUT } else { - hi, err := parseAppendHandle(apireq.dpq.appendHdl) // apc.QparamAppendHandle - if err != nil { - p.writeErr(w, r, err) - return - } - nodeID = hi.nodeID perms = apc.AceAPPEND + if apireq.dpq.appendHdl != "" { + items, err := preParse(apireq.dpq.appendHdl) // apc.QparamAppendHandle + if err != nil { + p.writeErr(w, r, err) + return + } + nodeID = items[0] // nodeID; compare w/ apndOI.parse + } } // 2. bucket diff --git a/ais/target.go b/ais/target.go index a340d6ed16..b9d4ebeb8c 100644 --- a/ais/target.go +++ b/ais/target.go @@ -806,8 +806,20 @@ func (t *target) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiR errCode, err = t.putApndArch(r, lom, started, apireq.dpq) lom.Unlock(true) case apireq.dpq.appendTy != "": // apc.QparamAppendType - handle, errCode, err = t.appendObj(r, lom, started, apireq.dpq) - if err == nil { + a := &apndOI{ + started: started, + t: t, + config: config, + lom: lom, + r: r.Body, + op: apireq.dpq.appendTy, // apc.QparamAppendType + } + if err := a.parse(apireq.dpq.appendHdl /*apc.QparamAppendHandle*/); err != nil { + t.writeErr(w, r, err) + return + } + handle, errCode, err = a.do(r) + if err == nil && handle != "" { w.Header().Set(apc.HdrAppendHandle, handle) return } @@ -1201,40 +1213,6 @@ func (t *target) CompareObjects(ctx context.Context, lom *cluster.LOM) (equal bo return } -func (t *target) appendObj(r *http.Request, lom *cluster.LOM, started int64, dpq *dpq) (string, int, error) { - var ( - cksumValue = r.Header.Get(apc.HdrObjCksumVal) - cksumType = r.Header.Get(apc.HdrObjCksumType) - contentLength = r.Header.Get(cos.HdrContentLength) - ) - hdl, err := parseAppendHandle(dpq.appendHdl) // apc.QparamAppendHandle - if err != nil { - return "", http.StatusBadRequest, err - } - a := &apndOI{ - started: started, - t: t, - lom: lom, - r: r.Body, - op: dpq.appendTy, // apc.QparamAppendType - hdl: hdl, - } - if a.op != apc.AppendOp && a.op != apc.FlushOp { - err = fmt.Errorf("invalid operation %q (expecting either %q or %q) - check %q query", - a.op, apc.AppendOp, apc.FlushOp, apc.QparamAppendType) - return "", http.StatusBadRequest, err - } - if contentLength != "" { - if size, ers := strconv.ParseInt(contentLength, 10, 64); ers == nil { - a.size = size - } - } - if cksumValue != "" { - a.cksum = cos.NewCksum(cksumType, cksumValue) - } - return a.do() -} - // called under lock func (t *target) putApndArch(r *http.Request, lom *cluster.LOM, started int64, dpq *dpq) (int, error) { var ( diff --git a/ais/tgtobj.go b/ais/tgtobj.go index bd653bc3a2..68c7229ae0 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -15,7 +15,6 @@ import ( "net/http" "os" "strconv" - "strings" "sync" "time" @@ -45,7 +44,7 @@ import ( type ( putOI struct { - r io.ReadCloser // reader that has the content + r io.ReadCloser // content reader xctn cluster.Xact // xaction that puts t *target // this lom *cluster.LOM // obj @@ -78,24 +77,24 @@ type ( unlocked bool // internal verchanged bool // version changed retry bool // once - cold bool // executed backend.Get + cold bool // true if executed backend.Get } - // append handle (packed) + // textbook append: (packed) handle and control structure (see also `putA2I` arch below) aoHdl struct { partialCksum *cos.CksumHash nodeID string - filePath string + workFQN string } - // (see also putA2I) apndOI struct { - started int64 // started time of receiving - used to calculate the recv duration - r io.ReadCloser // reader with the content of the object. + started int64 // start time (nanoseconds) + r io.ReadCloser // content reader t *target // this - lom *cluster.LOM // append to - cksum *cos.Cksum // expected checksum of the final object. - hdl aoHdl // packed - op string // operation (Append | Flush) + config *cmn.Config // (during this request) + lom *cluster.LOM // append to or _as_ + cksum *cos.Cksum // checksum expected once Flush-ed + hdl aoHdl // (packed) + op string // enum {apc.AppendOp, apc.FlushOp} size int64 // Content-Length } @@ -1159,124 +1158,147 @@ func (goi *getOI) parseRange(resphdr http.Header, size int64) (hrng *htrange, er } // -// APPEND to existing object (as file) +// APPEND a file or multiple files: +// - as a new object, if doesn't exist +// - to an existing object, if exists // -func (a *apndOI) do() (newHandle string, errCode int, err error) { - filePath := a.hdl.filePath +func (a *apndOI) do(r *http.Request) (packedHdl string, errCode int, err error) { + var ( + cksumValue = r.Header.Get(apc.HdrObjCksumVal) + cksumType = r.Header.Get(apc.HdrObjCksumType) + contentLength = r.Header.Get(cos.HdrContentLength) + ) + if contentLength != "" { + if size, ers := strconv.ParseInt(contentLength, 10, 64); ers == nil { + a.size = size + } + } + if cksumValue != "" { + a.cksum = cos.NewCksum(cksumType, cksumValue) + } + switch a.op { case apc.AppendOp: - var f *os.File - if filePath == "" { - filePath = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppend) - f, err = a.lom.CreateFile(filePath) - if err != nil { - errCode = http.StatusInternalServerError - return - } - a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType()) - } else { - f, err = os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, cos.PermRWR) + buf, slab := a.t.gmm.Alloc() + packedHdl, errCode, err = a.apnd(buf) + slab.Free(buf) + case apc.FlushOp: + errCode, err = a.flush() + default: + err = fmt.Errorf("invalid operation %q (expecting either %q or %q) - check %q query", + a.op, apc.AppendOp, apc.FlushOp, apc.QparamAppendType) + } + + return packedHdl, errCode, err +} + +func (a *apndOI) apnd(buf []byte) (packedHdl string, errCode int, err error) { + var ( + fh *os.File + workFQN = a.hdl.workFQN + ) + if workFQN == "" { + workFQN = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppend) + a.lom.Lock(false) + if a.lom.Load(false /*cache it*/, false /*locked*/) == nil { + _, a.hdl.partialCksum, err = cos.CopyFile(a.lom.FQN, workFQN, buf, a.lom.CksumType()) + a.lom.Unlock(false) if err != nil { errCode = http.StatusInternalServerError return } - debug.Assert(a.hdl.partialCksum != nil) - } - - var ( - buf []byte - slab *memsys.Slab - ) - if a.size == 0 { - buf, slab = a.t.gmm.Alloc() + fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) } else { - buf, slab = a.t.gmm.AllocSize(a.size) - } - - w := cos.NewWriterMulti(f, a.hdl.partialCksum.H) - _, err = cos.CopyBuffer(w, a.r, buf) - - slab.Free(buf) - cos.Close(f) - if err != nil { - errCode = http.StatusInternalServerError - return - } - - newHandle = combineAppendHandle(a.t.SID(), filePath, a.hdl.partialCksum) - case apc.FlushOp: - if filePath == "" { - err = fmt.Errorf("failed to finalize append-file operation: empty source in the %+v handle", a.hdl) - errCode = http.StatusBadRequest - return + a.lom.Unlock(false) + a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType()) + fh, err = a.lom.CreateFile(workFQN) } + } else { + fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR) debug.Assert(a.hdl.partialCksum != nil) - a.hdl.partialCksum.Finalize() - partialCksum := a.hdl.partialCksum.Clone() - if !a.cksum.IsEmpty() && !partialCksum.Equal(a.cksum) { - err = cos.NewErrDataCksum(partialCksum, a.cksum) - errCode = http.StatusInternalServerError - return - } - params := cluster.PromoteParams{ - Bck: a.lom.Bck(), - Cksum: partialCksum, - PromoteArgs: cluster.PromoteArgs{ - SrcFQN: filePath, - ObjName: a.lom.ObjName, - OverwriteDst: true, - DeleteSrc: true, // NOTE: always overwrite and remove - }, - } - if errCode, err = a.t.Promote(¶ms); err != nil { - return - } - default: - err = fmt.Errorf("invalid append-file operation %q", a.op) - debug.AssertNoErr(err) + } + if err != nil { // failed to open or create + errCode = http.StatusInternalServerError + return + } + + w := cos.NewWriterMulti(fh, a.hdl.partialCksum.H) + _, err = cos.CopyBuffer(w, a.r, buf) + cos.Close(fh) + if err != nil { + errCode = http.StatusInternalServerError return } - delta := time.Now().UnixNano() - a.started + packedHdl = a.pack(workFQN) + + // stats (TODO: add `stats.FlushCount` for symmetry) + lat := time.Now().UnixNano() - a.started a.t.statsT.AddMany( cos.NamedVal64{Name: stats.AppendCount, Value: 1}, - cos.NamedVal64{Name: stats.AppendLatency, Value: delta}, + cos.NamedVal64{Name: stats.AppendLatency, Value: lat}, ) - if cmn.FastV(4, cos.SmoduleAIS) { - nlog.Infof("APPEND %s: %s", a.lom, delta) + if a.config.FastV(4, cos.SmoduleAIS) { + nlog.Infof("APPEND %s: %s", a.lom, lat) } return } -func parseAppendHandle(handle string) (hdl aoHdl, err error) { - if handle == "" { - return +func (a *apndOI) flush() (int, error) { + if a.hdl.workFQN == "" { + return 0, fmt.Errorf("failed to finalize append-file operation: empty source in the %+v handle", a.hdl) + } + + // finalize checksum + debug.Assert(a.hdl.partialCksum != nil) + a.hdl.partialCksum.Finalize() + partialCksum := a.hdl.partialCksum.Clone() + if !a.cksum.IsEmpty() && !partialCksum.Equal(a.cksum) { + return http.StatusInternalServerError, cos.NewErrDataCksum(partialCksum, a.cksum) + } + + params := cluster.PromoteParams{ + Bck: a.lom.Bck(), + Cksum: partialCksum, + PromoteArgs: cluster.PromoteArgs{ + SrcFQN: a.hdl.workFQN, + ObjName: a.lom.ObjName, + OverwriteDst: true, + DeleteSrc: true, // NOTE: always overwrite and remove + }, } - p := strings.SplitN(handle, "|", 4) - if len(p) != 4 { - return hdl, fmt.Errorf("invalid APPEND handle: %q", handle) + return a.t.Promote(¶ms) +} + +func (a *apndOI) parse(packedHdl string) error { + if packedHdl == "" { + return nil } - hdl.partialCksum = cos.NewCksumHash(p[2]) - buf, err := base64.StdEncoding.DecodeString(p[3]) + items, err := preParse(packedHdl) if err != nil { - return hdl, err + return err } - err = hdl.partialCksum.H.(encoding.BinaryUnmarshaler).UnmarshalBinary(buf) + a.hdl.partialCksum = cos.NewCksumHash(items[2]) + buf, err := base64.StdEncoding.DecodeString(items[3]) if err != nil { - return hdl, err + return err } - hdl.nodeID = p[0] - hdl.filePath = p[1] - return + if err := a.hdl.partialCksum.H.(encoding.BinaryUnmarshaler).UnmarshalBinary(buf); err != nil { + return err + } + + a.hdl.nodeID = items[0] + a.hdl.workFQN = items[1] + return nil } -func combineAppendHandle(nodeID, filePath string, partialCksum *cos.CksumHash) string { - buf, err := partialCksum.H.(encoding.BinaryMarshaler).MarshalBinary() +func (a *apndOI) pack(workFQN string) string { + buf, err := a.hdl.partialCksum.H.(encoding.BinaryMarshaler).MarshalBinary() debug.AssertNoErr(err) - cksumTy := partialCksum.Type() + cksumTy := a.hdl.partialCksum.Type() cksumBinary := base64.StdEncoding.EncodeToString(buf) - return nodeID + "|" + filePath + "|" + cksumTy + "|" + cksumBinary + return a.t.SID() + appendHandleSepa + workFQN + appendHandleSepa + cksumTy + appendHandleSepa + cksumBinary } // diff --git a/ais/tgtobj_internal_test.go b/ais/tgtobj_internal_test.go index 09582d4d38..c3b1f2d66f 100644 --- a/ais/tgtobj_internal_test.go +++ b/ais/tgtobj_internal_test.go @@ -122,6 +122,7 @@ func BenchmarkObjPut(b *testing.B) { lom: lom, r: r, workFQN: path.Join(testMountpath, "objname.work"), + config: cmn.GCO.Get(), } os.Remove(lom.FQN) b.StartTimer() @@ -150,6 +151,7 @@ func BenchmarkObjAppend(b *testing.B) { {fileSize: 16 * cos.MiB}, } + buf := make([]byte, 16*cos.KiB) for _, bench := range benches { b.Run(cos.ToSizeIEC(bench.fileSize, 2), func(b *testing.B) { lom := cluster.AllocLOM("objname") @@ -175,18 +177,18 @@ func BenchmarkObjAppend(b *testing.B) { os.Remove(lom.FQN) b.StartTimer() - newHandle, _, err := aoi.do() + newHandle, _, err := aoi.apnd(buf) if err != nil { b.Fatal(err) } - hdl, err = parseAppendHandle(newHandle) + err = aoi.parse(newHandle) if err != nil { b.Fatal(err) } } b.StopTimer() os.Remove(lom.FQN) - os.Remove(hdl.filePath) + os.Remove(hdl.workFQN) }) } } @@ -231,6 +233,7 @@ func BenchmarkObjGetDiscard(b *testing.B) { lom: lom, r: r, workFQN: path.Join(testMountpath, "objname.work"), + config: cmn.GCO.Get(), } _, err = poi.putObject() if err != nil { diff --git a/ais/utils.go b/ais/utils.go index b064678b1d..056af68592 100644 --- a/ais/utils.go +++ b/ais/utils.go @@ -367,3 +367,17 @@ func cleanupConfigDir(name string, keepInitialConfig bool) { return nil }) } + +// +// common APPEND(file(s)) pre-parser +// + +const appendHandleSepa = "|" + +func preParse(packedHdl string) (items []string, err error) { + items = strings.SplitN(packedHdl, appendHandleSepa, 4) + if len(items) != 4 { + err = fmt.Errorf("invalid APPEND handle: %q", packedHdl) + } + return +} diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go index 1fc5f49f13..791880de7c 100644 --- a/cmd/cli/cli/const.go +++ b/cmd/cli/cli/const.go @@ -728,6 +728,11 @@ var ( putObjCksumText, } + appendConcatFlag = cli.BoolFlag{ + Name: "append", + Usage: "concatenate files: append a file or multiple files as a new _or_ to an existing object", + } + skipVerCksumFlag = cli.BoolFlag{ Name: "skip-vc", Usage: "skip loading object metadata (and the associated checksum & version related processing)", diff --git a/cmd/cli/cli/object_hdlr.go b/cmd/cli/cli/object_hdlr.go index d68a58c783..87e49e7b6d 100644 --- a/cmd/cli/cli/object_hdlr.go +++ b/cmd/cli/cli/object_hdlr.go @@ -1,5 +1,4 @@ // Package cli provides easy-to-use commands to manage, monitor, and utilize AIS clusters. -// This file handles CLI commands that pertain to AIS objects. /* * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. */ @@ -19,6 +18,8 @@ import ( "github.com/urfave/cli" ) +// in this file: operations on objects + var ( objectCmdsFlags = map[string][]cli.Flag{ commandRemove: append( @@ -62,6 +63,8 @@ var ( // cksum skipVerCksumFlag, putObjDfltCksumFlag, + // append + appendConcatFlag, ), commandSetCustom: { setNewCustomMDFlag, @@ -106,7 +109,7 @@ var ( objectCmdPut = cli.Command{ Name: commandPut, - Usage: "PUT or APPEND one file, one directory, or multiple files and/or directories.\n" + + Usage: "PUT or append one file, one directory, or multiple files and/or directories.\n" + indent1 + "Use optional shell filename PATTERN (wildcard) to match/select multiple sources.\n" + indent1 + "Destination naming is consistent with 'ais object promote' command, whereby the optional OBJECT_NAME_or_PREFIX\n" + indent1 + "becomes either a name, a prefix, or a virtual destination directory (if it ends with a forward '/').\n" + @@ -116,9 +119,10 @@ var ( indent1 + "\t- '--compute-checksum': use '--compute-checksum' to facilitate end-to-end protection;\n" + indent1 + "\t- '--progress': progress bar, to show running counts and sizes of uploaded files;\n" + indent1 + "\t- Ctrl-D: when writing directly from standard input use Ctrl-D to terminate;\n" + + indent1 + "\t- '--append' to append (concatenate) files, e.g.: 'ais put docs ais://nnn/all-docs --append';\n" + indent1 + "\t- '--dry-run': see the results without making any changes.\n" + indent1 + "\tNotes:\n" + - indent1 + "\t- to write or append to " + archExts + "-formatted objects (\"shards\"), use 'ais archive'", + indent1 + "\t- to write or add files to " + archExts + "-formatted objects (\"shards\"), use 'ais archive'", ArgsUsage: putObjectArgument, Flags: append(objectCmdsFlags[commandPut], putObjCksumFlags...), Action: putHandler, @@ -142,6 +146,15 @@ var ( Action: promoteHandler, BashComplete: putPromApndCompletions, } + objectCmdConcat = cli.Command{ + Name: commandConcat, + Usage: "append a file, a directory, or multiple files and/or directories\n" + + indent1 + "as a new " + objectArgument + " if doesn't exists, and to an existing " + objectArgument + " otherwise, e.g.:\n" + + indent1 + "$ ais object concat docs ais://nnn/all-docs ### concatenate all files from docs/ directory.", + ArgsUsage: concatObjectArgument, + Flags: objectCmdsFlags[commandConcat], + Action: concatHandler, + } objectCmdSetCustom = cli.Command{ Name: commandSetCustom, @@ -159,6 +172,7 @@ var ( bucketsObjectsCmdList, objectCmdPut, objectCmdPromote, + objectCmdConcat, objectCmdSetCustom, bucketObjCmdEvict, makeAlias(showCmdObject, "", true, commandShow), // alias for `ais show` @@ -178,13 +192,6 @@ var ( Action: removeObjectHandler, BashComplete: bucketCompletions(bcmplop{multiple: true, separator: true}), }, - { - Name: commandConcat, - Usage: "concatenate multiple files and/or directories (with or without matching pattern) as a new single object", - ArgsUsage: concatObjectArgument, - Flags: objectCmdsFlags[commandConcat], - Action: concatHandler, - }, { Name: commandCat, Usage: "cat an object (i.e., print its contents to STDOUT)", @@ -289,6 +296,10 @@ func removeObjectHandler(c *cli.Context) (err error) { // main PUT handler: cases 1 through 4 func putHandler(c *cli.Context) error { + if flagIsSet(c, appendConcatFlag) { + return concatHandler(c) + } + var a putargs if err := a.parse(c, true /*empty dst oname*/); err != nil { return err @@ -296,7 +307,6 @@ func putHandler(c *cli.Context) error { if flagIsSet(c, dryRunFlag) { dryRunCptn(c) } - // 1. one file if a.srcIsRegular() { debug.Assert(a.src.abspath != "") diff --git a/cmd/cli/cli/verbfobj.go b/cmd/cli/cli/verbfobj.go index 470d46c5da..bf64123685 100644 --- a/cmd/cli/cli/verbfobj.go +++ b/cmd/cli/cli/verbfobj.go @@ -388,7 +388,9 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File return err } -// PUT fixed-sized chunks using `api.AppendObject` and `api.FlushObject` +// PUT and then APPEND fixed-sized chunks using `api.PutObject`, `api.AppendObject` and `api.FlushObject` +// - currently, is only used to PUT from standard input when we do expect to overwrite existing destination object +// - APPEND and flush will only be executed with there's a second chunk func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, cksumType string, chunkSize int64) error { var ( handle string @@ -398,7 +400,7 @@ func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, c if flagIsSet(c, progressFlag) { pi.start() } - for { + for i := 0; ; i++ { var ( b = bytes.NewBuffer(nil) n int64 @@ -434,14 +436,27 @@ func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, c pi.printProgress(int64(n)) }) } - handle, err = api.AppendObject(&api.AppendArgs{ - BaseParams: apiBP, - Bck: bck, - Object: objName, - Handle: handle, - Reader: reader, - Size: n, - }) + if i == 0 { + // overwrite, if exists + // NOTE: when followed by APPEND (below) will increment resulting ais object's version one extra time + putArgs := api.PutArgs{ + BaseParams: apiBP, + Bck: bck, + ObjName: objName, + Reader: reader, + Size: uint64(n), + } + _, err = api.PutObject(&putArgs) + } else { + handle, err = api.AppendObject(&api.AppendArgs{ + BaseParams: apiBP, + Bck: bck, + Object: objName, + Handle: handle, + Reader: reader, + Size: n, + }) + } if err != nil { return err } @@ -453,6 +468,9 @@ func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, c if cksumType != cos.ChecksumNone { cksum.Finalize() } + if handle == "" { + return nil + } return api.FlushObject(&api.FlushArgs{ BaseParams: apiBP, Bck: bck, diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index e2afa3ad44..756d205544 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.22-0.20231206200848-2f4dc8789da0 + github.com/NVIDIA/aistore v1.3.22-0.20231210175847-922e49a5a0bd github.com/fatih/color v1.16.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 7654d78751..470e4083dc 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.22-0.20231206200848-2f4dc8789da0 h1:IBAJFfHJ1V8aNvTYTzJ1nEGPcNfjd3BnoyK+hz1CTmc= -github.com/NVIDIA/aistore v1.3.22-0.20231206200848-2f4dc8789da0/go.mod h1:cOTgDt5fVCQOB+rnvYZgVFRF3dEzPqu8f22F3F+Yvtg= +github.com/NVIDIA/aistore v1.3.22-0.20231210175847-922e49a5a0bd h1:T4OLzL9od1KNTMzgWgmDThug/ud3/W7/7sBw7PY8Mnc= +github.com/NVIDIA/aistore v1.3.22-0.20231210175847-922e49a5a0bd/go.mod h1:cOTgDt5fVCQOB+rnvYZgVFRF3dEzPqu8f22F3F+Yvtg= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/cmd/cli/test/put_object.stdout b/cmd/cli/test/put_object.stdout index 56ea57f6a4..9d25d207e5 100644 --- a/cmd/cli/test/put_object.stdout +++ b/cmd/cli/test/put_object.stdout @@ -21,7 +21,7 @@ NAME SIZE VERSION cksum 11B 4 object_1.txt 11B 1 object_2.txt 11B 1 -rand.txt 30.49MiB 1 +rand.txt 30.49MiB 2 ^PUT.*=> ais://$BUCKET_1/object_1.txt$ ^PUT.*=> ais://$BUCKET_1/object_3.txt$ @@ -31,7 +31,7 @@ cksum 11B 4 object_1.txt 2B 2 object_2.txt 11B 1 object_3.txt 11B 1 -rand.txt 30.49MiB 1 +rand.txt 30.49MiB 2 "ais://$BUCKET_2" created ^GET.*from ais://$BUCKET_2.*$ ^GET.*$ diff --git a/cmn/objattrs.go b/cmn/objattrs.go index 3799442492..0a6af95b6c 100644 --- a/cmn/objattrs.go +++ b/cmn/objattrs.go @@ -19,7 +19,7 @@ import ( // LOM custom metadata stored under `lomCustomMD`. const ( // source of the cold-GET and download; the values include all - // 3rd party backend providers (remote AIS not including) + // 3rd party backend providers SourceObjMD = "source" // downloader' source is "web" @@ -204,20 +204,23 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) { ) // size check if remSize := rem.SizeBytes(true); oa.Size != 0 && remSize != 0 && oa.Size != remSize { - return + return false } // version check if remVer := rem.Version(true); oa.Ver != "" && remVer != "" { if oa.Ver != remVer { - return + return false } ver = oa.Ver - count++ + // NOTE: ais own version is, currently, a nonunique sequence number - not counting + if remSrc, _ := rem.GetCustomKey(SourceObjMD); remSrc != apc.AIS { + count++ + } } else if remMeta, ok := rem.GetCustomKey(VersionObjMD); ok && remMeta != "" { if locMeta, ok := oa.GetCustomKey(VersionObjMD); ok && locMeta != "" { if remMeta != locMeta { - return + return false } count++ ver = locMeta @@ -225,7 +228,10 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) { } // checksum check - if rem.Checksum().Equal(oa.Cksum) { + if !rem.Checksum().IsEmpty() && !oa.Cksum.IsEmpty() { + if !rem.Checksum().Equal(oa.Cksum) { + return false + } cksumVal = oa.Cksum.Val() count++ } @@ -234,7 +240,7 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) { if remMeta, ok := rem.GetCustomKey(ETag); ok && remMeta != "" { if locMeta, ok := oa.GetCustomKey(ETag); ok && locMeta != "" { if remMeta != locMeta { - return + return false } etag = locMeta if ver != locMeta && cksumVal != locMeta { // against double-counting @@ -246,10 +252,12 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) { // custom MD: CRC check if remMeta, ok := rem.GetCustomKey(CRC32CObjMD); ok && remMeta != "" { if locMeta, ok := oa.GetCustomKey(CRC32CObjMD); ok && locMeta != "" { - if remMeta != locMeta && cksumVal != locMeta { // (ditto) - return + if remMeta != locMeta { + return false + } + if cksumVal != locMeta { + count++ } - count++ } } @@ -271,21 +279,19 @@ func (oa *ObjAttrs) Equal(rem cos.OAH) (eq bool) { switch { case count >= 2: // e.g., equal because they have the same (version & md5, where version != md5) - eq = true - return + return true case count == 0: - return + return false default: // same version or ETag from the same (remote) backend // (arguably, must be configurable) if remMeta, ok := rem.GetCustomKey(SourceObjMD); ok && remMeta != "" { if locMeta, ok := oa.GetCustomKey(SourceObjMD); ok && locMeta != "" { - if ver != "" || etag != "" { - eq = true - return + if (ver != "" || etag != "") && remMeta == locMeta { + return true } } } } - return + return eq } diff --git a/docs/cli/object.md b/docs/cli/object.md index f47b2fd884..d75f7749ba 100644 --- a/docs/cli/object.md +++ b/docs/cli/object.md @@ -36,6 +36,7 @@ This document contains `ais object` commands - the commands to read (GET), write - [Dry-Run option](#dry-run-option) - [Put multiple directories](#put-multiple-directories) - [Put multiple directories with the `--skip-vc` option](#put-multiple-directories-with-the-skip-vc-option) +- [APPEND object](#append-object) - [Delete object](#delete-object) - [Evict object](#evict-object) - [Promote files and directories](#promote-files-and-directories) @@ -1009,6 +1010,56 @@ $ ais object promote /target/1014646t8081/nonexistent/dir/ ais://testbucket --ta (...) Bad Request: stat /target/1014646t8081/nonexistent/dir: no such file or directory ``` +# APPEND object + +APPEND operation (not to confuse with appending or [adding to existing archive](/docs/cli/archive.md)) can be executed in 3 different ways: + +* using `ais put` with `--append` option; +* using `ais object concat`; +and finally +* writing from standard input with chunk size (ie., `--chunk-size`) small enough to require (appending) multiple chunks. + +Here're some examples: + +```console +## append all files from a given directory as a single object: + +$ ais put docs ais://nnn/all-docs --append + +Created ais://nnn/all-docs (size 571.45KiB) +$ ais ls ais://nnn/all-docs -props all +PROPERTY VALUE +atime 11 Dec 23 12:18 EST +checksum xxhash[f0eac0698e2489ff] +copies 1 [/ais/mp1/7] +custom - +ec - +location t[VQWtTyuI]:mp[/ais/mp1/7, nvme0n1] +name ais://nnn/all-docs +size 571.45KiB +version 1 +``` + +```console +## overwrite existing object with 4KiB of random data; +## note that the operation (below) will write about 410 chunks from standard input + +$ head -c 4096 /dev/urandom | ais object put - ais://nnn/all-docs --chunk-size 10 +PUT (standard input) => ais://nnn/all-docs + +$ ais ls ais://nnn/all-docs -props all +PROPERTY VALUE +atime 11 Dec 23 12:21 EST +checksum xxhash[b5edf46a1b9459fb] +copies 1 [/ais/mp1/7] +custom - +ec - +location t[VQWtTyuI]:mp[/ais/mp1/7, nvme0n1] +name ais://nnn/all-docs +size 4.00KiB +version 3 +``` + # Delete object `ais object rm BUCKET/[OBJECT_NAME]...`