Skip to content

Commit

Permalink
Don't store entire predicate in memory, either
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Feb 1, 2024
1 parent 2a19745 commit 53529de
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 55 deletions.
35 changes: 32 additions & 3 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package expr

import (
"context"

"github.com/cespare/xxhash/v2"
)

type EngineType int
Expand All @@ -27,7 +29,7 @@ type MatchingEngine interface {
// expression parts received. Some may return false positives, but
// each MatchingEngine should NEVER omit ExpressionParts which match
// the given input.
Match(ctx context.Context, input map[string]any) ([]*ExpressionPart, error)
Match(ctx context.Context, input map[string]any) ([]*StoredExpressionPart, error)
// Add adds a new expression part to the matching engine for future matches.
Add(ctx context.Context, p ExpressionPart) error
// Remove removes an expression part from the matching engine, ensuring that the
Expand All @@ -42,7 +44,7 @@ type MatchingEngine interface {
// ignoring the variable name. Note that each MatchingEngine should NEVER
// omit ExpressionParts which match the given input; false positives are okay,
// but not returning valid matches must be impossible.
Search(ctx context.Context, variable string, input any) []*ExpressionPart
Search(ctx context.Context, variable string, input any) []*StoredExpressionPart
}

// Leaf represents the leaf within a tree. This stores all expressions
Expand Down Expand Up @@ -70,10 +72,21 @@ type ExpressionPart struct {
//
// This lets us determine whether the entire group has been matched.
GroupID groupID
Predicate Predicate
Predicate *Predicate
Parsed *ParsedExpression
}

func (p ExpressionPart) Hash() uint64 {
return xxhash.Sum64String(p.Predicate.String())
}

func (p ExpressionPart) EqualsStored(n *StoredExpressionPart) bool {
if p.GroupID != n.GroupID {
return false
}
return p.Hash() == n.PredicateID
}

func (p ExpressionPart) Equals(n ExpressionPart) bool {
if p.GroupID != n.GroupID {
return false
Expand All @@ -83,3 +96,19 @@ func (p ExpressionPart) Equals(n ExpressionPart) bool {
}
return p.Parsed.EvaluableID == n.Parsed.EvaluableID
}

func (p ExpressionPart) ToStored() *StoredExpressionPart {
return &StoredExpressionPart{
GroupID: p.GroupID,
Parsed: p.Parsed,
PredicateID: p.Hash(),
}
}

// StoredExpressionPart is a lightweight expression part which only stores
// a hash of the predicate to reduce memory usage.
type StoredExpressionPart struct {
GroupID groupID
PredicateID uint64
Parsed *ParsedExpression
}
24 changes: 12 additions & 12 deletions engine_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func newNullMatcher() MatchingEngine {
return &nullLookup{
lock: &sync.RWMutex{},
paths: map[string]struct{}{},
null: map[string][]*ExpressionPart{},
not: map[string][]*ExpressionPart{},
null: map[string][]*StoredExpressionPart{},
not: map[string][]*StoredExpressionPart{},
}
}

Expand All @@ -24,16 +24,16 @@ type nullLookup struct {
// paths stores all variable names as JSON paths used within the engine.
paths map[string]struct{}

null map[string][]*ExpressionPart
not map[string][]*ExpressionPart
null map[string][]*StoredExpressionPart
not map[string][]*StoredExpressionPart
}

func (n *nullLookup) Type() EngineType {
return EngineTypeNullMatch
}

func (n *nullLookup) Match(ctx context.Context, data map[string]any) ([]*ExpressionPart, error) {
found := []*ExpressionPart{}
func (n *nullLookup) Match(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error) {
found := []*StoredExpressionPart{}
eg := errgroup.Group{}

for item := range n.paths {
Expand All @@ -59,7 +59,7 @@ func (n *nullLookup) Match(ctx context.Context, data map[string]any) ([]*Express
return found, eg.Wait()
}

func (n *nullLookup) Search(ctx context.Context, variable string, input any) []*ExpressionPart {
func (n *nullLookup) Search(ctx context.Context, variable string, input any) []*StoredExpressionPart {
if input == nil {
// The input data is null, so the only items that can match are equality
// comparisons to null.
Expand All @@ -85,18 +85,18 @@ func (n *nullLookup) Add(ctx context.Context, p ExpressionPart) error {
// Any other comparison is a not-null comparison.
if p.Predicate.Operator == operators.Equals {
if _, ok := n.null[varName]; !ok {
n.null[varName] = []*ExpressionPart{&p}
n.null[varName] = []*StoredExpressionPart{p.ToStored()}
return nil
}
n.null[varName] = append(n.null[varName], &p)
n.null[varName] = append(n.null[varName], p.ToStored())
return nil
}

if _, ok := n.not[varName]; !ok {
n.not[varName] = []*ExpressionPart{&p}
n.not[varName] = []*StoredExpressionPart{p.ToStored()}
return nil
}
n.not[varName] = append(n.not[varName], &p)
n.not[varName] = append(n.not[varName], p.ToStored())
return nil
}

Expand All @@ -117,7 +117,7 @@ func (n *nullLookup) Remove(ctx context.Context, p ExpressionPart) error {

// Remove the expression part from the leaf.
for i, eval := range coll {
if p.Equals(*eval) {
if p.EqualsStored(eval) {
coll = append(coll[:i], coll[i+1:]...)
if p.Predicate.Operator == operators.Equals {
n.null[p.Predicate.Ident] = coll
Expand Down
16 changes: 8 additions & 8 deletions engine_stringmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newStringEqualityMatcher() MatchingEngine {
return &stringLookup{
lock: &sync.RWMutex{},
vars: map[string]struct{}{},
strings: map[string][]*ExpressionPart{},
strings: map[string][]*StoredExpressionPart{},
}
}

Expand All @@ -40,15 +40,15 @@ type stringLookup struct {
vars map[string]struct{}
// strings stores all strings referenced within expressions, mapped to the expression part.
// this performs string equality lookups.
strings map[string][]*ExpressionPart
strings map[string][]*StoredExpressionPart
}

func (s stringLookup) Type() EngineType {
return EngineTypeStringHash
}

func (n *stringLookup) Match(ctx context.Context, input map[string]any) ([]*ExpressionPart, error) {
found := []*ExpressionPart{}
func (n *stringLookup) Match(ctx context.Context, input map[string]any) ([]*StoredExpressionPart, error) {
found := []*StoredExpressionPart{}
eg := errgroup.Group{}

for item := range n.vars {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (n *stringLookup) Match(ctx context.Context, input map[string]any) ([]*Expr

// Search returns all ExpressionParts which match the given input, ignoring the variable name
// entirely.
func (n *stringLookup) Search(ctx context.Context, variable string, input any) []*ExpressionPart {
func (n *stringLookup) Search(ctx context.Context, variable string, input any) []*StoredExpressionPart {
n.lock.RLock()
defer n.lock.RUnlock()
str, ok := input.(string)
Expand Down Expand Up @@ -109,10 +109,10 @@ func (n *stringLookup) Add(ctx context.Context, p ExpressionPart) error {
n.vars[p.Predicate.Ident] = struct{}{}

if _, ok := n.strings[val]; !ok {
n.strings[val] = []*ExpressionPart{&p}
n.strings[val] = []*StoredExpressionPart{p.ToStored()}
return nil
}
n.strings[val] = append(n.strings[val], &p)
n.strings[val] = append(n.strings[val], p.ToStored())

return nil
}
Expand All @@ -136,7 +136,7 @@ func (n *stringLookup) Remove(ctx context.Context, p ExpressionPart) error {

// Remove the expression part from the leaf.
for i, eval := range coll {
if p.Equals(*eval) {
if p.EqualsStored(eval) {
coll = append(coll[:i], coll[i+1:]...)
n.strings[val] = coll
return nil
Expand Down
58 changes: 33 additions & 25 deletions engine_stringmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,60 @@ func TestEngineStringmap(t *testing.T) {
ctx := context.Background()
s := newStringEqualityMatcher().(*stringLookup)

a := ExpressionPart{
Predicate: &Predicate{
Ident: "async.data.id",
Literal: "123",
Operator: operators.Equals,
},
}
b := ExpressionPart{
Predicate: &Predicate{
Ident: "async.data.id",
Literal: "123",
Operator: operators.Equals,
},
}
c := ExpressionPart{
Predicate: &Predicate{
Ident: "async.data.another",
Literal: "456",
Operator: operators.Equals,
},
}

t.Run("It adds strings", func(t *testing.T) {
var err error

err = s.Add(ctx, ExpressionPart{
Predicate: Predicate{
Ident: "async.data.id",
Literal: "123",
Operator: operators.Equals,
},
})
require.NoError(t, err)

err = s.Add(ctx, ExpressionPart{
Predicate: Predicate{
Ident: "async.data.another",
Literal: "456",
Operator: operators.Equals,
},
})
err = s.Add(ctx, a)
require.NoError(t, err)

t.Run("Adding the same string twice", func(t *testing.T) {
err = s.Add(ctx, ExpressionPart{
Predicate: Predicate{
Ident: "async.data.id",
Literal: "123",
Operator: operators.Equals,
},
})
err = s.Add(ctx, b)
require.NoError(t, err)
require.Equal(t, 2, len(s.strings[s.hash("123")]))
})

// A different expression
err = s.Add(ctx, c)
require.NoError(t, err)
})

t.Run("It searches strings", func(t *testing.T) {
parts := s.Search(ctx, "async.data.id", "123")
require.Equal(t, 2, len(parts))

for _, part := range parts {
require.EqualValues(t, "123", part.Predicate.Literal)
require.EqualValues(t, part.PredicateID, a.Hash())
require.EqualValues(t, part.PredicateID, b.Hash())
}

t.Run("It ignores variable names (for now)", func(t *testing.T) {
parts = s.Search(ctx, "this doesn't matter", "123")
require.Equal(t, 2, len(parts))
for _, part := range parts {
require.EqualValues(t, "123", part.Predicate.Literal)
require.EqualValues(t, part.PredicateID, a.Hash())
require.EqualValues(t, part.PredicateID, b.Hash())
}
})

Expand Down
14 changes: 7 additions & 7 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type AggregateEvaluator interface {
Evaluate(ctx context.Context, data map[string]any) ([]Evaluable, int32, error)

// AggregateMatch returns all expression parts which are evaluable given the input data.
AggregateMatch(ctx context.Context, data map[string]any) ([]*ExpressionPart, error)
AggregateMatch(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error)

// Len returns the total number of aggregateable and constantly matched expressions
// stored in the evaluator.
Expand Down Expand Up @@ -199,8 +199,8 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu

// AggregateMatch attempts to match incoming data to all PredicateTrees, resulting in a selection
// of parts of an expression that have matched.
func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([]*ExpressionPart, error) {
result := []*ExpressionPart{}
func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error) {
result := []*StoredExpressionPart{}

a.lock.RLock()
defer a.lock.RUnlock()
Expand All @@ -213,7 +213,7 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([
// Note that having a count >= the group ID value does not guarantee that the expression is valid.
counts := map[groupID]int{}
// Store all expression parts per group ID for returning.
found := map[groupID][]*ExpressionPart{}
found := map[groupID][]*StoredExpressionPart{}
// protect the above locks with a map.
lock := &sync.Mutex{}

Expand All @@ -228,7 +228,7 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([
for _, eval := range matched {
counts[eval.GroupID] += 1
if _, ok := found[eval.GroupID]; !ok {
found[eval.GroupID] = []*ExpressionPart{}
found[eval.GroupID] = []*StoredExpressionPart{}
}
found[eval.GroupID] = append(found[eval.GroupID], eval)
}
Expand Down Expand Up @@ -444,7 +444,7 @@ func (a *aggregator) addNode(ctx context.Context, n *Node, parsed *ParsedExpress
}
return engine.Add(ctx, ExpressionPart{
GroupID: n.GroupID,
Predicate: *n.Predicate,
Predicate: n.Predicate,
Parsed: parsed,
})
}
Expand Down Expand Up @@ -492,7 +492,7 @@ func (a *aggregator) removeNode(ctx context.Context, n *Node, parsed *ParsedExpr
}
return engine.Remove(ctx, ExpressionPart{
GroupID: n.GroupID,
Predicate: *n.Predicate,
Predicate: n.Predicate,
Parsed: parsed,
})
}
Expand Down

0 comments on commit 53529de

Please sign in to comment.