observability: add prometheus variable labels; remove collector
* variable labels: bucket, xaction (job), mountpath (see the Prometheus sketch below the changed-files summary)
* major update w/ partial rewrite
* part one
* remains:
  - disable prometheus _default_ metrics
  - make `statsValue` polymorphic; absorb prom.go types
  - keep all counters in bytes and nanoseconds (not megabytes/milliseconds)
  - look for a way to optimize `prometheus.Counter`
  - error counting buckets with a different remote backend
  - docs/metrics-reference

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Dec 24, 2024
1 parent e9a0482 commit 118a821
Showing 26 changed files with 547 additions and 423 deletions.
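For context on the commit-message bullets above: a variable label turns a single counter into a Prometheus metric family keyed by label values, so the same counter can be reported per bucket, per xaction (job), or per mountpath. The following is a minimal, stand-alone sketch using the standard client_golang API; the label name and metric options are illustrative assumptions, not AIStore's actual constants.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative label name mirroring one of the commit's variable labels
// (bucket); xaction (job) and mountpath would be additional label names.
const labBucket = "bucket"

// A CounterVec is a family of counters: one time series per distinct
// combination of variable-label values.
var remoteGetCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ais",
		Name:      "remote_get_count",
		Help:      "GET: total number of executed remote requests (cold GETs)",
	},
	[]string{labBucket},
)

func main() {
	prometheus.MustRegister(remoteGetCount)

	// Each distinct bucket value creates (or reuses) its own series, e.g.:
	//   ais_remote_get_count{bucket="s3://abc"} 1
	remoteGetCount.With(prometheus.Labels{labBucket: "s3://abc"}).Inc()
}
```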
18 changes: 16 additions & 2 deletions ais/backend/common.go
@@ -46,6 +46,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "GET: total number of executed remote requests (cold GETs)",
StrName: "remote_get_count",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
tr.RegExtMetric(snode,
@@ -55,6 +56,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "GET: total cumulative time (nanoseconds) to execute cold GETs and store new object versions in-cluster",
StrName: "remote_get_ns_total",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
tr.RegExtMetric(snode,
@@ -65,6 +67,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
"includes: receiving request, executing cold-GET, storing new object version in-cluster, and transmitting response",
StrName: "remote_e2e_get_ns_total",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
tr.RegExtMetric(snode,
@@ -73,7 +76,9 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
&stats.Extra{
Help: "GET: total cumulative size (bytes) of all cold-GET transactions",
StrName: "remote_get_bytes_total",
Labels: labels},
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)

// PUT
@@ -89,6 +94,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "PUT: total number of executed remote requests to a given backend",
StrName: "remote_put_count",
Labels: labels,
VarLabs: stats.BckXactVarlabs,
},
)
tr.RegExtMetric(snode,
@@ -98,6 +104,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "PUT: total cumulative time (nanoseconds) to execute remote requests and store new object versions in-cluster",
StrName: "remote_put_ns_total",
Labels: labels,
VarLabs: stats.BckXactVarlabs,
},
)
tr.RegExtMetric(snode,
@@ -107,7 +114,9 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
StrName: "remote_e2e_put_ns_total",
Help: "PUT: total end-to-end time (nanoseconds) servicing remote requests; " +
"includes: receiving PUT payload, storing it in-cluster, executing remote PUT, finalizing new in-cluster object",
Labels: labels},
Labels: labels,
VarLabs: stats.BckXactVarlabs,
},
)
tr.RegExtMetric(snode,
b.metrics[stats.PutSize],
@@ -116,6 +125,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "PUT: total cumulative size (bytes) of all PUTs to a given remote backend",
StrName: "remote_e2e_put_bytes_total",
Labels: labels,
VarLabs: stats.BckXactVarlabs,
},
)

@@ -130,6 +140,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "HEAD: total number of executed remote requests to a given backend",
StrName: "remote_head_count",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
tr.RegExtMetric(snode,
Expand All @@ -139,6 +150,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "HEAD: total cumulative time (nanoseconds) to execute remote requests",
StrName: "remote_head_ns_total",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)

@@ -153,6 +165,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "number of out-of-band updates (by a 3rd party performing remote PUTs outside this cluster)",
StrName: "remote_ver_change_count",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
tr.RegExtMetric(snode,
Expand All @@ -162,6 +175,7 @@ func (b *base) init(snode *meta.Snode, tr stats.Tracker) {
Help: "total cumulative size of objects that were updated out-of-band",
StrName: "remote_ver_change_bytes_total",
Labels: labels,
VarLabs: stats.BckVarlabs,
},
)
}
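The registrations above pass stats.BckVarlabs or stats.BckXactVarlabs through the new VarLabs field. A plausible shape for these variable-label sets, and for how a registration could map them onto a prometheus.CounterVec, is sketched below; the concrete definitions live in the stats package and may differ, and the label names here are assumptions.

```go
package stats

import "github.com/prometheus/client_golang/prometheus"

// Assumed label names and variable-label sets; the real constants in the
// stats package may differ.
const (
	VarlabBucket   = "bucket"
	VarlabXactKind = "xkind"
	VarlabXactID   = "xid"
)

var (
	BckVarlabs     = []string{VarlabBucket}                               // bucket-only metrics
	BckXactVarlabs = []string{VarlabBucket, VarlabXactKind, VarlabXactID} // bucket + xaction (job) metrics
)

// Extra carries per-metric metadata (sketch; the actual struct has more fields).
type Extra struct {
	Help    string
	StrName string
	Labels  map[string]string // constant labels, fixed at registration time
	VarLabs []string          // variable labels, resolved on every update
}

// toCounterVec shows how constant Labels and variable VarLabs could map onto
// a prometheus.CounterVec during registration (hypothetical helper).
func toCounterVec(e *Extra) *prometheus.CounterVec {
	return prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace:   "ais",
			Name:        e.StrName,
			Help:        e.Help,
			ConstLabels: prometheus.Labels(e.Labels),
		},
		e.VarLabs,
	)
}
```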
2 changes: 1 addition & 1 deletion ais/htrun.go
@@ -1051,7 +1051,7 @@ func (h *htrun) logerr(tag string, v any, err error) {
} else {
nlog.Errorln(msg)
}
h.statsT.IncErr(stats.ErrHTTPWriteCount)
h.statsT.Inc(stats.ErrHTTPWriteCount)
}

func _parseNCopies(value any) (copies int64, err error) {
8 changes: 4 additions & 4 deletions ais/kalive.go
@@ -284,7 +284,7 @@ func (pkr *palive) updateSmap(config *cmn.Config) (stopped bool) {
// otherwise, go keepalive with retries
nlog.Warningln(pkr.p.String(), "failed to fast-kalive", si.StringEx(), "err: [", err, status, "]")

pkr.statsT.IncErr(stats.ErrKaliveCount)
pkr.statsT.Inc(stats.ErrKaliveCount)
wg.Add(1)
go pkr.goping(si, wg, smap, config)
}
@@ -344,7 +344,7 @@ func (pkr *palive) _pingRetry(si *meta.Snode, smap *smapX, config *cmn.Config) (

tout = config.Timeout.MaxKeepalive.D()
nlog.Warningln(pname, "failed to slow-kalive", sname, "- retrying [", err, status, tout, smap.StringEx(), "]")
pkr.statsT.IncErr(stats.ErrKaliveCount)
pkr.statsT.Inc(stats.ErrKaliveCount)

ticker := time.NewTicker(cmn.KeepaliveRetryDuration(config))
ok, stopped = pkr.retry(si, ticker, tout, config.Keepalive.NumRetries)
@@ -440,7 +440,7 @@ func (pkr *palive) retry(si *meta.Snode, ticker *time.Ticker, tout time.Duration
return true, false
}

pkr.statsT.IncErr(stats.ErrKaliveCount)
pkr.statsT.Inc(stats.ErrKaliveCount)
i++

if i >= kaNumRetries {
@@ -583,7 +583,7 @@ func (k *keepalive) do(smap *smapX, si *meta.Snode, config *cmn.Config) (stopped
return
}

k.statsT.IncErr(stats.ErrKaliveCount)
k.statsT.Inc(stats.ErrKaliveCount)

debug.Assert(cpid == pid && cpid != si.ID())
nlog.Warningln(sname, "=>", pname, "failure - retrying: [", fast, tout, err, status, "]")
49 changes: 26 additions & 23 deletions ais/proxy.go
@@ -680,7 +680,7 @@ func (p *proxy) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
}
lsmsg.Prefix = cos.TrimPrefix(lsmsg.Prefix)
if err := cmn.ValidatePrefix("bad list-objects request", lsmsg.Prefix); err != nil {
p.statsT.IncErr(stats.ErrListCount)
p.statsT.IncBck(stats.ErrListCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
@@ -698,7 +698,6 @@ func (p *proxy) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
// do
bck, errN := bckArgs.initAndTry()
if errN != nil {
p.statsT.IncErr(stats.ErrListCount)
return
}
p.listObjects(w, r, bck, msg /*amsg*/, &lsmsg)
@@ -733,11 +732,11 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
objName := apireq.items[1]
apiReqFree(apireq)
if err != nil {
p.statsT.IncErr(stats.ErrGetCount)
return
}

if err := cmn.ValidOname(objName); err != nil {
p.statsT.IncErr(stats.ErrGetCount)
p.statsT.IncBck(stats.ErrGetCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
@@ -748,7 +747,7 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
smap := p.owner.smap.get()
tsi, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName))
if err != nil {
p.statsT.IncErr(stats.ErrGetCount)
p.statsT.IncBck(stats.ErrGetCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
@@ -760,7 +759,7 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
http.Redirect(w, r, redirectURL, http.StatusMovedPermanently)

// 4. stats
p.statsT.Inc(stats.GetCount)
p.statsT.IncBck(stats.GetCount, bck.Bucket())
}

// PUT /v1/objects/bucket-name/object-name
@@ -771,6 +770,7 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
errcnt = stats.ErrPutCount
scnt = stats.PutCount
perms = apc.AcePUT
vlabs = map[string]string{stats.VarlabBucket: "", stats.VarlabXactKind: "", stats.VarlabXactID: ""}
)
// 1. request
if err := p.parseReq(w, r, apireq); err != nil {
@@ -782,6 +782,7 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
perms = apc.AceAPPEND
errcnt = stats.ErrAppendCount
scnt = stats.AppendCount
vlabs = map[string]string{stats.VarlabBucket: ""}
if apireq.dpq.apnd.hdl != "" {
items, err := preParse(apireq.dpq.apnd.hdl) // apc.QparamAppendHandle
if err != nil {
@@ -805,9 +806,9 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
bck, err := bckArgs.initAndTry()
freeBctx(bckArgs)
if err != nil {
p.statsT.IncErr(errcnt)
return
}
vlabs[stats.VarlabBucket] = bck.Cname("")

// 3. redirect
var (
@@ -818,20 +819,20 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
netPub = cmn.NetPublic
)
if err := cmn.ValidOname(objName); err != nil {
p.statsT.IncErr(errcnt)
p.statsT.IncWith(errcnt, vlabs)
p.writeErr(w, r, err)
return
}
if nodeID == "" {
tsi, netPub, err = smap.HrwMultiHome(bck.MakeUname(objName))
if err != nil {
p.statsT.IncErr(errcnt)
p.statsT.IncWith(errcnt, vlabs)
p.writeErr(w, r, err)
return
}
} else {
if tsi = smap.GetTarget(nodeID); tsi == nil {
p.statsT.IncErr(errcnt)
p.statsT.IncWith(errcnt, vlabs)
err = &errNodeNotFound{p.si, smap, verb + " failure:", nodeID}
p.writeErr(w, r, err)
return
@@ -851,7 +852,7 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe
http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)

// 4. stats
p.statsT.Inc(scnt)
p.statsT.IncWith(scnt, vlabs)
}

// DELETE /v1/objects/bucket-name/object-name
@@ -869,14 +870,14 @@ func (p *proxy) httpobjdelete(w http.ResponseWriter, r *http.Request) {
return
}
if err := cmn.ValidOname(objName); err != nil {
p.statsT.IncErr(stats.ErrDeleteCount)
p.statsT.IncBck(stats.ErrDeleteCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
smap := p.owner.smap.get()
tsi, err := smap.HrwName2T(bck.MakeUname(objName))
if err != nil {
p.statsT.IncErr(stats.ErrDeleteCount)
p.statsT.IncBck(stats.ErrDeleteCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
@@ -886,7 +887,7 @@ func (p *proxy) httpobjdelete(w http.ResponseWriter, r *http.Request) {
redirectURL := p.redirectURL(r, tsi, time.Now() /*started*/, cmn.NetIntraControl)
http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect)

p.statsT.Inc(stats.DeleteCount)
p.statsT.IncBck(stats.DeleteCount, bck.Bucket())
}

// DELETE { action } /v1/buckets
@@ -1643,7 +1644,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
// LsVerChanged a.k.a. '--check-versions' limitations
if lsmsg.IsFlagSet(apc.LsVerChanged) {
if err := _checkVerChanged(bck, lsmsg); err != nil {
p.statsT.IncErr(stats.ErrListCount)
p.statsT.IncBck(stats.ErrListCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
@@ -1671,13 +1672,15 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
beg := mono.NanoTime()
lst, err := p.lsPage(bck, amsg, lsmsg, r.Header, p.owner.smap.get())
if err != nil {
p.statsT.IncErr(stats.ErrListCount)
p.statsT.IncBck(stats.ErrListCount, bck.Bucket())
p.writeErr(w, r, err)
return
}
p.statsT.AddMany(
cos.NamedVal64{Name: stats.ListCount, Value: 1},
cos.NamedVal64{Name: stats.ListLatency, Value: mono.SinceNano(beg)},

vlabs := map[string]string{stats.VarlabBucket: bck.Cname("")}
p.statsT.AddWith(
cos.NamedVal64{Name: stats.ListCount, Value: 1, VarLabs: vlabs},
cos.NamedVal64{Name: stats.ListLatency, Value: mono.SinceNano(beg), VarLabs: vlabs},
)

var ok bool
@@ -1846,18 +1849,18 @@ func (p *proxy) httpobjpost(w http.ResponseWriter, r *http.Request, apireq *apiR
switch msg.Action {
case apc.ActRenameObject:
if err := p.checkAccess(w, r, bck, apc.AceObjMOVE); err != nil {
p.statsT.IncErr(stats.ErrRenameCount)
p.statsT.IncBck(stats.ErrRenameCount, bck.Bucket())
return
}
if err := _checkObjMv(bck, msg, apireq); err != nil {
p.statsT.IncErr(stats.ErrRenameCount)
p.statsT.IncBck(stats.ErrRenameCount, bck.Bucket())
p.writeErr(w, r, err)
}
p.redirectAction(w, r, bck, apireq.items[1], msg)
p.statsT.Inc(stats.RenameCount)
p.statsT.IncBck(stats.RenameCount, bck.Bucket())
case apc.ActPromote:
if err := p.checkAccess(w, r, bck, apc.AcePromote); err != nil {
p.statsT.IncErr(stats.ErrRenameCount)
p.statsT.IncBck(stats.ErrRenameCount, bck.Bucket())
return
}
// ActionMsg.Name is the source
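In the proxy handlers above, error and success counters move from plain Inc/IncErr to bucket- and label-aware variants: IncBck for the common "label with the bucket" case, and IncWith/AddWith when the caller builds the vlabs map itself (for example PUT, which also carries xaction labels). A rough sketch of what those tracker entry points and the NamedVal64 payload likely look like, under the same assumed label names as in the earlier sketch, follows; the real stats.Tracker, cos.NamedVal64, and cmn.Bck definitions are richer and may differ.

```go
package stats

import "github.com/prometheus/client_golang/prometheus"

// Bck stands in for cmn.Bck (what bck.Bucket() returns in the proxy code).
type Bck struct{ Name, Provider string }

// NamedVal64 mirrors the fields used with AddWith above (assumed shape).
type NamedVal64 struct {
	Name    string
	Value   int64
	VarLabs map[string]string
}

// Tracker sketches the counter-update calls introduced by this commit.
type Tracker interface {
	Inc(name string)                              // no variable labels
	IncBck(name string, bck *Bck)                 // convenience: label with the bucket only
	IncWith(name string, vlabs map[string]string) // caller-supplied variable labels
	AddWith(nvs ...NamedVal64)                    // batched adds, each carrying its own labels
}

// incWith shows how a vlabs map could be applied to a registered CounterVec.
func incWith(cv *prometheus.CounterVec, value int64, vlabs map[string]string) {
	cv.With(prometheus.Labels(vlabs)).Add(float64(value))
}
```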
2 changes: 1 addition & 1 deletion ais/prxbck.go
@@ -196,7 +196,7 @@ func (bctx *bctx) initAndTry() (bck *meta.Bck, err error) {
}
if ecode != http.StatusNotFound {
// user GET and PUT requests: making a _silent_ exception for assorted error codes
// (counting them via stats.IncErr though)
// (counting them via stats.Inc though)
if bctx.perms == apc.AceGET || bctx.perms == apc.AcePUT {
if ecode == http.StatusUnauthorized || ecode == http.StatusForbidden {
bctx.p.writeErr(bctx.w, bctx.r, err, ecode, Silent)
8 changes: 5 additions & 3 deletions ais/prxs3.go
@@ -413,9 +413,11 @@ func (p *proxy) lsAllPagesS3(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg,
if err != nil {
return lst, err
}
p.statsT.AddMany(
cos.NamedVal64{Name: stats.ListCount, Value: 1},
cos.NamedVal64{Name: stats.ListLatency, Value: mono.SinceNano(beg)},

vlabs := map[string]string{stats.VarlabBucket: bck.Cname("")}
p.statsT.AddWith(
cos.NamedVal64{Name: stats.ListCount, Value: 1, VarLabs: vlabs},
cos.NamedVal64{Name: stats.ListLatency, Value: mono.SinceNano(beg), VarLabs: vlabs},
)
if pageNum == 1 {
lst = page
