Skip to content

Commit

Permalink
copy/transform remote, non-present
Browse files Browse the repository at this point in the history
* add a separate target-side POST to handle pages 2, 3, ...
* clarify, cleanup, micro-optimize
* prev. commit: d8143f1

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 23, 2023
1 parent 173b569 commit 4353c56
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 83 deletions.
131 changes: 78 additions & 53 deletions ais/plstcx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package ais

import (
"fmt"
"net/http"
"strings"
"sync"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cluster/meta"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
Expand All @@ -32,27 +34,30 @@ type (
bckTo *meta.Bck
amsg *apc.ActMsg // orig
config *cmn.Config
smap *smapX
// work
tsi *meta.Snode
xid string // x-tco
cnt int
lsmsg apc.LsoMsg
altmsg apc.ActMsg
tcomsg cmn.TCObjsMsg
stopped atomic.Bool
}
)

func (a *lstca) add(c *lstcx, xid string) {
func (a *lstca) add(c *lstcx) {
a.mu.Lock()
if a.a == nil {
a.a = make(map[string]*lstcx, 4)
}
a.a[xid] = c
a.a[c.xid] = c
a.mu.Unlock()
}

func (a *lstca) del(xid string) {
func (a *lstca) del(c *lstcx) {
a.mu.Lock()
delete(a.a, xid)
delete(a.a, c.xid)
a.mu.Unlock()
}

Expand Down Expand Up @@ -92,8 +97,8 @@ func (c *lstcx) do() (string, error) {
PageSize: 0, // i.e., backend.MaxPageSize()
}
c.lsmsg.SetFlag(apc.LsNameOnly)
smap := c.p.owner.smap.get()
tsi, err := smap.HrwTargetTask(c.lsmsg.UUID)
c.smap = c.p.owner.smap.get()
tsi, err := c.smap.HrwTargetTask(c.lsmsg.UUID)
if err != nil {
return "", err
}
Expand All @@ -102,7 +107,7 @@ func (c *lstcx) do() (string, error) {

// 2. ls 1st page
var lst *cmn.LsoResult
lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, smap, tsi /*designated target*/, c.config, true)
lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, tsi /*designated target*/, c.config, true)
if err != nil {
return "", err
}
Expand All @@ -114,76 +119,96 @@ func (c *lstcx) do() (string, error) {

// 3. tcomsg
c.tcomsg.ToBck = c.bckTo.Clone()
names := make([]string, 0, len(lst.Entries))
lr, cnt := &c.tcomsg.ListRange, len(lst.Entries)
lr.ObjNames = make([]string, 0, cnt)
for _, e := range lst.Entries {
names = append(names, e.Name)
lr.ObjNames = append(lr.ObjNames, e.Name)
}
c.tcomsg.ListRange.ObjNames = names

// 4. multi-obj action: transform/copy
// 4. multi-obj action: transform/copy 1st page
c.altmsg.Value = &c.tcomsg
c.altmsg.Action = apc.ActCopyObjects
if c.amsg.Action == apc.ActETLBck {
c.altmsg.Action = apc.ActETLObjects
}

xid, err := c.p.tcobjs(c.bckFrom, c.bckTo, c.config, &c.altmsg, &c.tcomsg)
if err != nil {
if c.xid, err = c.p.tcobjs(c.bckFrom, c.bckTo, c.config, &c.altmsg, &c.tcomsg); err != nil {
return "", err
}

s := fmt.Sprintf("(%s => %s)[%s]: %s => %s %v", c.amsg.Action, c.altmsg.Action, xid, c.bckFrom, c.bckTo, names[:min(len(names), 4)])
nlog.Infoln("'ls --all' to execute [" + c.amsg.Action + " -> " + c.altmsg.Action + "]")
s := fmt.Sprintf("%s[%s] %s => %s", c.altmsg.Action, c.xid, c.bckFrom, c.bckTo)

// 5. more pages, if any
if lst.ContinuationToken != "" {
// Run
nlog.Infoln("run", s, "...")
c.lsmsg.ContinuationToken = lst.ContinuationToken
go func() {
c.p.lstca.add(c, xid)
c.pages(smap, xid)
c.p.lstca.del(xid)
}()
go c.pages(s, cnt)
} else {
nlog.Infoln(s, "done.")
nlog.Infoln(s, "count", cnt)
}
return xid, nil
return c.xid, nil
}

// pages 2..last
func (c *lstcx) pages(smap *smapX, xid string) {
for !c.stopped.Load() {
// next page
lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, smap, c.tsi, c.config, true)
if err != nil {
nlog.Errorln(err)
return
}
if len(lst.Entries) == 0 {
return
}
func (c *lstcx) pages(s string, cnt int) {
c.cnt = cnt
c.p.lstca.add(c)

// next tcomsg
names := make([]string, 0, len(lst.Entries))
for _, e := range lst.Entries {
names = append(names, e.Name)
// pages 2, 3, ...
var err error
for !c.stopped.Load() && c.lsmsg.ContinuationToken != "" {
if cnt, err = c._page(); err != nil {
break
}
c.tcomsg.ListRange.ObjNames = names
c.cnt += cnt
}
c.p.lstca.del(c)
nlog.Infoln(s, "count", c.cnt, "stopped", c.stopped.Load(), "c-token", c.lsmsg.ContinuationToken, "err", err)
}

if c.stopped.Load() {
return
}
// next tco action
c.altmsg.Value = &c.tcomsg
xactID, err := c.p.tcobjs(c.bckFrom, c.bckTo, c.config, &c.altmsg, &c.tcomsg)
if err != nil {
nlog.Errorln(err)
return
}
debug.Assertf(xactID == xid, "%q vs %q", xactID, xid)
// next page
func (c *lstcx) _page() (int, error) {
lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, c.tsi, c.config, true)
if err != nil {
return 0, err
}
c.lsmsg.ContinuationToken = lst.ContinuationToken
if len(lst.Entries) == 0 {
debug.Assert(lst.ContinuationToken == "")
return 0, nil
}

// last page?
if lst.ContinuationToken == "" {
return
lr := &c.tcomsg.ListRange
clear(lr.ObjNames)
lr.ObjNames = lr.ObjNames[:0]
for _, e := range lst.Entries {
lr.ObjNames = append(lr.ObjNames, e.Name)
}
c.altmsg.Value = &c.tcomsg
err = c.bcast()
return len(lr.ObjNames), err
}

// calls t.httpxpost (TODO: slice of names is the only "delta" - optimize)
func (c *lstcx) bcast() (err error) {
body := cos.MustMarshal(apc.ActMsg{Name: c.xid, Value: &c.tcomsg})
args := allocBcArgs()
{
args.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathXactions.S, Body: body}
args.to = cluster.Targets
args.timeout = cmn.Rom.MaxKeepalive()
}
if c.stopped.Load() {
return
}
results := c.p.bcastGroup(args)
freeBcArgs(args)
for _, res := range results {
if err = res.err; err != nil {
break
}
c.lsmsg.ContinuationToken = lst.ContinuationToken
}
freeBcastRes(results)
return err
}
42 changes: 21 additions & 21 deletions ais/prxtxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ type txnCln struct {
// txnCln //
////////////

func (c *txnCln) init(msg *apc.ActMsg, bck *meta.Bck, config *cmn.Config, waitmsync bool) *txnCln {
query := make(url.Values, 3)
if bck == nil {
c.path = apc.URLPathTxn.S
} else {
c.path = apc.URLPathTxn.Join(bck.Name)
query = bck.AddToQuery(query)
}
c.timeout.netw = 2 * config.Timeout.MaxKeepalive.D()
c.timeout.host = config.Timeout.MaxHostBusy.D()
if !waitmsync { // when commit does not block behind metasync
query.Set(apc.QparamNetwTimeout, cos.UnixNano2S(int64(c.timeout.netw)))
}
query.Set(apc.QparamHostTimeout, cos.UnixNano2S(int64(c.timeout.host)))

c.msg = c.p.newAmsg(msg, nil, c.uuid)
body := cos.MustMarshal(c.msg)
c.req = cmn.HreqArgs{Method: http.MethodPost, Query: query, Body: body}
return c
}

func (c *txnCln) begin(what fmt.Stringer) (err error) {
results := c.bcast(apc.ActBegin, c.timeout.netw)
for _, res := range results {
Expand Down Expand Up @@ -977,27 +998,6 @@ func (p *proxy) prepTxnClient(msg *apc.ActMsg, bck *meta.Bck, waitmsync bool) *t
return c
}

func (c *txnCln) init(msg *apc.ActMsg, bck *meta.Bck, config *cmn.Config, waitmsync bool) *txnCln {
query := make(url.Values, 3)
if bck == nil {
c.path = apc.URLPathTxn.S
} else {
c.path = apc.URLPathTxn.Join(bck.Name)
query = bck.AddToQuery(query)
}
c.timeout.netw = 2 * config.Timeout.MaxKeepalive.D()
c.timeout.host = config.Timeout.MaxHostBusy.D()
if !waitmsync { // when commit does not block behind metasync
query.Set(apc.QparamNetwTimeout, cos.UnixNano2S(int64(c.timeout.netw)))
}
query.Set(apc.QparamHostTimeout, cos.UnixNano2S(int64(c.timeout.host)))

c.msg = c.p.newAmsg(msg, nil, c.uuid)
body := cos.MustMarshal(c.msg)
c.req = cmn.HreqArgs{Method: http.MethodPost, Query: query, Body: body}
return c
}

// two helpers to create ais:// destination on the fly, copy source bucket props
func bmodCpProps(ctx *bmdModifier, clone *bucketMD) error {
var (
Expand Down
42 changes: 42 additions & 0 deletions ais/tgtxact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/NVIDIA/aistore/res"
"github.com/NVIDIA/aistore/xact"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/NVIDIA/aistore/xact/xs"
)

// TODO: uplift via higher-level query and similar (#668)
Expand All @@ -34,11 +35,17 @@ func (t *target) xactHandler(w http.ResponseWriter, r *http.Request) {
t.httpxget(w, r)
case http.MethodPut:
t.httpxput(w, r)
case http.MethodPost:
t.httpxpost(w, r)
default:
cmn.WriteErr405(w, r, http.MethodGet, http.MethodPut)
}
}

//
// GET
//

func (t *target) httpxget(w http.ResponseWriter, r *http.Request) {
var (
xactMsg xact.QueryMsg
Expand Down Expand Up @@ -158,6 +165,10 @@ func (t *target) xquery(w http.ResponseWriter, r *http.Request, what string, xac
}
}

//
// PUT
//

func (t *target) xstart(r *http.Request, args *xact.ArgsMsg, bck *meta.Bck) error {
const erfmb = "global xaction %q does not require bucket (%s) - ignoring it and proceeding to start"
const erfmn = "xaction %q requires a bucket to start"
Expand Down Expand Up @@ -237,3 +248,34 @@ func (t *target) xstart(r *http.Request, args *xact.ArgsMsg, bck *meta.Bck) erro
}
return nil
}

//
// POST
//

// client: plstcx.go
func (t *target) httpxpost(w http.ResponseWriter, r *http.Request) {
var (
err error
xctn cluster.Xact
amsg *apc.ActMsg
tcomsg cmn.TCObjsMsg
)
if amsg, err = t.readActionMsg(w, r); err != nil {
return
}

xactID := amsg.Name
if xctn, err = xreg.GetXact(xactID); err != nil {
t.writeErr(w, r, err)
return
}
xtco, ok := xctn.(*xs.XactTCObjs)
debug.Assert(ok)

if err = cos.MorphMarshal(amsg.Value, &tcomsg); err != nil {
t.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, t.si, "special", amsg.Value, err)
return
}
xtco.Do(&tcomsg)
}
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20231221174113-ee345a2731e7
github.com/NVIDIA/aistore v1.3.22-0.20231223025528-173b569ae75a
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20231221174113-ee345a2731e7 h1:nZkbX2fnwoIkVixj+PCJ0ZCf2AyApHfF3iuOwxDo75A=
github.com/NVIDIA/aistore v1.3.22-0.20231221174113-ee345a2731e7/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20231223025528-173b569ae75a h1:F1lkRJZVy0RkR5SQy15d/7VYiooyDdxaXxOoaTzVOAA=
github.com/NVIDIA/aistore v1.3.22-0.20231223025528-173b569ae75a/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
4 changes: 2 additions & 2 deletions cmn/ver_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const GitHubHome = "https://github.com/NVIDIA/aistore"
// `jsp` formats its *signature* and other implementation details.

const (
VersionAIStore = "3.21.2"
VersionCLI = "1.8.1"
VersionAIStore = "3.21.3"
VersionCLI = "1.8.2"
VersionLoader = "1.9"
VersionAuthN = "1.0"
)
Expand Down
2 changes: 1 addition & 1 deletion mirror/put_copies.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (r *XactPut) stop() (err error) {
r.SubPending(n)
err = fmt.Errorf("%s: dropped %d object%s", r, n, cos.Plural(n))
}
if cnt := r.chanFull.Load(); cnt > 10 || (cnt > 0 && r.config.FastV(5, cos.SmoduleMirror)) {
if cnt := r.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && r.config.FastV(5, cos.SmoduleMirror)) {
nlog.Errorln("work channel full (all mp workers)", r.String(), cnt)
}
return
Expand Down
2 changes: 1 addition & 1 deletion transport/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (s *streamBase) sendLoop(dryrun bool) {
// cleanup
s.streamer.abortPending(err, false /*completions*/)

if cnt := s.chanFull.Load(); cnt > 10 || (cnt > 0 && verbose) {
if cnt := s.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && verbose) {
nlog.Errorln("work channel full", s.lid, cnt)
}
}
Expand Down
Loading

0 comments on commit 4353c56

Please sign in to comment.