Skip to content

Commit

Permalink
Add lifted expression parsing (naive), support for ident matching
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Jan 4, 2024
1 parent 1702030 commit 63e3953
Show file tree
Hide file tree
Showing 9 changed files with 676 additions and 92 deletions.
113 changes: 113 additions & 0 deletions caching_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package expr

import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/google/cel-go/cel"
// "github.com/karlseguin/ccache/v2"
)

// doubleQuoteMatch matches a double-quoted span that contains no inner double
// quote. liftLiterals uses it to find the literals to lift.
var doubleQuoteMatch = regexp.MustCompile(`"[^"]*"`)

// replace lists the variable names handed out, in order, to lifted literals.
var replace = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

// NewCachingParser builds a CELParser backed by the given CEL environment.
// The returned parser lifts quoted literals out of each expression into
// variables and caches the parse result, so that repeated parsing of
// expressions is cheaper.
func NewCachingParser(env *cel.Env) CELParser {
	parser := new(cachingParser)
	parser.env = env
	return parser
}

// cachingParser is a CEL parser that caches parse results, keyed by the
// expression string after its quoted literals have been lifted out.
type cachingParser struct {
	// hits and misses count cache hits and misses. They are only accessed via
	// the sync/atomic functions, so they are kept as the first fields of the
	// struct: on 32-bit platforms only the first word of an allocated struct
	// is guaranteed to be 64-bit aligned, which 64-bit atomics require.
	hits   int64
	misses int64

	// cache is a global cache of precompiled expressions.
	// cache *ccache.Cache
	stupidNoInternetCache sync.Map

	// env is the CEL environment used to parse expressions on a cache miss.
	env *cel.Env
}

// liftLiterals lifts double-quoted literals out of expr and replaces each one
// with a variable reference (VarPrefix followed by a name from replace), so
// that expressions differing only in their literals normalize to the same
// string and increase cache hit rates.
//
// It returns the rewritten expression and a map from variable name to the
// lifted value. The value is the unquoted literal when strconv.Unquote accepts
// it, and the raw quoted text otherwise. At most len(replace) literals are
// lifted; any further literals are left in the expression untouched. If expr
// contains an escaped quote it is returned unchanged together with a nil map.
func liftLiterals(expr string) (string, map[string]any) {
	// TODO: Optimize this please. Use strconv.Unquote as the basis, and perform
	// searches across each index quotes.

	// If this contains an escaped quote (eg. `\"` or `\'`), skip the lifting
	// of literals out of the expression: the regexp cannot tell an escaped
	// quote from a closing one.
	if strings.Contains(expr, `\"`) || strings.Contains(expr, `\'`) {
		return expr, nil
	}

	var (
		counter int
		vars    = map[string]any{}
	)

	rewrite := func(str string) string {
		// Every name has been used: leave the remaining literals inline.
		// The comparison must be >=, as replace[len(replace)] is out of range.
		if counter >= len(replace) {
			return str
		}

		idx := replace[counter]
		if val, err := strconv.Unquote(str); err == nil {
			str = val
		}
		vars[idx] = str

		counter++
		return VarPrefix + idx
	}

	expr = doubleQuoteMatch.ReplaceAllStringFunc(expr, rewrite)
	return expr, vars
}

// Parse lifts the quoted literals out of expr and parses the resulting
// expression with the parser's CEL environment, reusing a previously stored
// result when the lifted expression has been seen before.
//
// It returns the AST and issues for the lifted expression, plus the variables
// produced by lifting for this particular call. The hit or miss counter is
// incremented accordingly.
func (c *cachingParser) Parse(expr string) (*cel.Ast, *cel.Issues, map[string]any) {
	lifted, vars := liftLiterals(expr)

	// TODO: ccache, when I have internet.
	entry, found := c.stupidNoInternetCache.Load(lifted)
	if !found {
		// Not seen before: parse it and remember the outcome (including any
		// issues) under the lifted expression.
		ast, issues := c.env.Parse(lifted)
		c.stupidNoInternetCache.Store(lifted, ParsedCelExpr{
			Expr:   lifted,
			AST:    ast,
			Issues: issues,
		})
		atomic.AddInt64(&c.misses, 1)
		return ast, issues, vars
	}

	hit := entry.(ParsedCelExpr)
	atomic.AddInt64(&c.hits, 1)
	return hit.AST, hit.Issues, vars
}

// Hits reports the current value of the cache-hit counter, read atomically.
func (c *cachingParser) Hits() int64 {
	total := atomic.LoadInt64(&c.hits)
	return total
}

// Misses reports the current value of the cache-miss counter, read atomically.
func (c *cachingParser) Misses() int64 {
	total := atomic.LoadInt64(&c.misses)
	return total
}

// ParsedCelExpr bundles an expression string with the result of parsing it.
// cachingParser.Parse stores values of this type in its cache.
type ParsedCelExpr struct {
// Expr is the expression string that was parsed. When stored by
// cachingParser.Parse this is the expression after literal lifting.
Expr string
// AST is the AST returned by the CEL environment's Parse for Expr.
AST *cel.Ast
// Issues holds the issues returned alongside AST by the same Parse call.
Issues *cel.Issues
}
140 changes: 140 additions & 0 deletions caching_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package expr

import (
"testing"

"github.com/google/cel-go/cel"
"github.com/stretchr/testify/require"
)

// TestCachingParser_CachesSame checks that parsing the same expression twice
// is served from the cache, and that an expression which differs in more than
// its quoted literal is parsed again.
//
// The subtests share one parser and its counters, so they must run in the
// order written.
func TestCachingParser_CachesSame(t *testing.T) {
c := cachingParser{env: newEnv()}

// a and b use the same literal but reference different identifiers.
a := `event.data.a == "cache"`
b := `event.data.b == "cache"`

// Results of the most recent Parse call, carried between subtests.
var (
prevAST *cel.Ast
prevIssues *cel.Issues
prevVars map[string]any
)

t.Run("With an uncached expression", func(t *testing.T) {
prevAST, prevIssues, prevVars = c.Parse(a)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
require.NotNil(t, prevVars)
// First parse of a: nothing cached yet, so this is a miss.
require.EqualValues(t, 0, c.Hits())
require.EqualValues(t, 1, c.Misses())
})

t.Run("With a cached expression", func(t *testing.T) {
ast, issues, vars := c.Parse(a)
require.NotNil(t, ast)
require.Nil(t, issues)

// The second parse of a must return the same results as the first.
require.Equal(t, prevAST, ast)
require.Equal(t, prevIssues, issues)
require.Equal(t, prevVars, vars)

require.EqualValues(t, 1, c.Hits())
require.EqualValues(t, 1, c.Misses())
})

t.Run("With another uncached expression", func(t *testing.T) {
prevAST, prevIssues, prevVars = c.Parse(b)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
// This misses the cache: b differs from a in the identifier it
// references (event.data.b vs event.data.a), not in its literal, so
// the lifted expression is a different cache key.
require.EqualValues(t, 1, c.Hits())
require.EqualValues(t, 2, c.Misses())
})
}

// TestCachingParser_CacheIgnoreLiterals_Unescaped checks that two expressions
// which differ only in their (unescaped) quoted literals share a single cache
// entry.
//
// The subtests share one parser and its counters, so they must run in the
// order written.
func TestCachingParser_CacheIgnoreLiterals_Unescaped(t *testing.T) {
c := cachingParser{env: newEnv()}

// a and b are identical apart from the contents of their quoted literals.
a := `event.data.a == "literal-a" && event.data.b == "yes-1"`
b := `event.data.a == "literal-b" && event.data.b == "yes-2"`

// Results of the most recent Parse call, carried between subtests.
var (
prevAST *cel.Ast
prevIssues *cel.Issues
prevVars map[string]any
)

t.Run("With an uncached expression", func(t *testing.T) {
prevAST, prevIssues, prevVars = c.Parse(a)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
// First parse of a: nothing cached yet, so this is a miss.
require.EqualValues(t, 0, c.Hits())
require.EqualValues(t, 1, c.Misses())
})

t.Run("With a cached expression", func(t *testing.T) {
ast, issues, vars := c.Parse(a)
require.NotNil(t, ast)
require.Nil(t, issues)

// The second parse of a must return the same results as the first.
require.Equal(t, prevAST, ast)
require.Equal(t, prevIssues, issues)
require.Equal(t, prevVars, vars)

require.EqualValues(t, 1, c.Hits())
require.EqualValues(t, 1, c.Misses())
})

t.Run("With a cached expression having different literals ONLY", func(t *testing.T) {
prevAST, prevIssues, _ = c.Parse(b)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
// This hits the cache: b differs from a only in its quoted literals,
// so the hit count rises to 2 and the miss count stays at 1.
require.EqualValues(t, 2, c.Hits())
require.EqualValues(t, 1, c.Misses())
})
}

/*
func TestCachingParser_CacheIgnoreLiterals_Escaped(t *testing.T) {
return
c := cachingParser{env: newEnv()}
a := `event.data.a == "literal\"-a" && event.data.b == "yes"`
b := `event.data.a == "literal\"-b" && event.data.b == "yes"`
var (
prevAST *cel.Ast
prevIssues *cel.Issues
)
t.Run("With an uncached expression", func(t *testing.T) {
prevAST, prevIssues = c.Parse(a)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
require.EqualValues(t, 0, c.Hits())
require.EqualValues(t, 1, c.Misses())
})
t.Run("With a cached expression", func(t *testing.T) {
ast, issues := c.Parse(a)
require.NotNil(t, ast)
require.Nil(t, issues)
require.Equal(t, prevAST, ast)
require.Equal(t, prevIssues, issues)
require.EqualValues(t, 1, c.Hits())
require.EqualValues(t, 1, c.Misses())
})
t.Run("With a cached expression having different literals ONLY", func(t *testing.T) {
prevAST, prevIssues = c.Parse(b)
require.NotNil(t, prevAST)
require.Nil(t, prevIssues)
// This misses the cache.
require.EqualValues(t, 2, c.Hits())
require.EqualValues(t, 1, c.Misses())
})
}
*/
41 changes: 28 additions & 13 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ type AggregateEvaluator interface {
ConstantLen() int
}

func NewAggregateEvaluator(parser TreeParser, eval ExpressionEvaluator) AggregateEvaluator {
func NewAggregateEvaluator(
parser TreeParser,
eval ExpressionEvaluator,
) AggregateEvaluator {
return &aggregator{
eval: eval,
parser: parser,
Expand All @@ -81,7 +84,7 @@ type aggregator struct {

// constants tracks evaluable instances that must always be evaluated, due to
// the expression containing non-aggregateable clauses.
constants []Evaluable
constants []*ParsedExpression
}

// Len returns the total number of aggregateable and constantly matched expressions
Expand Down Expand Up @@ -118,13 +121,17 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
// TODO: Concurrently match constant expressions using a semaphore for capacity.
for _, expr := range a.constants {
atomic.AddInt32(&matched, 1)
ok, evalerr := a.eval(ctx, expr, data)

// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, expr.Evaluable, data)
if evalerr != nil {
err = errors.Join(err, evalerr)
continue
}
if ok {
result = append(result, expr)
result = append(result, expr.Evaluable)
}
}

Expand All @@ -138,13 +145,17 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
// for each group ID and then skip evaluating expressions if so.
for _, match := range matches {
atomic.AddInt32(&matched, 1)
ok, evalerr := a.eval(ctx, match.Evaluable, data)

// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, match.Parsed.Evaluable, data)
if evalerr != nil {
err = errors.Join(err, evalerr)
continue
}
if ok {
result = append(result, match.Evaluable)
result = append(result, match.Parsed.Evaluable)
}
}

Expand Down Expand Up @@ -189,20 +200,22 @@ func (a *aggregator) aggregateMatch(ctx context.Context, data map[string]any, pr
}

func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
parsed, err := a.parser.Parse(ctx, eval.Expression())
parsed, err := a.parser.Parse(ctx, eval)
if err != nil {
return false, err
}

aggregateable := true
for _, g := range parsed.RootGroups() {
ok, err := a.addGroup(ctx, g, eval)
ok, err := a.addGroup(ctx, g, parsed)
if err != nil {
return false, err
}
if !ok && aggregateable {
// Add this expression as a constant once.
a.constants = append(a.constants, eval)
a.lock.Lock()
a.constants = append(a.constants, parsed)
a.lock.Unlock()
aggregateable = false
}
}
Expand All @@ -214,7 +227,7 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
return aggregateable, nil
}

func (a *aggregator) addGroup(ctx context.Context, node *Node, eval Evaluable) (bool, error) {
func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExpression) (bool, error) {
if len(node.Ors) > 0 {
// If there are additional branches, don't bother to add this to the aggregate tree.
// Mark this as a non-exhaustive addition and skip immediately.
Expand Down Expand Up @@ -246,7 +259,7 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, eval Evaluable) (
// items from the same identifier. If so, the evaluation is true.
groupID := newGroupID(uint16(len(all)))
for _, n := range all {
err := a.addNode(ctx, n, groupID, eval)
err := a.addNode(ctx, n, groupID, parsed)
if err == errTreeUnimplemented {
return false, nil
}
Expand All @@ -258,7 +271,7 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, eval Evaluable) (
return true, nil
}

func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, eval Evaluable) error {
func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, parsed *ParsedExpression) error {
// Don't allow anything to update in parallel.
a.lock.Lock()
defer a.lock.Unlock()
Expand All @@ -273,7 +286,7 @@ func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, eval Eva
err := tree.Add(ctx, ExpressionPart{
GroupID: gid,
Predicate: *n.Predicate,
Evaluable: eval,
Parsed: parsed,
})
if err != nil {
return err
Expand All @@ -291,6 +304,8 @@ func (a *aggregator) Remove(ctx context.Context, eval Evaluable) error {

func isAggregateable(n *Node) bool {
if n.Predicate == nil {
// This is a parent node. We skip aggregateable checks and only
// return false based off of predicate information.
return true
}
switch n.Predicate.Literal.(type) {
Expand Down
Loading

0 comments on commit 63e3953

Please sign in to comment.