From 35552d348f20d14dc776141b75508ef7d2354838 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Thu, 19 Dec 2024 10:46:52 -0500 Subject: [PATCH] inline ETL: revise stats handling Signed-off-by: Ryan Koo --- ais/tgtetl.go | 11 ++++++----- ext/etl/boot.go | 2 +- ext/etl/communicator.go | 40 +++++++++++++--------------------------- xact/xs/etl.go | 17 ++++++++--------- 4 files changed, 28 insertions(+), 42 deletions(-) diff --git a/ais/tgtetl.go b/ais/tgtetl.go index 77d4aab726e..0d90dbed839 100644 --- a/ais/tgtetl.go +++ b/ais/tgtetl.go @@ -151,11 +151,7 @@ func (t *target) stopETL(w http.ResponseWriter, r *http.Request, etlName string) } func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, lom *core.LOM) { - var ( - comm etl.Communicator - err error - ) - comm, err = etl.GetCommunicator(etlName) + comm, err := etl.GetCommunicator(etlName) if err != nil { if cos.IsErrNotFound(err) { smap := t.owner.smap.Get() @@ -167,13 +163,18 @@ func (t *target) getETL(w http.ResponseWriter, r *http.Request, etlName string, t.writeErr(w, r, err) return } + if err := comm.InlineTransform(w, r, lom); err != nil { errV := cmn.NewErrETL(&cmn.ETLErrCtx{ETLName: etlName, PodName: comm.PodName(), SvcName: comm.SvcName()}, err.Error()) xetl := comm.Xact() xetl.AddErr(errV) t.writeErr(w, r, errV) + return } + + xetl := comm.Xact() + xetl.ObjsAdd(1, lom.Lsize()) } func (t *target) logsETL(w http.ResponseWriter, r *http.Request, etlName string) { diff --git a/ext/etl/boot.go b/ext/etl/boot.go index c6992ae008b..0aa07ddbd1c 100644 --- a/ext/etl/boot.go +++ b/ext/etl/boot.go @@ -219,7 +219,7 @@ func (b *etlBootstrapper) waitPodReady() error { } func (b *etlBootstrapper) setupXaction(xid string) { - rns := xreg.RenewETL(b.msg, xid) + rns := xreg.RenewETL(&b.msg, xid) debug.AssertNoErr(rns.Err) debug.Assert(!rns.IsRunning()) b.xctn = rns.Entry.Get() diff --git a/ext/etl/communicator.go b/ext/etl/communicator.go index c23d35ec438..d90281aafe7 100644 --- a/ext/etl/communicator.go +++ b/ext/etl/communicator.go @@ -153,7 +153,7 @@ func (c *baseComm) OutBytes() int64 { return c.boot.xctn.OutBytes() } func (c *baseComm) Stop() { c.boot.xctn.Finish() } -func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) (r cos.ReadCloseSizer, err error) { +func (c *baseComm) getWithTimeout(url string, timeout time.Duration) (r cos.ReadCloseSizer, err error) { if err := c.boot.xctn.AbortErr(); err != nil { return nil, err } @@ -181,15 +181,12 @@ func (c *baseComm) getWithTimeout(url string, size int64, timeout time.Duration) } return cos.NewReaderWithArgs(cos.ReaderArgs{ - R: resp.Body, - Size: resp.ContentLength, - ReadCb: func(n int, _ error) { c.boot.xctn.InObjsAdd(0, int64(n)) }, + R: resp.Body, + Size: resp.ContentLength, DeferCb: func() { if cancel != nil { cancel() } - c.boot.xctn.InObjsAdd(1, 0) - c.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` }, }), nil } @@ -293,15 +290,12 @@ finish: return nil, ecode, err } args := cos.ReaderArgs{ - R: resp.Body, - Size: resp.ContentLength, - ReadCb: func(n int, _ error) { pc.boot.xctn.InObjsAdd(0, int64(n)) }, + R: resp.Body, + Size: resp.ContentLength, DeferCb: func() { if cancel != nil { cancel() } - pc.boot.xctn.InObjsAdd(1, 0) - pc.boot.xctn.OutObjsAdd(1, size) // see also: `coi.objsAdd` }, } return cos.NewReaderWithArgs(args), 0, nil @@ -345,13 +339,10 @@ func (rc *redirectComm) InlineTransform(w http.ResponseWriter, r *http.Request, if err := rc.boot.xctn.AbortErr(); err != nil { return err } - size, err := lomLoad(lom) + err := lomLoad(lom) if err != nil { return err } - if size > 0 { - rc.boot.xctn.OutObjsAdd(1, size) - } http.Redirect(w, r, rc.redirectURL(lom), http.StatusTemporaryRedirect) if cmn.Rom.FastV(5, cos.SmoduleETL) { @@ -373,13 +364,13 @@ func (rc *redirectComm) redirectURL(lom *core.LOM) string { func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { clone := *lom - size, errV := lomLoad(&clone) + errV := lomLoad(&clone) if errV != nil { return nil, errV } etlURL := rc.redirectURL(&clone) - r, err := rc.getWithTimeout(etlURL, size, timeout) + r, err := rc.getWithTimeout(etlURL, timeout) if cmn.Rom.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hpull, clone.Cname(), err) @@ -392,13 +383,10 @@ func (rc *redirectComm) OfflineTransform(lom *core.LOM, timeout time.Duration) ( ////////////////// func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error { - size, err := lomLoad(lom) + err := lomLoad(lom) if err != nil { return err } - if size > 0 { - rp.boot.xctn.OutObjsAdd(1, size) - } path := transformerPath(lom) r.URL.Path, _ = url.PathUnescape(path) // `Path` must be unescaped otherwise it will be escaped again. @@ -410,12 +398,12 @@ func (rp *revProxyComm) InlineTransform(w http.ResponseWriter, r *http.Request, func (rp *revProxyComm) OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) { clone := *lom - size, errV := lomLoad(&clone) + errV := lomLoad(&clone) if errV != nil { return nil, errV } etlURL := cos.JoinPath(rp.boot.uri, transformerPath(&clone)) - r, err := rp.getWithTimeout(etlURL, size, timeout) + r, err := rp.getWithTimeout(etlURL, timeout) if cmn.Rom.FastV(5, cos.SmoduleETL) { nlog.Infoln(Hrev, clone.Cname(), err) @@ -459,13 +447,11 @@ func transformerPath(lom *core.LOM) string { return "/" + url.PathEscape(lom.Uname()) } -func lomLoad(lom *core.LOM) (size int64, err error) { +func lomLoad(lom *core.LOM) (err error) { if err = lom.Load(true /*cacheIt*/, false /*locked*/); err != nil { if cos.IsNotExist(err, 0) && lom.Bucket().IsRemote() { err = nil // NOTE: size == 0 } - } else { - size = lom.Lsize() } - return size, err + return err } diff --git a/xact/xs/etl.go b/xact/xs/etl.go index 09d24c6f775..aab33f1720b 100644 --- a/xact/xs/etl.go +++ b/xact/xs/etl.go @@ -6,7 +6,6 @@ package xs import ( - "fmt" "sync" "github.com/NVIDIA/aistore/api/apc" @@ -14,6 +13,7 @@ import ( "github.com/NVIDIA/aistore/cmn/debug" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" + "github.com/NVIDIA/aistore/ext/etl" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" ) @@ -25,6 +25,7 @@ type ( } xactETL struct { xact.Base + msg *etl.InitSpecMsg } ) @@ -53,14 +54,12 @@ func (*etlFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) { // (tests only) -func newETL(p *etlFactory) (xctn *xactETL) { - var ( - s = p.Args.Custom.(fmt.Stringer) - ctlmsg = s.String() - ) - xctn = &xactETL{} - xctn.InitBase(p.Args.UUID, p.Kind(), ctlmsg, nil) - return +func newETL(p *etlFactory) *xactETL { + msg, ok := p.Args.Custom.(*etl.InitSpecMsg) + debug.Assert(ok) + xctn := &xactETL{msg: msg} + xctn.InitBase(p.Args.UUID, p.Kind(), msg.String(), nil) + return xctn } func (*xactETL) Run(*sync.WaitGroup) { debug.Assert(false) }