copy bucket: extend the command to sync remote bucket
* don't return "cannot copy bucket onto itself" when source is remote
* rather, proceed to synchronize from the remote source (see the illustrative sketch below)
* amend and refactor _copying_ datapath
  - in particular, remove redundant (nested) control struct
* part one

Signed-off-by: Alex Aizman <[email protected]>
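
An illustrative, self-contained sketch of the relaxed check described above. The Bck type and its methods are simplified stand-ins, not the actual aistore types; the Equal/IsRemote semantics are assumed from the proxy.go hunk below.

// Illustration only: simplified stand-ins, not the aistore types.
package main

import (
    "errors"
    "fmt"
)

type Bck struct {
    Name     string
    Provider string // "ais" for in-cluster buckets; "aws", "gcp", ... for remote backends
}

func (b Bck) IsRemote() bool   { return b.Provider != "ais" }
func (b Bck) Equal(o Bck) bool { return b.Name == o.Name && b.Provider == o.Provider }

// validateCopy mirrors the relaxed check: copying a bucket onto itself remains an
// error for in-cluster buckets, while a remote source proceeds as an in-cluster sync.
func validateCopy(from, to Bck) error {
    if from.Equal(to) && !from.IsRemote() {
        return errors.New("cannot copy bucket onto itself")
    }
    return nil
}

func main() {
    aisBck := Bck{Name: "images", Provider: "ais"}
    s3Bck := Bck{Name: "images", Provider: "aws"}
    fmt.Println(validateCopy(aisBck, aisBck)) // cannot copy bucket onto itself
    fmt.Println(validateCopy(s3Bck, s3Bck))   // <nil>: proceeds, synchronizing the remote bucket
}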
alex-aizman committed Dec 13, 2023
1 parent 10f00f7 commit 6be0b2d
Showing 16 changed files with 195 additions and 158 deletions.
ais/proxy.go: 12 additions & 8 deletions
@@ -1181,6 +1181,7 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
         }
     case apc.ActCopyBck, apc.ActETLBck:
         var (
+            bckFrom = bck
             bckTo *meta.Bck
             tcbmsg = &apc.TCBMsg{}
             errCode int
@@ -1207,9 +1208,12 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
             p.writeErr(w, r, err)
             return
         }
-        if bck.Equal(bckTo, false, true) {
-            p.writeErrf(w, r, "cannot %s bucket %q onto itself", msg.Action, bck)
-            return
+        if bckFrom.Equal(bckTo, true, true) {
+            if !bckFrom.IsRemote() {
+                p.writeErrf(w, r, "cannot %s bucket %q onto itself", msg.Action, bckFrom)
+                return
+            }
+            nlog.Infoln("proceeding to copy remote", bckFrom.String())
         }

         bckTo, errCode, err = p.initBckTo(w, r, query, bckTo)
@@ -1223,27 +1227,27 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
             if err := p.checkAccess(w, r, nil, apc.AceCreateBucket); err != nil {
                 return
             }
-            nlog.Warningf("%s: dst %s doesn't exist and will be created with the src (%s) props", p, bckTo, bck)
+            nlog.Infof("%s: dst %s doesn't exist and will be created with the src (%s) props", p, bckTo, bckFrom)
         }

         // start x-tcb or x-tco
         if v := query.Get(apc.QparamFltPresence); v != "" {
             fltPresence, _ = strconv.Atoi(v)
         }
         debug.Assertf(fltPresence != apc.FltExistsOutside, "(flt %d=\"outside\") not implemented yet", fltPresence)
-        if !apc.IsFltPresent(fltPresence) && bck.IsCloud() {
+        if !apc.IsFltPresent(fltPresence) && bckFrom.IsCloud() {
             lstcx := &lstcx{
                 p: p,
-                bckFrom: bck,
+                bckFrom: bckFrom,
                 bckTo: bckTo,
                 amsg: msg,
                 tcbmsg: tcbmsg,
                 config: cmn.GCO.Get(),
             }
             xid, err = lstcx.do()
         } else {
-            nlog.Infof("%s: %s => %s", msg.Action, bck, bckTo)
-            xid, err = p.tcb(bck, bckTo, msg, tcbmsg.DryRun)
+            nlog.Infof("%s: %s => %s", msg.Action, bckFrom, bckTo)
+            xid, err = p.tcb(bckFrom, bckTo, msg, tcbmsg.DryRun)
         }
         if err != nil {
             p.writeErr(w, r, err)
ais/target.go: 13 additions & 3 deletions
@@ -1336,7 +1336,7 @@ func (t *target) delobj(lom *cluster.LOM, evict bool) (int, error, bool) {
 }

 // rename obj
-func (t *target) objMv(lom *cluster.LOM, msg *apc.ActMsg) error {
+func (t *target) objMv(lom *cluster.LOM, msg *apc.ActMsg) (err error) {
     if lom.Bck().IsRemote() {
         return fmt.Errorf("%s: cannot rename object %s from a remote bucket", t.si, lom)
     }
@@ -1350,12 +1350,22 @@ func (t *target) objMv(lom *cluster.LOM, msg *apc.ActMsg) error {
     buf, slab := t.gmm.Alloc()
     coi := allocCOI()
     {
-        coi.CopyObjectParams = cluster.CopyObjectParams{BckTo: lom.Bck(), Buf: buf}
+        coi.bckTo = lom.Bck()
+        coi.objnameTo = msg.Name /* new object name */
+        coi.buf = buf
         coi.t = t
         coi.owt = cmn.OwtMigrate
         coi.finalize = true
     }
-    _, err := coi.copyObject(lom, msg.Name /* new object name */)
+
+    if lom.Bck().IsRemote() || coi.bckTo.IsRemote() {
+        // when either one or both buckets are remote
+        coi.dp = &cluster.LDP{}
+        _, err = coi.copyReader(lom)
+    } else {
+        _, err = coi.copyObject(lom)
+    }
+
     slab.Free(buf)
     freeCOI(coi)
     if err != nil {
ais/tgtimpl.go: 27 additions & 13 deletions
@@ -126,27 +126,41 @@ func (t *target) HeadObjT2T(lom *cluster.LOM, si *meta.Snode) bool {
 // the AIS cluster (by performing a cold GET if need be).
 // - if the dst is cloud, we perform a regular PUT logic thus also making sure that the new
 // replica gets created in the cloud bucket of _this_ AIS cluster.
-func (t *target) CopyObject(lom *cluster.LOM, params *cluster.CopyObjectParams, dryRun bool) (size int64, err error) {
-    objNameTo := lom.ObjName
+func (t *target) CopyObject(lom *cluster.LOM, dm cluster.DataMover, dp cluster.DP, xact cluster.Xact,
+    bckTo *meta.Bck, objnameTo string, buf []byte, dryRun bool) (size int64, err error) {
     coi := allocCOI()
     {
-        coi.CopyObjectParams = *params
+        coi.dm = dm
+        coi.dp = dp
+        coi.xact = xact
+        coi.bckTo = bckTo
+        coi.objnameTo = objnameTo
+        coi.buf = buf
+        coi.dryRun = dryRun
+        // defaults
         coi.t = t
         coi.owt = cmn.OwtMigrate
         coi.finalize = false
-        coi.dryRun = dryRun
     }
-    if params.ObjNameTo != "" {
-        objNameTo = params.ObjNameTo
+    if coi.objnameTo == "" {
+        coi.objnameTo = lom.ObjName
     }
-    if params.DP != nil { // NOTE: w/ transformation
-        size, err = coi.copyReader(lom, objNameTo)
-    } else {
-        size, err = coi.copyObject(lom, objNameTo)
+
+    // TODO -- FIXME: must be provided by the caller
+    coi.copyRemote = lom.Bck().Equal(coi.bckTo, true /*same ID*/, true /*same backend*/) && lom.ObjName == coi.objnameTo
+
+    switch {
+    case dp != nil: // 1. w/ transformation
+        size, err = coi.copyReader(lom)
+    case lom.Bck().IsRemote() || coi.bckTo.IsRemote(): // 2. when either one or both buckets are remote
+        coi.dp = &cluster.LDP{}
+        size, err = coi.copyReader(lom)
+    default: // 3.
+        size, err = coi.copyObject(lom)
     }
     coi.objsAdd(size, err)
     freeCOI(coi)
-    return
+    return size, err
 }

 // compare with goi.getCold
@@ -318,9 +332,9 @@ func (t *target) _promRemote(params *cluster.PromoteParams, lom *cluster.LOM, ts
     coi := allocCOI()
     {
         coi.t = t
-        coi.BckTo = lom.Bck()
+        coi.bckTo = lom.Bck()
         coi.owt = cmn.OwtPromote
-        coi.Xact = params.Xact
+        coi.xact = params.Xact
     }
     size, err := coi.sendRemote(lom, lom.ObjName, tsi)
     freeCOI(coi)
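
The CopyObject doc comment in tgtimpl.go above mentions reading the source into the cluster via a cold GET when needed; this commit routes such cases through the reader-based datapath. Below is a minimal, runnable stand-in for the resulting three-way dispatch; the names and types are illustrative placeholders, not the aistore API.

// Illustration only: simplified placeholders, not the aistore API.
package main

import "fmt"

// dataProvider stands in for a transformation datapath (cluster.DP in the diff); nil means none.
type dataProvider interface{ Name() string }

type etlDP struct{}

func (etlDP) Name() string { return "etl-transform" }

// dispatch mirrors the switch added to CopyObject:
//  1. a caller-provided DP (transformation) takes the reader-based path;
//  2. a remote source or destination also takes the reader-based path, with a default
//     DP (cluster.LDP in the diff), presumably so a cold GET can read through to the backend;
//  3. otherwise, the plain in-cluster object copy applies.
func dispatch(transform dataProvider, srcRemote, dstRemote bool) string {
    switch {
    case transform != nil:
        return "copyReader with caller-provided DP: " + transform.Name()
    case srcRemote || dstRemote:
        return "copyReader with default LDP"
    default:
        return "copyObject (in-cluster)"
    }
}

func main() {
    fmt.Println(dispatch(nil, true, true))       // remote source and destination (or in-place sync)
    fmt.Println(dispatch(etlDP{}, false, false)) // with transformation
    fmt.Println(dispatch(nil, false, false))     // plain in-cluster copy
}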
(diffs for the remaining 13 changed files are not shown)
