Skip to content

Commit

Permalink
chore: Updated redis version and refactored online read (#99)
Browse files Browse the repository at this point in the history
Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Apr 3, 2024
1 parent ac0e571 commit d3094da
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 24 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/google/uuid v1.3.0
github.com/mattn/go-sqlite3 v1.14.16
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.1.0
github.com/redis/go-redis/v9 v9.5.1
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.54.0
Expand Down Expand Up @@ -43,6 +43,7 @@ require (
)

require github.com/rs/zerolog v1.21.0

require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bsm/ginkgo/v2 v2.9.5 h1:rtVBYPs3+TC5iLUVOis1B9tjLTup7Cj5IfzosKtvTJ0=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -347,8 +347,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY=
github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 h1:Qp27Idfgi6ACvFQat5+VJvlYToylpM/hcyLBI3WaKPA=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
60 changes: 43 additions & 17 deletions go/internal/feast/onlinestore/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sort"
"strconv"
"strings"

"github.com/feast-dev/feast/go/internal/feast/registry"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/redis/go-redis/v9"
"github.com/spaolacci/murmur3"
Expand All @@ -19,6 +21,7 @@ import (

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
redistrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9"
)

type redisType int
Expand Down Expand Up @@ -102,20 +105,32 @@ func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStor
}
}

// Metrics are not showing up when the service name is set to DD_SERVICE
redisTraceServiceName := os.Getenv("DD_SERVICE") + "-redis"
if redisTraceServiceName == "" {
redisTraceServiceName = "redis.client" // default service name if DD_SERVICE is not set
}

if redisStoreType == redisNode {
store.client = redis.NewClient(&redis.Options{
Addr: address[0],
Password: password, // No password set
DB: db,
TLSConfig: tlsConfig,
})
if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
redistrace.WrapClient(store.client, redistrace.WithServiceName(redisTraceServiceName))
}
} else if redisStoreType == redisCluster {
store.clusterClient = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{address[0]},
Password: password, // No password set
TLSConfig: tlsConfig,
ReadOnly: true,
})
if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
redistrace.WrapClient(store.clusterClient, redistrace.WithServiceName(redisTraceServiceName))
}
}

return &store, nil
Expand All @@ -142,18 +157,22 @@ func getRedisType(onlineStoreConfig map[string]interface{}) (redisType, error) {
return t, nil
}

func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
featureCount := len(featureNames)
index := featureCount
func (r *RedisOnlineStore) buildFeatureViewIndices(featureViewNames []string, featureNames []string) (map[string]int, map[int]string, int) {
featureViewIndices := make(map[string]int)
indicesFeatureView := make(map[int]string)
index := len(featureNames)
for _, featureViewName := range featureViewNames {
if _, ok := featureViewIndices[featureViewName]; !ok {
featureViewIndices[featureViewName] = index
indicesFeatureView[index] = featureViewName
index += 1
}
}
return featureViewIndices, indicesFeatureView, index
}

func (r *RedisOnlineStore) buildHsetKeys(featureViewNames []string, featureNames []string, indicesFeatureView map[int]string, index int) ([]string, []string) {
featureCount := len(featureNames)
var hsetKeys = make([]string, index)
h := murmur3.New32()
intBuffer := h.Sum32()
Expand All @@ -172,52 +191,59 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
hsetKeys[i] = tsKey
featureNames = append(featureNames, tsKey)
}
return hsetKeys, featureNames
}

func (r *RedisOnlineStore) buildRedisKeys(entityKeys []*types.EntityKey) ([]*[]byte, map[string]int, error) {
redisKeys := make([]*[]byte, len(entityKeys))
redisKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {

var key, err = buildRedisKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion)
if err != nil {
return nil, err
return nil, nil, err
}
redisKeys[i] = key
redisKeyToEntityIndex[string(*key)] = i
}
return redisKeys, redisKeyToEntityIndex, nil
}

// Retrieve features from Redis
// TODO: Move context object out
func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
span, _ := tracer.StartSpanFromContext(ctx, "redis.OnlineRead")
defer span.Finish()

results := make([][]FeatureData, len(entityKeys))
featureCount := len(featureNames)
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices(featureViewNames, featureNames)
hsetKeys, featureNamesWithTimeStamps := r.buildHsetKeys(featureViewNames, featureNames, indicesFeatureView, index)
redisKeys, redisKeyToEntityIndex, err := r.buildRedisKeys(entityKeys)
if err != nil {
return nil, err
}

results := make([][]FeatureData, len(entityKeys))
commands := map[string]*redis.SliceCmd{}

if r.t == redisNode {
pipe := r.client.Pipeline()

for _, redisKey := range redisKeys {
keyString := string(*redisKey)
commands[keyString] = pipe.HMGet(ctx, keyString, hsetKeys...)
}

_, err := pipe.Exec(ctx)
_, err = pipe.Exec(ctx)
if err != nil {
return nil, err
}
} else if r.t == redisCluster {
pipe := r.clusterClient.Pipeline()

for _, redisKey := range redisKeys {
keyString := string(*redisKey)
commands[keyString] = pipe.HMGet(ctx, keyString, hsetKeys...)
}

_, err := pipe.Exec(ctx)
_, err = pipe.Exec(ctx)
if err != nil {
return nil, err
}
}

var entityIndex int
var resContainsNonNil bool
for redisKey, values := range commands {
Expand All @@ -240,7 +266,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E

if resString == nil {
// TODO (Ly): Can there be nil result within each feature or they will all be returned as string proto of types.Value_NullVal proto?
featureName := featureNames[featureIndex]
featureName := featureNamesWithTimeStamps[featureIndex]
featureViewName := featureViewNames[featureIndex]
timeStampIndex := featureViewIndices[featureViewName]
timeStampInterface := res[timeStampIndex]
Expand All @@ -267,7 +293,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
if err := proto.Unmarshal([]byte(valueString), &value); err != nil {
return nil, errors.New("error converting parsed redis Value to types.Value")
} else {
featureName := featureNames[featureIndex]
featureName := featureNamesWithTimeStamps[featureIndex]
featureViewName := featureViewNames[featureIndex]
timeStampIndex := featureViewIndices[featureViewName]
timeStampInterface := res[timeStampIndex]
Expand Down
126 changes: 125 additions & 1 deletion go/internal/feast/onlinestore/redisonlinestore_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package onlinestore

import (
"github.com/feast-dev/feast/go/internal/feast/registry"
"testing"

"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/protos/feast/types"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -68,3 +70,125 @@ func TestNewRedisOnlineStoreWithSsl(t *testing.T) {
assert.Equal(t, opts.Addr, "redis://localhost:6379")
assert.NotNil(t, opts.TLSConfig)
}

func TestBuildFeatureViewIndices(t *testing.T) {
r := &RedisOnlineStore{}

t.Run("test with empty featureViewNames and featureNames", func(t *testing.T) {
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{}, []string{})
assert.Equal(t, 0, len(featureViewIndices))
assert.Equal(t, 0, len(indicesFeatureView))
assert.Equal(t, 0, index)
})

t.Run("test with non-empty featureNames and empty featureViewNames", func(t *testing.T) {
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{}, []string{"feature1", "feature2"})
assert.Equal(t, 0, len(featureViewIndices))
assert.Equal(t, 0, len(indicesFeatureView))
assert.Equal(t, 2, index)
})

t.Run("test with non-empty featureViewNames and featureNames", func(t *testing.T) {
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{"view1", "view2"}, []string{"feature1", "feature2"})
assert.Equal(t, 2, len(featureViewIndices))
assert.Equal(t, 2, len(indicesFeatureView))
assert.Equal(t, 4, index)
assert.Equal(t, "view1", indicesFeatureView[2])
assert.Equal(t, "view2", indicesFeatureView[3])
})

t.Run("test with duplicate featureViewNames", func(t *testing.T) {
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{"view1", "view1"}, []string{"feature1", "feature2"})
assert.Equal(t, 1, len(featureViewIndices))
assert.Equal(t, 1, len(indicesFeatureView))
assert.Equal(t, 3, index)
assert.Equal(t, "view1", indicesFeatureView[2])
})
}

func TestBuildHsetKeys(t *testing.T) {
r := &RedisOnlineStore{}

t.Run("test with empty featureViewNames and featureNames", func(t *testing.T) {
hsetKeys, featureNames := r.buildHsetKeys([]string{}, []string{}, map[int]string{}, 0)
assert.Equal(t, 0, len(hsetKeys))
assert.Equal(t, 0, len(featureNames))
})

t.Run("test with non-empty featureViewNames and featureNames", func(t *testing.T) {
hsetKeys, featureNames := r.buildHsetKeys([]string{"view1", "view2"}, []string{"feature1", "feature2"}, map[int]string{2: "view1", 3: "view2"}, 4)
assert.Equal(t, 4, len(hsetKeys))
assert.Equal(t, 4, len(featureNames))
assert.Equal(t, "_ts:view1", hsetKeys[2])
assert.Equal(t, "_ts:view2", hsetKeys[3])
assert.Contains(t, featureNames, "_ts:view1")
assert.Contains(t, featureNames, "_ts:view2")
})

t.Run("test with more featureViewNames than featureNames", func(t *testing.T) {
hsetKeys, featureNames := r.buildHsetKeys([]string{"view1", "view2", "view3"}, []string{"feature1", "feature2", "feature3"}, map[int]string{3: "view1", 4: "view2", 5: "view3"}, 6)
assert.Equal(t, 6, len(hsetKeys))
assert.Equal(t, 6, len(featureNames))
assert.Equal(t, "_ts:view1", hsetKeys[3])
assert.Equal(t, "_ts:view2", hsetKeys[4])
assert.Equal(t, "_ts:view3", hsetKeys[5])
assert.Contains(t, featureNames, "_ts:view1")
assert.Contains(t, featureNames, "_ts:view2")
assert.Contains(t, featureNames, "_ts:view3")
})
}

func TestBuildRedisKeys(t *testing.T) {
r := &RedisOnlineStore{
project: "test_project",
config: &registry.RepoConfig{
EntityKeySerializationVersion: 2,
},
}

entity_key1 := types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1005}}},
}

entity_key2 := types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}},
}

error_entity_key1 := types.EntityKey{
JoinKeys: []string{"driver_id", "vehicle_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1005}}},
}

t.Run("test with empty entityKeys", func(t *testing.T) {
redisKeys, redisKeyToEntityIndex, err := r.buildRedisKeys([]*types.EntityKey{})
assert.Nil(t, err)
assert.Equal(t, 0, len(redisKeys))
assert.Equal(t, 0, len(redisKeyToEntityIndex))
})

t.Run("test with single entityKey", func(t *testing.T) {
entityKeys := []*types.EntityKey{&entity_key1}
redisKeys, redisKeyToEntityIndex, err := r.buildRedisKeys(entityKeys)
assert.Nil(t, err)
assert.Equal(t, 1, len(redisKeys))
assert.Equal(t, 1, len(redisKeyToEntityIndex))
})

t.Run("test with multiple entityKeys", func(t *testing.T) {
entityKeys := []*types.EntityKey{
&entity_key1, &entity_key2,
}
redisKeys, redisKeyToEntityIndex, err := r.buildRedisKeys(entityKeys)
assert.Nil(t, err)
assert.Equal(t, 2, len(redisKeys))
assert.Equal(t, 2, len(redisKeyToEntityIndex))
})

t.Run("test with error in buildRedisKey", func(t *testing.T) {
entityKeys := []*types.EntityKey{&error_entity_key1}
_, _, err := r.buildRedisKeys(entityKeys)
assert.NotNil(t, err)
})
}
2 changes: 1 addition & 1 deletion go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func recoverMiddleware(next http.Handler) http.Handler {

func (s *httpServer) Serve(host string, port int) error {
if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
tracer.Start()
tracer.Start(tracer.WithRuntimeMetrics())
defer tracer.Stop()
}
mux := httptrace.NewServeMux()
Expand Down

0 comments on commit d3094da

Please sign in to comment.