From 4006b517243c869d72464c0f55e75c9fa4fad26e Mon Sep 17 00:00:00 2001 From: Muhammad Iqbal Alaydrus Date: Tue, 14 Nov 2023 20:44:56 +0700 Subject: [PATCH] concurrent query --- pkg/datasource/datasource.go | 58 ++++++++++++++++++++++++------------ pkg/zabbix/methods.go | 2 +- pkg/zabbix/zabbix.go | 25 +++++++++++++--- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/pkg/datasource/datasource.go b/pkg/datasource/datasource.go index b5553e58c..7179229c8 100644 --- a/pkg/datasource/datasource.go +++ b/pkg/datasource/datasource.go @@ -2,7 +2,9 @@ package datasource import ( "context" + "encoding/json" "errors" + "sync" "github.com/alexanderzobnin/grafana-zabbix/pkg/httpclient" "github.com/alexanderzobnin/grafana-zabbix/pkg/metrics" @@ -113,31 +115,49 @@ func (ds *ZabbixDatasource) QueryData(ctx context.Context, req *backend.QueryDat return nil, err } + responseLock := new(sync.Mutex) + wg := new(sync.WaitGroup) for _, q := range req.Queries { - res := backend.DataResponse{} - query, err := ReadQuery(q) - ds.logger.Debug("DS query", "query", q) - if err != nil { - res.Error = err - } else if query.QueryType == MODE_METRICS { - frames, err := zabbixDS.queryNumericItems(ctx, &query) - if err != nil { - res.Error = err - } else { - res.Frames = append(res.Frames, frames...) - } - } else if query.QueryType == MODE_ITEMID { - frames, err := zabbixDS.queryItemIdData(ctx, &query) + wg.Add(1) + qCopy := backend.DataQuery{ + RefID: q.RefID, + QueryType: q.QueryType, + MaxDataPoints: q.MaxDataPoints, + Interval: q.Interval, + TimeRange: q.TimeRange, + JSON: make(json.RawMessage, len(q.JSON)), + } + copy(qCopy.JSON, q.JSON) + go func() { + defer wg.Done() + res := backend.DataResponse{} + query, err := ReadQuery(qCopy) + ds.logger.Debug("DS query", "query", qCopy) if err != nil { res.Error = err + } else if query.QueryType == MODE_METRICS { + frames, err := zabbixDS.queryNumericItems(ctx, &query) + if err != nil { + res.Error = err + } else { + res.Frames = append(res.Frames, frames...) + } + } else if query.QueryType == MODE_ITEMID { + frames, err := zabbixDS.queryItemIdData(ctx, &query) + if err != nil { + res.Error = err + } else { + res.Frames = append(res.Frames, frames...) + } } else { - res.Frames = append(res.Frames, frames...) + res.Error = ErrNonMetricQueryNotSupported } - } else { - res.Error = ErrNonMetricQueryNotSupported - } - qdr.Responses[q.RefID] = res + responseLock.Lock() + qdr.Responses[qCopy.RefID] = res + responseLock.Unlock() + }() } + wg.Wait() return qdr, nil } diff --git a/pkg/zabbix/methods.go b/pkg/zabbix/methods.go index 6d0737ca5..7bc9e0365 100644 --- a/pkg/zabbix/methods.go +++ b/pkg/zabbix/methods.go @@ -388,7 +388,7 @@ func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []st filter["value_type"] = []int{1, 2, 4} } - if ds.version >= 54 { + if ds.version.Load() >= 54 { params["selectTags"] = "extend" if len(itemTagFilter) > 0 { allTags := strings.Split(itemTagFilter, ",") diff --git a/pkg/zabbix/zabbix.go b/pkg/zabbix/zabbix.go index 3cebbc03b..df195a497 100644 --- a/pkg/zabbix/zabbix.go +++ b/pkg/zabbix/zabbix.go @@ -4,6 +4,8 @@ import ( "context" "errors" "strings" + "sync" + "sync/atomic" "time" "github.com/alexanderzobnin/grafana-zabbix/pkg/metrics" @@ -15,6 +17,8 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/log" ) +const AUTH_RATE_LIMIT = time.Duration(5 * time.Second) + // Zabbix is a wrapper for Zabbix API. It wraps Zabbix API queries and performs authentication, adds caching, // deduplication and other performance optimizations. type Zabbix struct { @@ -22,8 +26,10 @@ type Zabbix struct { dsInfo *backend.DataSourceInstanceSettings settings *settings.ZabbixDatasourceSettings cache *ZabbixCache - version int + version atomic.Int32 logger log.Logger + authLock sync.RWMutex + lastAuth time.Time } // New returns new instance of Zabbix client. @@ -49,14 +55,14 @@ func (ds *Zabbix) Request(ctx context.Context, apiReq *ZabbixAPIRequest) (*simpl var resultJson *simplejson.Json var err error - if ds.version == 0 { + if ds.version.Load() == 0 { version, err := ds.GetVersion(ctx) if err != nil { ds.logger.Error("Error querying Zabbix version", "error", err) - ds.version = -1 + ds.version.Store(-1) } else { ds.logger.Debug("Got Zabbix version", "version", version) - ds.version = version + ds.version.Store(int32(version)) } } @@ -92,14 +98,18 @@ func (zabbix *Zabbix) request(ctx context.Context, method string, params ZabbixA return zabbix.api.RequestUnauthenticated(ctx, method, params) } + zabbix.authLock.RLock() result, err := zabbix.api.Request(ctx, method, params) + zabbix.authLock.RUnlock() notAuthorized := isNotAuthorized(err) isTokenAuth := zabbix.settings.AuthType == settings.AuthTypeToken if err == zabbixapi.ErrNotAuthenticated || (notAuthorized && !isTokenAuth) { if notAuthorized { zabbix.logger.Debug("Authentication token expired, performing re-login") } + zabbix.authLock.Lock() err = zabbix.Authenticate(ctx) + zabbix.authLock.Unlock() if err != nil { return nil, err } @@ -112,6 +122,11 @@ func (zabbix *Zabbix) request(ctx context.Context, method string, params ZabbixA } func (zabbix *Zabbix) Authenticate(ctx context.Context) error { + // check for consecutive auth calls. this is a protection against + // parallel api calls + if time.Now().Sub(zabbix.lastAuth) < AUTH_RATE_LIMIT { + return nil + } jsonData, err := simplejson.NewJson(zabbix.dsInfo.JSONData) if err != nil { return err @@ -129,6 +144,7 @@ func (zabbix *Zabbix) Authenticate(ctx context.Context) error { return err } zabbix.logger.Debug("Using API token for authentication") + zabbix.lastAuth = time.Now() return nil } @@ -147,6 +163,7 @@ func (zabbix *Zabbix) Authenticate(ctx context.Context) error { return err } zabbix.logger.Debug("Successfully authenticated", "url", zabbix.api.GetUrl().String(), "user", zabbixLogin) + zabbix.lastAuth = time.Now() return nil }