From 49b26b4acd4e3990e703a06bf1a8665e9192f529 Mon Sep 17 00:00:00 2001 From: Toan Nguyen Date: Fri, 11 Oct 2024 14:33:32 +0700 Subject: [PATCH] change native queries to collection --- Makefile | 5 +- connector/internal/collection.go | 18 -- connector/internal/collection_request.go | 47 +++-- connector/internal/native_query.go | 215 +++++++++++--------- connector/internal/native_query_bool_exp.go | 151 -------------- connector/internal/native_query_request.go | 111 ++++++++++ connector/internal/raw_query.go | 138 +++++++++++++ connector/internal/utils.go | 36 ++++ connector/metadata/const.go | 31 --- connector/metadata/native_operation.go | 74 +++---- connector/query.go | 39 +++- tests/engine/app/metadata/prometheus.hml | 167 ++++----------- tests/engine/app/metadata/service_up.hml | 179 +++++++--------- 13 files changed, 599 insertions(+), 612 deletions(-) delete mode 100644 connector/internal/native_query_bool_exp.go create mode 100644 connector/internal/native_query_request.go create mode 100644 connector/internal/raw_query.go diff --git a/Makefile b/Makefile index 21bee64..1f99c69 100644 --- a/Makefile +++ b/Makefile @@ -40,10 +40,11 @@ ci-build-configuration: clean .PHONY: build-supergraph-test build-supergraph-test: + docker compose up -d --build cd tests/engine && \ ddn connector-link update prometheus --add-all-resources --subgraph ./app/subgraph.yaml && \ - ddn supergraph build local && \ - docker compose up -d --build engine + ddn supergraph build local + docker compose up -d --build engine .PHONY: generate-api-types generate-api-types: diff --git a/connector/internal/collection.go b/connector/internal/collection.go index cd9791a..e33bfa1 100644 --- a/connector/internal/collection.go +++ b/connector/internal/collection.go @@ -397,21 +397,3 @@ func (qce *QueryCollectionExecutor) evalValueComparisonCondition(operator *schem func (qce *QueryCollectionExecutor) getComparisonValue(input schema.ComparisonValue) (any, error) { return getComparisonValue(input, qce.Variables) } - -func getComparisonValue(input schema.ComparisonValue, variables map[string]any) (any, error) { - if len(input) == 0 { - return nil, nil - } - - switch v := input.Interface().(type) { - case *schema.ComparisonValueScalar: - return v.Value, nil - case *schema.ComparisonValueVariable: - if value, ok := variables[v.Name]; ok { - return value, nil - } - return nil, fmt.Errorf("variable %s does not exist", v.Name) - default: - return nil, fmt.Errorf("invalid comparison value: %v", input) - } -} diff --git a/connector/internal/collection_request.go b/connector/internal/collection_request.go index 34dd1da..2a05b52 100644 --- a/connector/internal/collection_request.go +++ b/connector/internal/collection_request.go @@ -28,10 +28,10 @@ type CollectionRequest struct { Timestamp schema.ComparisonValue Start schema.ComparisonValue End schema.ComparisonValue + OrderBy []ColumnOrder Value *schema.ExpressionBinaryComparisonOperator LabelExpressions map[string]*LabelExpression Functions []KeyValue - OrderBy []ColumnOrder } // EvalCollectionRequest evaluates the requested collection data of the query request @@ -67,24 +67,11 @@ func EvalCollectionRequest(request *schema.QueryRequest, arguments map[string]an } } - if request.Query.OrderBy != nil { - for _, elem := range request.Query.OrderBy.Elements { - switch target := elem.Target.Interface().(type) { - case *schema.OrderByColumn: - if slices.Contains([]string{metadata.LabelsKey, metadata.ValuesKey}, target.Name) { - return nil, fmt.Errorf("ordering by `%s` is unsupported", target.Name) - } - - orderBy := ColumnOrder{ - Name: target.Name, - Descending: elem.OrderDirection == schema.OrderDirectionDesc, - } - result.OrderBy = append(result.OrderBy, orderBy) - default: - return nil, fmt.Errorf("support ordering by column only, got: %v", elem.Target) - } - } + orderBy, err := evalCollectionOrderBy(request.Query.OrderBy) + if err != nil { + return nil, err } + result.OrderBy = orderBy return result, nil } @@ -142,3 +129,27 @@ func (pr *CollectionRequest) evalQueryPredicate(expression schema.Expression) er return nil } + +func evalCollectionOrderBy(orderBy *schema.OrderBy) ([]ColumnOrder, error) { + var results []ColumnOrder + if orderBy == nil { + return results, nil + } + for _, elem := range orderBy.Elements { + switch target := elem.Target.Interface().(type) { + case *schema.OrderByColumn: + if slices.Contains([]string{metadata.LabelsKey, metadata.ValuesKey}, target.Name) { + return nil, fmt.Errorf("ordering by `%s` is unsupported", target.Name) + } + + orderBy := ColumnOrder{ + Name: target.Name, + Descending: elem.OrderDirection == schema.OrderDirectionDesc, + } + results = append(results, orderBy) + default: + return nil, fmt.Errorf("support ordering by column only, got: %v", elem.Target) + } + } + return results, nil +} diff --git a/connector/internal/native_query.go b/connector/internal/native_query.go index aa66e5a..6a7145d 100644 --- a/connector/internal/native_query.go +++ b/connector/internal/native_query.go @@ -3,32 +3,18 @@ package internal import ( "context" "fmt" + "regexp" + "slices" "strconv" "github.com/hasura/ndc-prometheus/connector/client" "github.com/hasura/ndc-prometheus/connector/metadata" "github.com/hasura/ndc-sdk-go/schema" "github.com/hasura/ndc-sdk-go/utils" + "github.com/prometheus/common/model" "go.opentelemetry.io/otel/trace" ) -// nativeQueryParameters the structured arguments which is evaluated from the raw expression -type nativeQueryParameters struct { - Timestamp any - Start any - End any - Timeout any - Step any - Where map[string]NativeQueryLabelBoolExp -} - -// NewNativeQueryParameters creates a nativeQueryParameters instance -func NewNativeQueryParameters() *nativeQueryParameters { - return &nativeQueryParameters{ - Where: make(map[string]NativeQueryLabelBoolExp), - } -} - type NativeQueryExecutor struct { Client *client.Client Tracer trace.Tracer @@ -36,37 +22,19 @@ type NativeQueryExecutor struct { Request *schema.QueryRequest NativeQuery *metadata.NativeQuery Arguments map[string]any - - selection schema.NestedField + Variables map[string]any } // Explain explains the query request -func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*nativeQueryParameters, string, error) { - var err error - params := NewNativeQueryParameters() +func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*NativeQueryRequest, string, error) { + params, err := EvalNativeQueryRequest(nqe.Request, nqe.Arguments, nqe.Variables) queryString := nqe.NativeQuery.Query for key, arg := range nqe.Arguments { switch key { - case metadata.ArgumentKeyStart: - params.Start = arg - case metadata.ArgumentKeyEnd: - params.End = arg case metadata.ArgumentKeyStep: params.Step = arg - case metadata.ArgumentKeyTime: - params.Timestamp = arg case metadata.ArgumentKeyTimeout: params.Timeout = arg - case metadata.ArgumentKeyWhere: - if utils.IsNil(arg) { - continue - } - - boolExps, err := decodeNativeQueryLabelBoolExps(arg) - if err != nil { - return nil, "", schema.UnprocessableContentError(err.Error(), nil) - } - params.Where = boolExps default: argInfo, ok := nqe.NativeQuery.Arguments[key] if ok { @@ -106,44 +74,14 @@ func (nqe *NativeQueryExecutor) Explain(ctx context.Context) (*nativeQueryParame if unresolvedArguments := metadata.FindNativeQueryVariableNames(queryString); len(unresolvedArguments) > 0 { return nil, "", schema.BadRequestError(fmt.Sprintf("unresolved variables %v in the Prometheus query", unresolvedArguments), map[string]any{ - "query": queryString, + "collection": nqe.Request.Collection, + "query": queryString, }) } return params, queryString, nil } -// ExplainRaw explains the raw promQL query request -func (nqe *NativeQueryExecutor) ExplainRaw(ctx context.Context) (*nativeQueryParameters, string, error) { - params := NewNativeQueryParameters() - var err error - var queryString string - for key, arg := range nqe.Arguments { - switch key { - case metadata.ArgumentKeyStart: - params.Start = arg - case metadata.ArgumentKeyEnd: - params.End = arg - case metadata.ArgumentKeyStep: - params.Step = arg - case metadata.ArgumentKeyTime: - params.Timestamp = arg - case metadata.ArgumentKeyTimeout: - params.Timeout = arg - case metadata.ArgumentKeyQuery: - queryString, err = utils.DecodeString(arg) - if err != nil { - return nil, "", schema.UnprocessableContentError(err.Error(), nil) - } - if queryString == "" { - return nil, "", schema.UnprocessableContentError("the query argument must not be empty", nil) - } - } - } - - return params, queryString, nil -} - // Execute executes the query request func (nqe *NativeQueryExecutor) Execute(ctx context.Context) (*schema.RowSet, error) { ctx, span := nqe.Tracer.Start(ctx, "Execute Native Query") @@ -156,24 +94,7 @@ func (nqe *NativeQueryExecutor) Execute(ctx context.Context) (*schema.RowSet, er return nqe.execute(ctx, params, queryString) } -// ExecuteRaw executes the raw promQL query request -func (nqe *NativeQueryExecutor) ExecuteRaw(ctx context.Context) (*schema.RowSet, error) { - ctx, span := nqe.Tracer.Start(ctx, "Execute Raw PromQL Query") - defer span.End() - params, queryString, err := nqe.ExplainRaw(ctx) - if err != nil { - return nil, err - } - return nqe.execute(ctx, params, queryString) -} - -func (nqe *NativeQueryExecutor) execute(ctx context.Context, params *nativeQueryParameters, queryString string) (*schema.RowSet, error) { - var err error - nqe.selection, err = utils.EvalFunctionSelectionFieldValue(nqe.Request) - if err != nil { - return nil, schema.UnprocessableContentError(err.Error(), nil) - } - +func (nqe *NativeQueryExecutor) execute(ctx context.Context, params *NativeQueryRequest, queryString string) (*schema.RowSet, error) { flat, err := utils.DecodeNullableBoolean(nqe.Arguments[metadata.ArgumentKeyFlat]) if err != nil { return nil, schema.UnprocessableContentError(fmt.Sprintf("expected boolean type for the flat field, got: %v", err), map[string]any{ @@ -195,46 +116,140 @@ func (nqe *NativeQueryExecutor) execute(ctx context.Context, params *nativeQuery return nil, err } - results, err := utils.EvalNestedColumnFields(nqe.selection, rawResults) + results, err := utils.EvalObjectsWithColumnSelection(nqe.Request.Query.Fields, rawResults) if err != nil { return nil, err } return &schema.RowSet{ Aggregates: schema.RowSetAggregates{}, - Rows: []map[string]any{ - { - "__value": results, - }, - }, + Rows: results, }, nil } -func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString string, params *nativeQueryParameters, flat bool) ([]map[string]any, error) { +func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString string, params *NativeQueryRequest, flat bool) ([]map[string]any, error) { vector, _, err := nqe.Client.Query(ctx, queryString, params.Timestamp, params.Timeout) if err != nil { return nil, schema.UnprocessableContentError(err.Error(), nil) } span := trace.SpanFromContext(ctx) - span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("where", params.Where))) - vector = nqe.filterVectorResults(vector, params.Where) + span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression))) + vector, err = nqe.filterVectorResults(vector, params.Expression) + if err != nil { + return nil, schema.UnprocessableContentError(err.Error(), nil) + } results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat) return results, nil } -func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString string, params *nativeQueryParameters, flat bool) ([]map[string]any, error) { +func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString string, params *NativeQueryRequest, flat bool) ([]map[string]any, error) { matrix, _, err := nqe.Client.QueryRange(ctx, queryString, params.Start, params.End, params.Step, params.Timeout) if err != nil { return nil, schema.UnprocessableContentError(err.Error(), nil) } span := trace.SpanFromContext(ctx) - span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("where", params.Where))) - matrix = nqe.filterMatrixResults(matrix, params.Where) + span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression))) + matrix, err = nqe.filterMatrixResults(matrix, params.Expression) + if err != nil { + return nil, schema.UnprocessableContentError(err.Error(), nil) + } results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat) return results, nil } + +func (nqe *NativeQueryExecutor) filterVectorResults(vector model.Vector, expr schema.Expression) (model.Vector, error) { + if expr == nil || len(vector) == 0 { + return vector, nil + } + results := model.Vector{} + for _, item := range vector { + valid, err := nqe.validateLabelBoolExp(item.Metric, expr) + if err != nil { + return nil, err + } + if valid { + results = append(results, item) + } + } + return results, nil +} + +func (nqe *NativeQueryExecutor) filterMatrixResults(matrix model.Matrix, expr schema.Expression) (model.Matrix, error) { + if expr == nil || len(matrix) == 0 { + return matrix, nil + } + results := model.Matrix{} + for _, item := range matrix { + valid, err := nqe.validateLabelBoolExp(item.Metric, expr) + if err != nil { + return nil, err + } + if valid { + results = append(results, item) + } + } + return results, nil +} + +func (nqe *NativeQueryExecutor) validateLabelBoolExp(labels model.Metric, expr schema.Expression) (bool, error) { + switch exprs := expr.Interface().(type) { + case *schema.ExpressionAnd: + for _, e := range exprs.Expressions { + valid, err := nqe.validateLabelBoolExp(labels, e) + if !valid || err != nil { + return false, err + } + } + return true, nil + case *schema.ExpressionBinaryComparisonOperator: + labelValue := labels[model.LabelName(exprs.Column.Name)] + switch exprs.Operator { + case metadata.Equal, metadata.NotEqual, metadata.Regex, metadata.NotRegex: + value, err := getComparisonValueString(exprs.Value, nqe.Variables) + if err != nil { + return false, err + } + if value == nil { + return true, nil + } + if exprs.Operator == metadata.Equal { + return *value == string(labelValue), nil + } + if exprs.Operator == metadata.NotEqual { + return *value != string(labelValue), nil + } + + regex, err := regexp.Compile(*value) + if err != nil { + return false, fmt.Errorf("invalid regular expression %s: %s", *value, err) + } + if exprs.Operator == metadata.Regex { + return regex.MatchString(string(labelValue)), nil + } + return !regex.MatchString(string(labelValue)), nil + case metadata.In, metadata.NotIn: + value, err := getComparisonValueStringSlice(exprs.Value, nqe.Variables) + if err != nil { + return false, fmt.Errorf("failed to decode string array; %s", err) + } + if value == nil { + return true, nil + } + if exprs.Operator == metadata.In { + return slices.Contains(value, string(labelValue)), nil + } else { + return !slices.Contains(value, string(labelValue)), nil + } + } + // case *schema.ExpressionExists: + // case *schema.ExpressionNot: + // case *schema.ExpressionOr: + // case *schema.ExpressionUnaryComparisonOperator: + } + return false, nil +} diff --git a/connector/internal/native_query_bool_exp.go b/connector/internal/native_query_bool_exp.go deleted file mode 100644 index a161402..0000000 --- a/connector/internal/native_query_bool_exp.go +++ /dev/null @@ -1,151 +0,0 @@ -package internal - -import ( - "fmt" - "regexp" - "slices" - - "github.com/hasura/ndc-prometheus/connector/metadata" - "github.com/hasura/ndc-sdk-go/utils" - "github.com/prometheus/common/model" -) - -// NativeQueryLabelBoolExp represents the boolean expression object type -type NativeQueryLabelBoolExp struct { - Equal *string - NotEqual *string - In []string - NotIn []string - Regex *regexp.Regexp - NotRegex *regexp.Regexp -} - -// Validate validates the value -func (be NativeQueryLabelBoolExp) Validate(value string) bool { - return (be.Equal == nil || *be.Equal == value) && - (be.NotEqual == nil || *be.NotEqual != value) && - (be.In == nil || slices.Contains(be.In, value)) && - (be.NotIn == nil || !slices.Contains(be.NotIn, value)) && - (be.Regex == nil || be.Regex.MatchString(value)) && - (be.NotRegex == nil || !be.NotRegex.MatchString(value)) -} - -// FromValue decode any value to NativeQueryLabelBoolExp. -func (be *NativeQueryLabelBoolExp) FromValue(value any) error { - valueMap, ok := value.(map[string]any) - if !ok { - return fmt.Errorf("invalid boolean expression argument %v", value) - } - if len(valueMap) == 0 { - return nil - } - - var err error - be.Equal, err = utils.GetNullableString(valueMap, metadata.Equal) - if err != nil { - return err - } - be.NotEqual, err = utils.GetNullableString(valueMap, metadata.NotEqual) - if err != nil { - return err - } - rawRegex, err := utils.GetNullableString(valueMap, metadata.Regex) - if err != nil { - return err - } - - if rawRegex != nil { - regex, err := regexp.Compile(*rawRegex) - if err != nil { - return fmt.Errorf("invalid _regex: %s", err) - } - be.Regex = regex - } - - rawNotRegex, err := utils.GetNullableString(valueMap, metadata.NotRegex) - if err != nil { - return err - } - - if rawNotRegex != nil { - nregex, err := regexp.Compile(*rawNotRegex) - if err != nil { - return fmt.Errorf("invalid _nregex: %s", err) - } - be.NotRegex = nregex - } - - if v, ok := valueMap[metadata.In]; ok { - be.In, err = decodeStringSlice(v) - if err != nil { - return err - } - } - if v, ok := valueMap[metadata.NotIn]; ok { - be.NotIn, err = decodeStringSlice(v) - if err != nil { - return err - } - } - - return nil -} - -func (nqe *NativeQueryExecutor) filterVectorResults(vector model.Vector, where map[string]NativeQueryLabelBoolExp) model.Vector { - if len(where) == 0 || len(vector) == 0 { - return vector - } - results := model.Vector{} - for _, item := range vector { - if nqe.validateLabelBoolExp(item.Metric, where) { - results = append(results, item) - } - } - return results -} - -func (nqe *NativeQueryExecutor) filterMatrixResults(matrix model.Matrix, where map[string]NativeQueryLabelBoolExp) model.Matrix { - if len(where) == 0 || len(matrix) == 0 { - return matrix - } - results := model.Matrix{} - for _, item := range matrix { - if nqe.validateLabelBoolExp(item.Metric, where) { - results = append(results, item) - } - } - return results -} - -func (nqe *NativeQueryExecutor) validateLabelBoolExp(labels model.Metric, where map[string]NativeQueryLabelBoolExp) bool { - for key, boolExp := range where { - if labelValue, ok := labels[model.LabelName(key)]; ok { - if !boolExp.Validate(string(labelValue)) { - return false - } - } - } - return true -} - -func decodeNativeQueryLabelBoolExps(value any) (map[string]NativeQueryLabelBoolExp, error) { - results := make(map[string]NativeQueryLabelBoolExp) - if utils.IsNil(value) { - return results, nil - } - - valueMap, ok := value.(map[string]any) - if !ok { - return nil, fmt.Errorf("invalid where; expected map, got: %v", value) - } - - for k, v := range valueMap { - boolExp := NativeQueryLabelBoolExp{} - if err := boolExp.FromValue(v); err != nil { - return nil, err - } - results[k] = boolExp - } - - return results, nil -} diff --git a/connector/internal/native_query_request.go b/connector/internal/native_query_request.go new file mode 100644 index 0000000..4555ada --- /dev/null +++ b/connector/internal/native_query_request.go @@ -0,0 +1,111 @@ +package internal + +import ( + "errors" + "fmt" + + "github.com/hasura/ndc-prometheus/connector/metadata" + "github.com/hasura/ndc-sdk-go/schema" +) + +// NativeQueryRequest the structured native request which is evaluated from the raw expression +type NativeQueryRequest struct { + Timestamp any + Start any + End any + Timeout any + Step any + OrderBy []ColumnOrder + Variables map[string]any + Expression schema.Expression +} + +// EvalNativeQueryRequest evaluates the requested collection data of the query request +func EvalNativeQueryRequest(request *schema.QueryRequest, arguments map[string]any, variables map[string]any) (*NativeQueryRequest, error) { + result := &NativeQueryRequest{ + Variables: variables, + } + if len(request.Query.Predicate) > 0 { + newExpr, err := result.evalQueryPredicate(request.Query.Predicate) + if err != nil { + return nil, err + } + if newExpr != nil { + result.Expression = newExpr.Encode() + } + } + + orderBy, err := evalCollectionOrderBy(request.Query.OrderBy) + if err != nil { + return nil, err + } + result.OrderBy = orderBy + return result, nil +} + +func (pr *NativeQueryRequest) getComparisonValue(input schema.ComparisonValue) (any, error) { + return getComparisonValue(input, pr.Variables) +} + +func (pr *NativeQueryRequest) evalQueryPredicate(expression schema.Expression) (schema.ExpressionEncoder, error) { + switch expr := expression.Interface().(type) { + case *schema.ExpressionAnd: + exprs := []schema.ExpressionEncoder{} + for _, nestedExpr := range expr.Expressions { + evalExpr, err := pr.evalQueryPredicate(nestedExpr) + if err != nil { + return nil, err + } + if evalExpr != nil { + exprs = append(exprs, evalExpr) + } + } + return schema.NewExpressionAnd(exprs...), nil + case *schema.ExpressionBinaryComparisonOperator: + if expr.Column.Type != schema.ComparisonTargetTypeColumn { + return nil, fmt.Errorf("%s: unsupported comparison target `%s`", expr.Column.Name, expr.Column.Type) + } + + switch expr.Column.Name { + case metadata.TimestampKey: + switch expr.Operator { + case metadata.Equal: + if pr.Timestamp != nil { + return nil, errors.New("unsupported multiple equality for the timestamp") + } + ts, err := pr.getComparisonValue(expr.Value) + if err != nil { + return nil, err + } + pr.Timestamp = ts + return nil, nil + case metadata.Least: + if pr.End != nil { + return nil, errors.New("unsupported multiple _lt expressions for the timestamp") + } + end, err := pr.getComparisonValue(expr.Value) + if err != nil { + return nil, err + } + pr.End = end + return nil, nil + case metadata.Greater: + if pr.Start != nil { + return nil, errors.New("unsupported multiple _gt expressions for the timestamp") + } + start, err := pr.getComparisonValue(expr.Value) + if err != nil { + return nil, err + } + pr.Start = start + return nil, nil + default: + return nil, fmt.Errorf("unsupported operator `%s` for the timestamp", expr.Operator) + } + default: + return expr, nil + } + default: + return nil, fmt.Errorf("unsupported expression: %+v", expression) + } +} diff --git a/connector/internal/raw_query.go b/connector/internal/raw_query.go new file mode 100644 index 0000000..bf155b6 --- /dev/null +++ b/connector/internal/raw_query.go @@ -0,0 +1,138 @@ +package internal + +import ( + "context" + "fmt" + + "github.com/hasura/ndc-prometheus/connector/client" + "github.com/hasura/ndc-prometheus/connector/metadata" + "github.com/hasura/ndc-sdk-go/schema" + "github.com/hasura/ndc-sdk-go/utils" + "go.opentelemetry.io/otel/trace" +) + +// rawQueryParameters the structured arguments which is evaluated from the raw expression +type rawQueryParameters struct { + Timestamp any + Start any + End any + Timeout any + Step any +} + +type RawQueryExecutor struct { + Client *client.Client + Tracer trace.Tracer + Runtime *metadata.RuntimeSettings + Request *schema.QueryRequest + Arguments map[string]any + + selection schema.NestedField +} + +// Explain explains the raw promQL query request +func (nqe *RawQueryExecutor) Explain(ctx context.Context) (*rawQueryParameters, string, error) { + params := &rawQueryParameters{} + var err error + var queryString string + for key, arg := range nqe.Arguments { + switch key { + case metadata.ArgumentKeyStart: + params.Start = arg + case metadata.ArgumentKeyEnd: + params.End = arg + case metadata.ArgumentKeyStep: + params.Step = arg + case metadata.ArgumentKeyTime: + params.Timestamp = arg + case metadata.ArgumentKeyTimeout: + params.Timeout = arg + case metadata.ArgumentKeyQuery: + queryString, err = utils.DecodeString(arg) + if err != nil { + return nil, "", schema.UnprocessableContentError(err.Error(), nil) + } + if queryString == "" { + return nil, "", schema.UnprocessableContentError("the query argument must not be empty", nil) + } + } + } + + return params, queryString, nil +} + +// Execute executes the raw promQL query request +func (nqe *RawQueryExecutor) Execute(ctx context.Context) (*schema.RowSet, error) { + ctx, span := nqe.Tracer.Start(ctx, "Execute Raw PromQL Query") + defer span.End() + params, queryString, err := nqe.Explain(ctx) + if err != nil { + return nil, err + } + return nqe.execute(ctx, params, queryString) +} + +func (nqe *RawQueryExecutor) execute(ctx context.Context, params *rawQueryParameters, queryString string) (*schema.RowSet, error) { + var err error + nqe.selection, err = utils.EvalFunctionSelectionFieldValue(nqe.Request) + if err != nil { + return nil, schema.UnprocessableContentError(err.Error(), nil) + } + + flat, err := utils.DecodeNullableBoolean(nqe.Arguments[metadata.ArgumentKeyFlat]) + if err != nil { + return nil, schema.UnprocessableContentError(fmt.Sprintf("expected boolean type for the flat field, got: %v", err), map[string]any{ + "field": metadata.ArgumentKeyFlat, + }) + } + if flat == nil { + flat = &nqe.Runtime.Flat + } + + var rawResults []map[string]any + if _, ok := nqe.Arguments[metadata.ArgumentKeyTime]; ok { + rawResults, err = nqe.queryInstant(ctx, queryString, params, *flat) + } else { + rawResults, err = nqe.queryRange(ctx, queryString, params, *flat) + } + + if err != nil { + return nil, err + } + + results, err := utils.EvalNestedColumnFields(nqe.selection, rawResults) + if err != nil { + return nil, err + } + + return &schema.RowSet{ + Aggregates: schema.RowSetAggregates{}, + Rows: []map[string]any{ + { + "__value": results, + }, + }, + }, nil +} + +func (nqe *RawQueryExecutor) queryInstant(ctx context.Context, queryString string, params *rawQueryParameters, flat bool) ([]map[string]any, error) { + vector, _, err := nqe.Client.Query(ctx, queryString, params.Timestamp, params.Timeout) + if err != nil { + return nil, schema.UnprocessableContentError(err.Error(), nil) + } + + results := createQueryResultsFromVector(vector, map[string]metadata.LabelInfo{}, nqe.Runtime, flat) + + return results, nil +} + +func (nqe *RawQueryExecutor) queryRange(ctx context.Context, queryString string, params *rawQueryParameters, flat bool) ([]map[string]any, error) { + matrix, _, err := nqe.Client.QueryRange(ctx, queryString, params.Start, params.End, params.Step, params.Timeout) + if err != nil { + return nil, schema.UnprocessableContentError(err.Error(), nil) + } + + results := createQueryResultsFromMatrix(matrix, map[string]metadata.LabelInfo{}, nqe.Runtime, flat) + + return results, nil +} diff --git a/connector/internal/utils.go b/connector/internal/utils.go index 9ddc059..dbb4106 100644 --- a/connector/internal/utils.go +++ b/connector/internal/utils.go @@ -2,11 +2,13 @@ package internal import ( "encoding/json" + "fmt" "slices" "time" "github.com/hasura/ndc-prometheus/connector/client" "github.com/hasura/ndc-prometheus/connector/metadata" + "github.com/hasura/ndc-sdk-go/schema" "github.com/hasura/ndc-sdk-go/utils" "github.com/prometheus/common/model" ) @@ -157,3 +159,37 @@ func intersection[T comparable](sliceA []T, sliceB []T) []T { return result } + +func getComparisonValue(input schema.ComparisonValue, variables map[string]any) (any, error) { + if len(input) == 0 { + return nil, nil + } + + switch v := input.Interface().(type) { + case *schema.ComparisonValueScalar: + return v.Value, nil + case *schema.ComparisonValueVariable: + if value, ok := variables[v.Name]; ok { + return value, nil + } + return nil, fmt.Errorf("variable %s does not exist", v.Name) + default: + return nil, fmt.Errorf("invalid comparison value: %v", input) + } +} + +func getComparisonValueString(input schema.ComparisonValue, variables map[string]any) (*string, error) { + rawValue, err := getComparisonValue(input, variables) + if err != nil { + return nil, err + } + return utils.DecodeNullableString(rawValue) +} + +func getComparisonValueStringSlice(input schema.ComparisonValue, variables map[string]any) ([]string, error) { + rawValue, err := getComparisonValue(input, variables) + if err != nil { + return nil, err + } + return decodeStringSlice(rawValue) +} diff --git a/connector/metadata/const.go b/connector/metadata/const.go index 291e6ac..4e72187 100644 --- a/connector/metadata/const.go +++ b/connector/metadata/const.go @@ -195,7 +195,6 @@ const ( objectName_HoltWintersInput = "HoltWintersInput" objectName_PredictLinearInput = "PredictLinearInput" objectName_QuantileOverTimeInput = "QuantileOverTimeInput" - objectName_NativeQueryLabelBoolExp = "NativeQueryLabelBoolExp" ) var defaultObjectTypes = map[string]schema.ObjectType{ @@ -263,35 +262,6 @@ var defaultObjectTypes = map[string]schema.ObjectType{ }, }, }, - objectName_NativeQueryLabelBoolExp: { - Description: utils.ToPtr("The boolean expression for native query labels"), - Fields: schema.ObjectTypeFields{ - Equal: schema.ObjectField{ - Description: utils.ToPtr("The equality operator"), - Type: schema.NewNullableNamedType(string(ScalarString)).Encode(), - }, - NotEqual: schema.ObjectField{ - Description: utils.ToPtr("The not-equality operator"), - Type: schema.NewNullableNamedType(string(ScalarString)).Encode(), - }, - In: schema.ObjectField{ - Description: utils.ToPtr("The in-array operator"), - Type: schema.NewNullableNamedType(string(ScalarJSON)).Encode(), - }, - NotIn: schema.ObjectField{ - Description: utils.ToPtr("The not-in-array operator"), - Type: schema.NewNullableNamedType(string(ScalarJSON)).Encode(), - }, - Regex: schema.ObjectField{ - Description: utils.ToPtr("The regular expression operator"), - Type: schema.NewNullableNamedType(string(ScalarString)).Encode(), - }, - NotRegex: schema.ObjectField{ - Description: utils.ToPtr("The falsy regular expression operator"), - Type: schema.NewNullableNamedType(string(ScalarString)).Encode(), - }, - }, - }, } const ( @@ -304,7 +274,6 @@ const ( ArgumentKeyOffset = "offset" ArgumentKeyQuery = "query" ArgumentKeyFunctions = "fn" - ArgumentKeyWhere = "where" ) var defaultArgumentInfos = map[string]schema.ArgumentInfo{ diff --git a/connector/metadata/native_operation.go b/connector/metadata/native_operation.go index e9d272d..6d59706 100644 --- a/connector/metadata/native_operation.go +++ b/connector/metadata/native_operation.go @@ -53,60 +53,45 @@ func (scb *connectorSchemaBuilder) buildNativeQueries() error { } func (scb *connectorSchemaBuilder) buildNativeQuery(name string, query *NativeQuery) error { - fn := schema.FunctionInfo{ - Name: name, - Description: query.Description, - Arguments: createNativeQueryArguments(), - } + + arguments := createCollectionArguments() for key, arg := range query.Arguments { - if _, ok := fn.Arguments[key]; ok { + if _, ok := arguments[key]; ok { return fmt.Errorf("argument `%s` is already used by the function", key) } - fn.Arguments[key] = schema.ArgumentInfo{ + arguments[key] = schema.ArgumentInfo{ Description: arg.Description, Type: schema.NewNamedType(string(ScalarString)).Encode(), } } - if len(query.Labels) > 0 { - resultType := schema.ObjectType{ - Fields: createQueryResultValuesObjectFields(), - } - - boolExpType := schema.ObjectType{ - Description: utils.ToPtr(fmt.Sprintf("Boolean expression of the native query %s", name)), - Fields: schema.ObjectTypeFields{}, - } - - for key, label := range query.Labels { - // build boolean expression argument - boolExpType.Fields[key] = schema.ObjectField{ - Type: schema.NewNullableNamedType(objectName_NativeQueryLabelBoolExp).Encode(), - } - - // build the result object type - resultType.Fields[key] = schema.ObjectField{ - Description: label.Description, - Type: schema.NewNamedType(string(ScalarString)).Encode(), - } - } - - objectName := fmt.Sprintf("%sResult", strcase.ToCamel(name)) - scb.ObjectTypes[objectName] = resultType + resultType := schema.ObjectType{ + Fields: createQueryResultValuesObjectFields(), + } - boolExpObjectName := fmt.Sprintf("%sBoolExp", strcase.ToCamel(name)) - scb.ObjectTypes[boolExpObjectName] = boolExpType - fn.Arguments[ArgumentKeyWhere] = schema.ArgumentInfo{ - Description: boolExpType.Description, - Type: schema.NewNullableNamedType(boolExpObjectName).Encode(), + for key, label := range query.Labels { + resultType.Fields[key] = schema.ObjectField{ + Description: label.Description, + Type: schema.NewNamedType(string(ScalarString)).Encode(), } + } - fn.ResultType = schema.NewArrayType(schema.NewNamedType(objectName)).Encode() - } else { - fn.ResultType = schema.NewArrayType(schema.NewNamedType(objectName_QueryResultValues)).Encode() + objectName := strcase.ToCamel(name) + if _, ok := scb.ObjectTypes[objectName]; ok { + objectName = fmt.Sprintf("%sResult", strcase.ToCamel(name)) + } + scb.ObjectTypes[objectName] = resultType + collection := schema.CollectionInfo{ + Name: name, + Type: objectName, + Arguments: arguments, + Description: query.Description, + ForeignKeys: schema.CollectionInfoForeignKeys{}, + UniquenessConstraints: schema.CollectionInfoUniquenessConstraints{}, } - scb.Functions[name] = fn + scb.Collections[name] = collection + return nil } @@ -126,16 +111,11 @@ func ReplaceNativeQueryVariable(query string, name string, value string) string return strings.ReplaceAll(query, fmt.Sprintf("${%s}", name), value) } -func createNativeQueryArguments() schema.FunctionInfoArguments { +func createPromQLQueryArguments() schema.FunctionInfoArguments { arguments := schema.FunctionInfoArguments{} for _, key := range []string{ArgumentKeyStart, ArgumentKeyEnd, ArgumentKeyStep, ArgumentKeyTime, ArgumentKeyTimeout, ArgumentKeyFlat} { arguments[key] = defaultArgumentInfos[key] } - return arguments -} - -func createPromQLQueryArguments() schema.FunctionInfoArguments { - arguments := createNativeQueryArguments() arguments[ArgumentKeyQuery] = schema.ArgumentInfo{ Description: utils.ToPtr("The raw promQL query"), Type: schema.NewNamedType(string(ScalarString)).Encode(), diff --git a/connector/query.go b/connector/query.go index 27a1884..47b0d39 100644 --- a/connector/query.go +++ b/connector/query.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "regexp" - "slices" "github.com/hasura/ndc-prometheus/connector/internal" "github.com/hasura/ndc-prometheus/connector/metadata" @@ -84,15 +83,14 @@ func (c *PrometheusConnector) execQuery(ctx context.Context, state *metadata.Sta span.SetAttributes(utils.JSONAttribute("arguments", arguments)) if request.Collection == metadata.FunctionPromQLQuery { - executor := &internal.NativeQueryExecutor{ - Tracer: state.Tracer, - Client: state.Client, - Runtime: c.runtime, - Request: request, - NativeQuery: &metadata.NativeQuery{}, - Arguments: arguments, + executor := &internal.RawQueryExecutor{ + Tracer: state.Tracer, + Client: state.Client, + Runtime: c.runtime, + Request: request, + Arguments: arguments, } - result, err := executor.ExecuteRaw(ctx) + result, err := executor.Execute(ctx) if err != nil { span.SetStatus(codes.Error, "failed to execute raw query") span.RecordError(err) @@ -121,6 +119,7 @@ func (c *PrometheusConnector) execQuery(ctx context.Context, state *metadata.Sta Request: request, NativeQuery: &nativeQuery, Arguments: arguments, + Variables: variables, } result, err := executor.Execute(ctx) if err != nil { @@ -179,7 +178,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C requestVars = []schema.QueryRequestVariablesElem{make(schema.QueryRequestVariablesElem)} } - if slices.Contains([]string{metadata.FunctionPromQLQuery}, request.Collection) || c.apiHandler.QueryExists(request.Collection) { + if c.apiHandler.QueryExists(request.Collection) { return &schema.ExplainResponse{ Details: schema.ExplainResponseDetails{}, }, nil @@ -189,6 +188,26 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C if err != nil { return nil, err } + + if request.Collection == metadata.FunctionPromQLQuery { + executor := &internal.RawQueryExecutor{ + Tracer: state.Tracer, + Client: state.Client, + Request: request, + Arguments: arguments, + } + _, queryString, err := executor.Explain(ctx) + if err != nil { + return nil, err + } + + return &schema.ExplainResponse{ + Details: schema.ExplainResponseDetails{ + "query": queryString, + }, + }, nil + } + if nativeQuery, ok := c.metadata.NativeOperations.Queries[request.Collection]; ok { executor := &internal.NativeQueryExecutor{ Tracer: state.Tracer, diff --git a/tests/engine/app/metadata/prometheus.hml b/tests/engine/app/metadata/prometheus.hml index e9a401c..c257f4d 100644 --- a/tests/engine/app/metadata/prometheus.hml +++ b/tests/engine/app/metadata/prometheus.hml @@ -3864,51 +3864,6 @@ definition: type: type: named name: String - NativeQueryLabelBoolExp: - description: The boolean expression for native query labels - fields: - _eq: - description: The equality operator - type: - type: nullable - underlying_type: - type: named - name: String - _in: - description: The in-array operator - type: - type: nullable - underlying_type: - type: named - name: JSON - _neq: - description: The not-equality operator - type: - type: nullable - underlying_type: - type: named - name: String - _nin: - description: The not-in-array operator - type: - type: nullable - underlying_type: - type: named - name: JSON - _nregex: - description: The falsy regular expression operator - type: - type: nullable - underlying_type: - type: named - name: String - _regex: - description: The regular expression operator - type: - type: nullable - underlying_type: - type: named - name: String NdcPrometheusQueryTotal: fields: collection: @@ -17549,22 +17504,7 @@ definition: element_type: type: named name: RuleGroup - ServiceUpBoolExp: - description: Boolean expression of the native query service_up - fields: - instance: - type: - type: nullable - underlying_type: - type: named - name: NativeQueryLabelBoolExp - job: - type: - type: nullable - underlying_type: - type: named - name: NativeQueryLabelBoolExp - ServiceUpResult: + ServiceUp: fields: instance: type: @@ -19403,6 +19343,47 @@ definition: type: PromhttpMetricHandlerRequestsTotal uniqueness_constraints: {} foreign_keys: {} + - name: service_up + arguments: + flat: + description: Flatten grouped values out the root array + type: + type: nullable + underlying_type: + type: named + name: Boolean + instance: + type: + type: named + name: String + job: + type: + type: named + name: String + offset: + description: The offset modifier allows changing the time offset for individual instant and range vectors in a query + type: + type: nullable + underlying_type: + type: named + name: Duration + step: + description: Query resolution step width in duration format or float number of seconds + type: + type: nullable + underlying_type: + type: named + name: Duration + timeout: + description: Evaluation timeout + type: + type: nullable + underlying_type: + type: named + name: Duration + type: ServiceUp + uniqueness_constraints: {} + foreign_keys: {} - name: target_info description: Target metadata arguments: @@ -19653,70 +19634,6 @@ definition: element_type: type: named name: QueryResultValues - - name: service_up - arguments: - end: - description: End timestamp. Use this argument if you want to run an range query - type: - type: nullable - underlying_type: - type: named - name: Timestamp - flat: - description: Flatten grouped values out the root array - type: - type: nullable - underlying_type: - type: named - name: Boolean - instance: - type: - type: named - name: String - job: - type: - type: named - name: String - start: - description: Start timestamp. Use this argument if you want to run an range query - type: - type: nullable - underlying_type: - type: named - name: Timestamp - step: - description: Query resolution step width in duration format or float number of seconds - type: - type: nullable - underlying_type: - type: named - name: Duration - time: - description: Evaluation timestamp. Use this argument if you want to run an instant query - type: - type: nullable - underlying_type: - type: named - name: Timestamp - timeout: - description: Evaluation timeout - type: - type: nullable - underlying_type: - type: named - name: Duration - where: - description: Boolean expression of the native query service_up - type: - type: nullable - underlying_type: - type: named - name: ServiceUpBoolExp - result_type: - type: array - element_type: - type: named - name: ServiceUpResult procedures: [] capabilities: version: 0.1.6 diff --git a/tests/engine/app/metadata/service_up.hml b/tests/engine/app/metadata/service_up.hml index b480473..34fff2b 100644 --- a/tests/engine/app/metadata/service_up.hml +++ b/tests/engine/app/metadata/service_up.hml @@ -2,7 +2,7 @@ kind: ObjectType version: v1 definition: - name: ServiceUpResult + name: ServiceUp fields: - name: instance type: String! @@ -21,17 +21,17 @@ definition: type: "[QueryResultValue!]!" description: An array of query result values graphql: - typeName: ServiceUpResult - inputTypeName: ServiceUpResult_input + typeName: ServiceUp + inputTypeName: ServiceUp_input dataConnectorTypeMapping: - dataConnectorName: prometheus - dataConnectorObjectType: ServiceUpResult + dataConnectorObjectType: ServiceUp --- kind: TypePermissions version: v1 definition: - typeName: ServiceUpResult + typeName: ServiceUp permissions: - role: admin output: @@ -44,131 +44,90 @@ definition: - values --- -kind: Command +kind: BooleanExpressionType +version: v1 +definition: + name: ServiceUp_bool_exp + operand: + object: + type: ServiceUp + comparableFields: + - fieldName: instance + booleanExpressionType: String_bool_exp + - fieldName: job + booleanExpressionType: String_bool_exp + - fieldName: labels + booleanExpressionType: LabelSet_bool_exp + - fieldName: timestamp + booleanExpressionType: Timestamp_bool_exp + - fieldName: value + booleanExpressionType: Decimal_bool_exp + comparableRelationships: [] + logicalOperators: + enable: true + isNull: + enable: true + graphql: + typeName: ServiceUp_bool_exp + +--- +kind: Model version: v1 definition: name: service_up - outputType: "[ServiceUpResult!]!" + objectType: ServiceUp arguments: - - name: end - type: Timestamp - description: End timestamp. Use this argument if you want to run an range query + - name: flat + type: Boolean + description: Flatten grouped values out the root array - name: instance type: String! - name: job type: String! - - name: start - type: Timestamp - description: Start timestamp. Use this argument if you want to run an range query + - name: offset + type: Duration + description: The offset modifier allows changing the time offset for individual + instant and range vectors in a query - name: step type: Duration - description: Query resolution step width in duration format or float number of - seconds. - - name: time - type: Timestamp - description: Evaluation timestamp. Use this argument if you want to run an - instant query + description: Query resolution step width in duration format or float number of seconds - name: timeout type: Duration description: Evaluation timeout - - name: flat - type: Boolean - description: Flatten nested the values group to the root array - - name: where - type: ServiceUpBoolExp - description: Boolean expression of the native query service_up source: dataConnectorName: prometheus - dataConnectorCommand: - function: service_up - graphql: - rootFieldName: service_up - rootFieldKind: Query - ---- -kind: CommandPermissions -version: v1 -definition: - commandName: service_up - permissions: - - role: admin - allowExecution: true - ---- -kind: ObjectType -version: v1 -definition: - name: NativeQueryLabelBoolExp - description: The boolean expression for native query labels - fields: - - name: _eq - type: String - description: The equality operator - - name: _in - type: JSON - description: The in-array operator - - name: _neq - type: String - description: The not-equality operator - - name: _nin - type: JSON - description: The not-in-array operator - - name: _nregex - type: String - description: The falsy regular expression operator - - name: _regex - type: String - description: The regular expression operator - graphql: - typeName: NativeQueryLabelBoolExp - inputTypeName: NativeQueryLabelBoolExp_input - dataConnectorTypeMapping: - - dataConnectorName: prometheus - dataConnectorObjectType: NativeQueryLabelBoolExp - ---- -kind: TypePermissions -version: v1 -definition: - typeName: NativeQueryLabelBoolExp - permissions: - - role: admin - output: - allowedFields: - - _eq - - _in - - _neq - - _nin - - _nregex - - _regex - ---- -kind: ObjectType -version: v1 -definition: - name: ServiceUpBoolExp - description: Boolean expression of the native query service_up - fields: - - name: instance - type: NativeQueryLabelBoolExp - - name: job - type: NativeQueryLabelBoolExp + collection: service_up + filterExpressionType: ServiceUp_bool_exp + orderableFields: + - fieldName: instance + orderByDirections: + enableAll: true + - fieldName: job + orderByDirections: + enableAll: true + - fieldName: labels + orderByDirections: + enableAll: true + - fieldName: timestamp + orderByDirections: + enableAll: true + - fieldName: value + orderByDirections: + enableAll: true graphql: - typeName: ServiceUpBoolExp - inputTypeName: ServiceUpBoolExp_input - dataConnectorTypeMapping: - - dataConnectorName: prometheus - dataConnectorObjectType: ServiceUpBoolExp + selectMany: + queryRootField: service_up + selectUniques: [] + argumentsInputType: service_up_arguments + orderByExpressionType: service_up_order_by --- -kind: TypePermissions +kind: ModelPermissions version: v1 definition: - typeName: ServiceUpBoolExp + modelName: service_up permissions: - role: admin - output: - allowedFields: - - instance - - job + select: + filter: null