diff --git a/connector/internal/collection.go b/connector/internal/collection.go index e33bfa1..4a0087e 100644 --- a/connector/internal/collection.go +++ b/connector/internal/collection.go @@ -118,18 +118,8 @@ func (qce *QueryCollectionExecutor) queryInstant(ctx context.Context, queryStrin return nil, schema.UnprocessableContentError(err.Error(), nil) } - qce.sortVector(vector, predicate.OrderBy) - - if qce.Request.Query.Offset != nil && *qce.Request.Query.Offset > 0 { - if len(vector) <= *qce.Request.Query.Offset { - return []map[string]any{}, nil - } - vector = vector[*qce.Request.Query.Offset:] - } - if qce.Request.Query.Limit != nil && *qce.Request.Query.Limit < len(vector) { - vector = vector[:*qce.Request.Query.Limit] - } - + sortVector(vector, predicate.OrderBy) + vector = paginateVector(vector, qce.Request.Query) results := createQueryResultsFromVector(vector, qce.Metric.Labels, qce.Runtime, flat) return results, nil } @@ -157,21 +147,10 @@ func (qce *QueryCollectionExecutor) queryRange(ctx context.Context, queryString return nil, schema.UnprocessableContentError(err.Error(), nil) } - qce.sortMatrix(matrix, predicate.OrderBy) - - if qce.Request.Query.Offset != nil && *qce.Request.Query.Offset > 0 { - if len(matrix) <= *qce.Request.Query.Offset { - return []map[string]any{}, nil - } - matrix = matrix[*qce.Request.Query.Offset:] - } + sortMatrix(matrix, predicate.OrderBy) results := createQueryResultsFromMatrix(matrix, qce.Metric.Labels, qce.Runtime, flat) - if qce.Request.Query.Limit != nil && *qce.Request.Query.Limit < len(results) { - results = results[:*qce.Request.Query.Limit] - } - - return results, nil + return paginateQueryResults(results, qce.Request.Query), nil } func (qce *QueryCollectionExecutor) buildQueryString(predicate *CollectionRequest) (string, bool, error) { diff --git a/connector/internal/native_query.go b/connector/internal/native_query.go index b2b0b05..39cc5bd 100644 --- a/connector/internal/native_query.go +++ b/connector/internal/native_query.go @@ -144,6 +144,8 @@ func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString st return nil, schema.UnprocessableContentError(err.Error(), nil) } + sortVector(vector, params.OrderBy) + vector = paginateVector(vector, nqe.Request.Query) results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat) return results, nil @@ -161,9 +163,11 @@ func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString stri if err != nil { return nil, schema.UnprocessableContentError(err.Error(), nil) } + + sortMatrix(matrix, params.OrderBy) results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat) - return results, nil + return paginateQueryResults(results, nqe.Request.Query), nil } func (nqe *NativeQueryExecutor) filterVectorResults(vector model.Vector, expr schema.Expression) (model.Vector, error) { diff --git a/connector/internal/sorting.go b/connector/internal/pagination.go similarity index 77% rename from connector/internal/sorting.go rename to connector/internal/pagination.go index 309ce71..52d73d0 100644 --- a/connector/internal/sorting.go +++ b/connector/internal/pagination.go @@ -6,10 +6,11 @@ import ( "strings" "github.com/hasura/ndc-prometheus/connector/metadata" + "github.com/hasura/ndc-sdk-go/schema" "github.com/prometheus/common/model" ) -func (qce *QueryCollectionExecutor) sortVector(vector model.Vector, sortElements []ColumnOrder) { +func sortVector(vector model.Vector, sortElements []ColumnOrder) { if len(sortElements) == 0 { return } @@ -58,7 +59,7 @@ func (qce *QueryCollectionExecutor) sortVector(vector model.Vector, sortElements }) } -func (qce *QueryCollectionExecutor) sortMatrix(matrix model.Matrix, sortElements []ColumnOrder) { +func sortMatrix(matrix model.Matrix, sortElements []ColumnOrder) { if len(sortElements) == 0 { return } @@ -135,3 +136,31 @@ func (qce *QueryCollectionExecutor) sortMatrix(matrix model.Matrix, sortElements return 0 }) } + +func paginateVector(vector model.Vector, q schema.Query) model.Vector { + if q.Offset != nil && *q.Offset > 0 { + if len(vector) <= *q.Offset { + return model.Vector{} + } + vector = vector[*q.Offset:] + } + if q.Limit != nil && *q.Limit < len(vector) { + vector = vector[:*q.Limit] + } + return vector +} + +func paginateQueryResults(results []map[string]any, q schema.Query) []map[string]any { + if q.Offset != nil && *q.Offset > 0 { + if len(results) <= *q.Offset { + return []map[string]any{} + } + results = results[*q.Offset:] + } + + if q.Limit != nil && *q.Limit < len(results) { + results = results[:*q.Limit] + } + + return results +}