Skip to content

Commit

Permalink
inline ETL: revise stats handling
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Koo <[email protected]>
  • Loading branch information
alex-aizman authored and rkoo19 committed Dec 30, 2024
1 parent c505649 commit 35552d3
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 42 deletions.
11 changes: 6 additions & 5 deletions ais/tgtetl.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ func (t *target) stopETL(w http.ResponseWriter, r *http.Request, etlName string)
}

func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, lom *core.LOM) {
var (
comm etl.Communicator
err error
)
comm, err = etl.GetCommunicator(etlName)
comm, err := etl.GetCommunicator(etlName)
if err != nil {
if cos.IsErrNotFound(err) {
smap := t.owner.smap.Get()
Expand All @@ -167,13 +163,18 @@ func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string,
t.writeErr(w, r, err)
return
}

if err := comm.InlineTransform(w, r, lom); err != nil {
errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: etlName, PodName: comm.PodName(), SvcName: comm.SvcName()},
err.Error())
xetl := comm.Xact()
xetl.AddErr(errV)
t.writeErr(w, r, errV)
return
}

xetl := comm.Xact()
xetl.ObjsAdd(1, lom.Lsize())
}

func (t *target) logsETL(w http.ResponseWriter, r *http.Request, etlName string) {
Expand Down
2 changes: 1 addition & 1 deletion ext/etl/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (b *etlBootstrapper) waitPodReady() error {
}

func (b *etlBootstrapper) setupXaction(xid string) {
rns := xreg.RenewETL(b.msg, xid)
rns := xreg.RenewETL(&b.msg, xid)
debug.AssertNoErr(rns.Err)
debug.Assert(!rns.IsRunning())
b.xctn = rns.Entry.Get()
Expand Down
40 changes: 13 additions & 27 deletions ext/etl/communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *baseComm) OutBytes() int64 { return c.boot.xctn.OutBytes() }

func (c *baseComm) Stop() { c.boot.xctn.Finish() }

func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
func (c *baseComm) getWithTimeout(url string, timeout time.Duration) (r cos.ReadCloseSizer, err error) {
if err := c.boot.xctn.AbortErr(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,15 +181,12 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration)
}

return cos.NewReaderWithArgs(cos.ReaderArgs{
R: resp.Body,
Size: resp.ContentLength,
ReadCb: func(n int, _ error) { c.boot.xctn.InObjsAdd(0, int64(n)) },
R: resp.Body,
Size: resp.ContentLength,
DeferCb: func() {
if cancel != nil {
cancel()
}
c.boot.xctn.InObjsAdd(1, 0)
c.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd`
},
}), nil
}
Expand Down Expand Up @@ -293,15 +290,12 @@ finish:
return nil, ecode, err
}
args := cos.ReaderArgs{
R: resp.Body,
Size: resp.ContentLength,
ReadCb: func(n int, _ error) { pc.boot.xctn.InObjsAdd(0, int64(n)) },
R: resp.Body,
Size: resp.ContentLength,
DeferCb: func() {
if cancel != nil {
cancel()
}
pc.boot.xctn.InObjsAdd(1, 0)
pc.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd`
},
}
return cos.NewReaderWithArgs(args), 0, nil
Expand Down Expand Up @@ -345,13 +339,10 @@ func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request,
if err := rc.boot.xctn.AbortErr(); err != nil {
return err
}
size, err := lomLoad(lom)
err := lomLoad(lom)
if err != nil {
return err
}
if size > 0 {
rc.boot.xctn.OutObjsAdd(1, size)
}
http.Redirect(w, r, rc.redirectURL(lom), http.StatusTemporaryRedirect)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
Expand All @@ -373,13 +364,13 @@ func (rc *redirectComm) redirectURL(lom *core.LOM) string {

func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) {
clone := *lom
size, errV := lomLoad(&clone)
errV := lomLoad(&clone)
if errV != nil {
return nil, errV
}

etlURL := rc.redirectURL(&clone)
r, err := rc.getWithTimeout(etlURL, size, timeout)
r, err := rc.getWithTimeout(etlURL, timeout)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hpull, clone.Cname(), err)
Expand All @@ -392,13 +383,10 @@ func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (
//////////////////

func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error {
size, err := lomLoad(lom)
err := lomLoad(lom)
if err != nil {
return err
}
if size > 0 {
rp.boot.xctn.OutObjsAdd(1, size)
}
path := transformerPath(lom)

r.URL.Path, _ = url.PathUnescape(path) // `Path` must be unescaped otherwise it will be escaped again.
Expand All @@ -410,12 +398,12 @@ func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request,

func (rp *revProxyComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) {
clone := *lom
size, errV := lomLoad(&clone)
errV := lomLoad(&clone)
if errV != nil {
return nil, errV
}
etlURL := cos.JoinPath(rp.boot.uri, transformerPath(&clone))
r, err := rp.getWithTimeout(etlURL, size, timeout)
r, err := rp.getWithTimeout(etlURL, timeout)

if cmn.Rom.FastV(5, cos.SmoduleETL) {
nlog.Infoln(Hrev, clone.Cname(), err)
Expand Down Expand Up @@ -459,13 +447,11 @@ func transformerPath(lom *core.LOM) string {
return "/" + url.PathEscape(lom.Uname())
}

func lomLoad(lom *core.LOM) (size int64, err error) {
func lomLoad(lom *core.LOM) (err error) {
if err = lom.Load(true /*cacheIt*/, false /*locked*/); err != nil {
if cos.IsNotExist(err, 0) && lom.Bucket().IsRemote() {
err = nil // NOTE: size == 0
}
} else {
size = lom.Lsize()
}
return size, err
return err
}
17 changes: 8 additions & 9 deletions xact/xs/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
package xs

import (
"fmt"
"sync"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/ext/etl"
"github.com/NVIDIA/aistore/xact"
"github.com/NVIDIA/aistore/xact/xreg"
)
Expand All @@ -25,6 +25,7 @@ type (
}
xactETL struct {
xact.Base
msg *etl.InitSpecMsg
}
)

Expand Down Expand Up @@ -53,14 +54,12 @@ func (*etlFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {

// (tests only)

func newETL(p *etlFactory) (xctn *xactETL) {
var (
s = p.Args.Custom.(fmt.Stringer)
ctlmsg = s.String()
)
xctn = &xactETL{}
xctn.InitBase(p.Args.UUID, p.Kind(), ctlmsg, nil)
return
func newETL(p *etlFactory) *xactETL {
msg, ok := p.Args.Custom.(*etl.InitSpecMsg)
debug.Assert(ok)
xctn := &xactETL{msg: msg}
xctn.InitBase(p.Args.UUID, p.Kind(), msg.String(), nil)
return xctn
}

func (*xactETL) Run(*sync.WaitGroup) { debug.Assert(false) }
Expand Down

0 comments on commit 35552d3

Please sign in to comment.