Skip to content

Commit

Permalink
APPEND if exists; amend metadata comparison (major update)
Browse files Browse the repository at this point in the history
* APPEND will now perform PUT iff the destination does not exist;
* otherwise, it'll always append to existing content
  - refactor and rewrite the previous (target) implementation
* CLI: add 'ais put --append' option
* - in addition to 'concat' subcommand
* - add docs/cli, add inline help
* separately, amend object metadata comparison
* up cli mod

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 11, 2023
1 parent 922e49a commit 07e1e0b
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 187 deletions.
14 changes: 8 additions & 6 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,13 +679,15 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
if !appendTyProvided {
perms = apc.AcePUT
} else {
hi, err := parseAppendHandle(apireq.dpq.appendHdl) // apc.QparamAppendHandle
if err != nil {
p.writeErr(w, r, err)
return
}
nodeID = hi.nodeID
perms = apc.AceAPPEND
if apireq.dpq.appendHdl != "" {
items, err := preParse(apireq.dpq.appendHdl) // apc.QparamAppendHandle
if err != nil {
p.writeErr(w, r, err)
return
}
nodeID = items[0] // nodeID; compare w/ apndOI.parse
}
}

// 2. bucket
Expand Down
50 changes: 14 additions & 36 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,8 +806,20 @@ func (t *target) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiR
errCode, err = t.putApndArch(r, lom, started, apireq.dpq)
lom.Unlock(true)
case apireq.dpq.appendTy != "": // apc.QparamAppendType
handle, errCode, err = t.appendObj(r, lom, started, apireq.dpq)
if err == nil {
a := &apndOI{
started: started,
t: t,
config: config,
lom: lom,
r: r.Body,
op: apireq.dpq.appendTy, // apc.QparamAppendType
}
if err := a.parse(apireq.dpq.appendHdl /*apc.QparamAppendHandle*/); err != nil {
t.writeErr(w, r, err)
return
}
handle, errCode, err = a.do(r)
if err == nil && handle != "" {
w.Header().Set(apc.HdrAppendHandle, handle)
return
}
Expand Down Expand Up @@ -1201,40 +1213,6 @@ func (t *target) CompareObjects(ctx context.Context, lom *cluster.LOM) (equal bo
return
}

func (t *target) appendObj(r *http.Request, lom *cluster.LOM, started int64, dpq *dpq) (string, int, error) {
var (
cksumValue = r.Header.Get(apc.HdrObjCksumVal)
cksumType = r.Header.Get(apc.HdrObjCksumType)
contentLength = r.Header.Get(cos.HdrContentLength)
)
hdl, err := parseAppendHandle(dpq.appendHdl) // apc.QparamAppendHandle
if err != nil {
return "", http.StatusBadRequest, err
}
a := &apndOI{
started: started,
t: t,
lom: lom,
r: r.Body,
op: dpq.appendTy, // apc.QparamAppendType
hdl: hdl,
}
if a.op != apc.AppendOp && a.op != apc.FlushOp {
err = fmt.Errorf("invalid operation %q (expecting either %q or %q) - check %q query",
a.op, apc.AppendOp, apc.FlushOp, apc.QparamAppendType)
return "", http.StatusBadRequest, err
}
if contentLength != "" {
if size, ers := strconv.ParseInt(contentLength, 10, 64); ers == nil {
a.size = size
}
}
if cksumValue != "" {
a.cksum = cos.NewCksum(cksumType, cksumValue)
}
return a.do()
}

// called under lock
func (t *target) putApndArch(r *http.Request, lom *cluster.LOM, started int64, dpq *dpq) (int, error) {
var (
Expand Down
220 changes: 121 additions & 99 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -45,7 +44,7 @@ import (

type (
putOI struct {
r io.ReadCloser // reader that has the content
r io.ReadCloser // content reader
xctn cluster.Xact // xaction that puts
t *target // this
lom *cluster.LOM // obj
Expand Down Expand Up @@ -78,24 +77,24 @@ type (
unlocked bool // internal
verchanged bool // version changed
retry bool // once
cold bool // executed backend.Get
cold bool // true if executed backend.Get
}

// append handle (packed)
// textbook append: (packed) handle and control structure (see also `putA2I` arch below)
aoHdl struct {
partialCksum *cos.CksumHash
nodeID string
filePath string
workFQN string
}
// (see also putA2I)
apndOI struct {
started int64 // started time of receiving - used to calculate the recv duration
r io.ReadCloser // reader with the content of the object.
started int64 // start time (nanoseconds)
r io.ReadCloser // content reader
t *target // this
lom *cluster.LOM // append to
cksum *cos.Cksum // expected checksum of the final object.
hdl aoHdl // packed
op string // operation (Append | Flush)
config *cmn.Config // (during this request)
lom *cluster.LOM // append to or _as_
cksum *cos.Cksum // checksum expected once Flush-ed
hdl aoHdl // (packed)
op string // enum {apc.AppendOp, apc.FlushOp}
size int64 // Content-Length
}

Expand Down Expand Up @@ -1159,124 +1158,147 @@ func (goi *getOI) parseRange(resphdr http.Header, size int64) (hrng *htrange, er
}

//
// APPEND to existing object (as file)
// APPEND a file or multiple files:
// - as a new object, if doesn't exist
// - to an existing object, if exists
//

func (a *apndOI) do() (newHandle string, errCode int, err error) {
filePath := a.hdl.filePath
func (a *apndOI) do(r *http.Request) (packedHdl string, errCode int, err error) {
var (
cksumValue = r.Header.Get(apc.HdrObjCksumVal)
cksumType = r.Header.Get(apc.HdrObjCksumType)
contentLength = r.Header.Get(cos.HdrContentLength)
)
if contentLength != "" {
if size, ers := strconv.ParseInt(contentLength, 10, 64); ers == nil {
a.size = size
}
}
if cksumValue != "" {
a.cksum = cos.NewCksum(cksumType, cksumValue)
}

switch a.op {
case apc.AppendOp:
var f *os.File
if filePath == "" {
filePath = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppend)
f, err = a.lom.CreateFile(filePath)
if err != nil {
errCode = http.StatusInternalServerError
return
}
a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType())
} else {
f, err = os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
buf, slab := a.t.gmm.Alloc()
packedHdl, errCode, err = a.apnd(buf)
slab.Free(buf)
case apc.FlushOp:
errCode, err = a.flush()
default:
err = fmt.Errorf("invalid operation %q (expecting either %q or %q) - check %q query",
a.op, apc.AppendOp, apc.FlushOp, apc.QparamAppendType)
}

return packedHdl, errCode, err
}

func (a *apndOI) apnd(buf []byte) (packedHdl string, errCode int, err error) {
var (
fh *os.File
workFQN = a.hdl.workFQN
)
if workFQN == "" {
workFQN = fs.CSM.Gen(a.lom, fs.WorkfileType, fs.WorkfileAppend)
a.lom.Lock(false)
if a.lom.Load(false /*cache it*/, false /*locked*/) == nil {
_, a.hdl.partialCksum, err = cos.CopyFile(a.lom.FQN, workFQN, buf, a.lom.CksumType())
a.lom.Unlock(false)
if err != nil {
errCode = http.StatusInternalServerError
return
}
debug.Assert(a.hdl.partialCksum != nil)
}

var (
buf []byte
slab *memsys.Slab
)
if a.size == 0 {
buf, slab = a.t.gmm.Alloc()
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
} else {
buf, slab = a.t.gmm.AllocSize(a.size)
}

w := cos.NewWriterMulti(f, a.hdl.partialCksum.H)
_, err = cos.CopyBuffer(w, a.r, buf)

slab.Free(buf)
cos.Close(f)
if err != nil {
errCode = http.StatusInternalServerError
return
}

newHandle = combineAppendHandle(a.t.SID(), filePath, a.hdl.partialCksum)
case apc.FlushOp:
if filePath == "" {
err = fmt.Errorf("failed to finalize append-file operation: empty source in the %+v handle", a.hdl)
errCode = http.StatusBadRequest
return
a.lom.Unlock(false)
a.hdl.partialCksum = cos.NewCksumHash(a.lom.CksumType())
fh, err = a.lom.CreateFile(workFQN)
}
} else {
fh, err = os.OpenFile(workFQN, os.O_APPEND|os.O_WRONLY, cos.PermRWR)
debug.Assert(a.hdl.partialCksum != nil)
a.hdl.partialCksum.Finalize()
partialCksum := a.hdl.partialCksum.Clone()
if !a.cksum.IsEmpty() && !partialCksum.Equal(a.cksum) {
err = cos.NewErrDataCksum(partialCksum, a.cksum)
errCode = http.StatusInternalServerError
return
}
params := cluster.PromoteParams{
Bck: a.lom.Bck(),
Cksum: partialCksum,
PromoteArgs: cluster.PromoteArgs{
SrcFQN: filePath,
ObjName: a.lom.ObjName,
OverwriteDst: true,
DeleteSrc: true, // NOTE: always overwrite and remove
},
}
if errCode, err = a.t.Promote(&params); err != nil {
return
}
default:
err = fmt.Errorf("invalid append-file operation %q", a.op)
debug.AssertNoErr(err)
}
if err != nil { // failed to open or create
errCode = http.StatusInternalServerError
return
}

w := cos.NewWriterMulti(fh, a.hdl.partialCksum.H)
_, err = cos.CopyBuffer(w, a.r, buf)
cos.Close(fh)
if err != nil {
errCode = http.StatusInternalServerError
return
}

delta := time.Now().UnixNano() - a.started
packedHdl = a.pack(workFQN)

// stats (TODO: add `stats.FlushCount` for symmetry)
lat := time.Now().UnixNano() - a.started
a.t.statsT.AddMany(
cos.NamedVal64{Name: stats.AppendCount, Value: 1},
cos.NamedVal64{Name: stats.AppendLatency, Value: delta},
cos.NamedVal64{Name: stats.AppendLatency, Value: lat},
)
if cmn.FastV(4, cos.SmoduleAIS) {
nlog.Infof("APPEND %s: %s", a.lom, delta)
if a.config.FastV(4, cos.SmoduleAIS) {
nlog.Infof("APPEND %s: %s", a.lom, lat)
}
return
}

func parseAppendHandle(handle string) (hdl aoHdl, err error) {
if handle == "" {
return
func (a *apndOI) flush() (int, error) {
if a.hdl.workFQN == "" {
return 0, fmt.Errorf("failed to finalize append-file operation: empty source in the %+v handle", a.hdl)
}

// finalize checksum
debug.Assert(a.hdl.partialCksum != nil)
a.hdl.partialCksum.Finalize()
partialCksum := a.hdl.partialCksum.Clone()
if !a.cksum.IsEmpty() && !partialCksum.Equal(a.cksum) {
return http.StatusInternalServerError, cos.NewErrDataCksum(partialCksum, a.cksum)
}

params := cluster.PromoteParams{
Bck: a.lom.Bck(),
Cksum: partialCksum,
PromoteArgs: cluster.PromoteArgs{
SrcFQN: a.hdl.workFQN,
ObjName: a.lom.ObjName,
OverwriteDst: true,
DeleteSrc: true, // NOTE: always overwrite and remove
},
}
p := strings.SplitN(handle, "|", 4)
if len(p) != 4 {
return hdl, fmt.Errorf("invalid APPEND handle: %q", handle)
return a.t.Promote(&params)
}

func (a *apndOI) parse(packedHdl string) error {
if packedHdl == "" {
return nil
}
hdl.partialCksum = cos.NewCksumHash(p[2])
buf, err := base64.StdEncoding.DecodeString(p[3])
items, err := preParse(packedHdl)
if err != nil {
return hdl, err
return err
}
err = hdl.partialCksum.H.(encoding.BinaryUnmarshaler).UnmarshalBinary(buf)
a.hdl.partialCksum = cos.NewCksumHash(items[2])
buf, err := base64.StdEncoding.DecodeString(items[3])
if err != nil {
return hdl, err
return err
}
hdl.nodeID = p[0]
hdl.filePath = p[1]
return
if err := a.hdl.partialCksum.H.(encoding.BinaryUnmarshaler).UnmarshalBinary(buf); err != nil {
return err
}

a.hdl.nodeID = items[0]
a.hdl.workFQN = items[1]
return nil
}

func combineAppendHandle(nodeID, filePath string, partialCksum *cos.CksumHash) string {
buf, err := partialCksum.H.(encoding.BinaryMarshaler).MarshalBinary()
func (a *apndOI) pack(workFQN string) string {
buf, err := a.hdl.partialCksum.H.(encoding.BinaryMarshaler).MarshalBinary()
debug.AssertNoErr(err)
cksumTy := partialCksum.Type()
cksumTy := a.hdl.partialCksum.Type()
cksumBinary := base64.StdEncoding.EncodeToString(buf)
return nodeID + "|" + filePath + "|" + cksumTy + "|" + cksumBinary
return a.t.SID() + appendHandleSepa + workFQN + appendHandleSepa + cksumTy + appendHandleSepa + cksumBinary
}

//
Expand Down
Loading

0 comments on commit 07e1e0b

Please sign in to comment.