Skip to content

Commit

Permalink
concurrent query
Browse files Browse the repository at this point in the history
  • Loading branch information
iqbalaydrus committed Nov 15, 2023
1 parent 17d71f6 commit 4006b51
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 24 deletions.
58 changes: 39 additions & 19 deletions pkg/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package datasource

import (
"context"
"encoding/json"
"errors"
"sync"

"github.com/alexanderzobnin/grafana-zabbix/pkg/httpclient"
"github.com/alexanderzobnin/grafana-zabbix/pkg/metrics"
Expand Down Expand Up @@ -113,31 +115,49 @@ func (ds *ZabbixDatasource) QueryData(ctx context.Context, req *backend.QueryDat
return nil, err
}

responseLock := new(sync.Mutex)
wg := new(sync.WaitGroup)
for _, q := range req.Queries {
res := backend.DataResponse{}
query, err := ReadQuery(q)
ds.logger.Debug("DS query", "query", q)
if err != nil {
res.Error = err
} else if query.QueryType == MODE_METRICS {
frames, err := zabbixDS.queryNumericItems(ctx, &query)
if err != nil {
res.Error = err
} else {
res.Frames = append(res.Frames, frames...)
}
} else if query.QueryType == MODE_ITEMID {
frames, err := zabbixDS.queryItemIdData(ctx, &query)
wg.Add(1)
qCopy := backend.DataQuery{
RefID: q.RefID,
QueryType: q.QueryType,
MaxDataPoints: q.MaxDataPoints,
Interval: q.Interval,
TimeRange: q.TimeRange,
JSON: make(json.RawMessage, len(q.JSON)),
}
copy(qCopy.JSON, q.JSON)
go func() {
defer wg.Done()
res := backend.DataResponse{}
query, err := ReadQuery(qCopy)
ds.logger.Debug("DS query", "query", qCopy)
if err != nil {
res.Error = err
} else if query.QueryType == MODE_METRICS {
frames, err := zabbixDS.queryNumericItems(ctx, &query)
if err != nil {
res.Error = err
} else {
res.Frames = append(res.Frames, frames...)
}
} else if query.QueryType == MODE_ITEMID {
frames, err := zabbixDS.queryItemIdData(ctx, &query)
if err != nil {
res.Error = err
} else {
res.Frames = append(res.Frames, frames...)
}
} else {
res.Frames = append(res.Frames, frames...)
res.Error = ErrNonMetricQueryNotSupported
}
} else {
res.Error = ErrNonMetricQueryNotSupported
}
qdr.Responses[q.RefID] = res
responseLock.Lock()
qdr.Responses[qCopy.RefID] = res
responseLock.Unlock()
}()
}
wg.Wait()

return qdr, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/zabbix/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []st
filter["value_type"] = []int{1, 2, 4}
}

if ds.version >= 54 {
if ds.version.Load() >= 54 {
params["selectTags"] = "extend"
if len(itemTagFilter) > 0 {
allTags := strings.Split(itemTagFilter, ",")
Expand Down
25 changes: 21 additions & 4 deletions pkg/zabbix/zabbix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/alexanderzobnin/grafana-zabbix/pkg/metrics"
Expand All @@ -15,15 +17,19 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

const AUTH_RATE_LIMIT = time.Duration(5 * time.Second)

// Zabbix is a wrapper for Zabbix API. It wraps Zabbix API queries and performs authentication, adds caching,
// deduplication and other performance optimizations.
type Zabbix struct {
api *zabbixapi.ZabbixAPI
dsInfo *backend.DataSourceInstanceSettings
settings *settings.ZabbixDatasourceSettings
cache *ZabbixCache
version int
version atomic.Int32
logger log.Logger
authLock sync.RWMutex
lastAuth time.Time
}

// New returns new instance of Zabbix client.
Expand All @@ -49,14 +55,14 @@ func (ds *Zabbix) Request(ctx context.Context, apiReq *ZabbixAPIRequest) (*simpl
var resultJson *simplejson.Json
var err error

if ds.version == 0 {
if ds.version.Load() == 0 {
version, err := ds.GetVersion(ctx)
if err != nil {
ds.logger.Error("Error querying Zabbix version", "error", err)
ds.version = -1
ds.version.Store(-1)
} else {
ds.logger.Debug("Got Zabbix version", "version", version)
ds.version = version
ds.version.Store(int32(version))
}
}

Expand Down Expand Up @@ -92,14 +98,18 @@ func (zabbix *Zabbix) request(ctx context.Context, method string, params ZabbixA
return zabbix.api.RequestUnauthenticated(ctx, method, params)
}

zabbix.authLock.RLock()
result, err := zabbix.api.Request(ctx, method, params)
zabbix.authLock.RUnlock()
notAuthorized := isNotAuthorized(err)
isTokenAuth := zabbix.settings.AuthType == settings.AuthTypeToken
if err == zabbixapi.ErrNotAuthenticated || (notAuthorized && !isTokenAuth) {
if notAuthorized {
zabbix.logger.Debug("Authentication token expired, performing re-login")
}
zabbix.authLock.Lock()
err = zabbix.Authenticate(ctx)
zabbix.authLock.Unlock()
if err != nil {
return nil, err
}
Expand All @@ -112,6 +122,11 @@ func (zabbix *Zabbix) request(ctx context.Context, method string, params ZabbixA
}

func (zabbix *Zabbix) Authenticate(ctx context.Context) error {
// check for consecutive auth calls. this is a protection against
// parallel api calls
if time.Now().Sub(zabbix.lastAuth) < AUTH_RATE_LIMIT {
return nil
}
jsonData, err := simplejson.NewJson(zabbix.dsInfo.JSONData)
if err != nil {
return err
Expand All @@ -129,6 +144,7 @@ func (zabbix *Zabbix) Authenticate(ctx context.Context) error {
return err
}
zabbix.logger.Debug("Using API token for authentication")
zabbix.lastAuth = time.Now()
return nil
}

Expand All @@ -147,6 +163,7 @@ func (zabbix *Zabbix) Authenticate(ctx context.Context) error {
return err
}
zabbix.logger.Debug("Successfully authenticated", "url", zabbix.api.GetUrl().String(), "user", zabbixLogin)
zabbix.lastAuth = time.Now()

return nil
}
Expand Down

0 comments on commit 4006b51

Please sign in to comment.