Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: XREAD. #2882

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,37 @@ func (client *baseClient) XAddWithOptions(
return handleStringOrNullResponse(result)
}

func (client *baseClient) XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error) {
return client.XReadWithOptions(keysAndIds, options.NewXReadOptions())
}

func (client *baseClient) XReadWithOptions(
keysAndIds map[string]string,
options *options.XReadOptions,
) (map[string]map[string][][]string, error) {
args := make([]string, 0, 5+2*len(keysAndIds))
optionArgs, _ := options.ToArgs()
args = append(args, optionArgs...)

// Note: this loop iterates in an indeterminate order, but it is OK for that case
keys := make([]string, 0, len(keysAndIds))
values := make([]string, 0, len(keysAndIds))
for key := range keysAndIds {
keys = append(keys, key)
values = append(values, keysAndIds[key])
}
args = append(args, "STREAMS")
args = append(args, keys...)
args = append(args, values...)

result, err := client.executeCommand(C.XRead, args)
if err != nil {
return nil, err
}

return handleXReadResponse(result)
}

func (client *baseClient) ZAdd(
key string,
membersScoreMap map[string]float64,
Expand Down
42 changes: 37 additions & 5 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewXAddOptions() *XAddOptions {
return &XAddOptions{}
}

// New entry will be added with this `id.
// New entry will be added with this `id`.
func (xao *XAddOptions) SetId(id string) *XAddOptions {
xao.id = id
return xao
Expand All @@ -47,7 +47,6 @@ func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions {

func (xao *XAddOptions) ToArgs() ([]string, error) {
args := []string{}
var err error
if xao.makeStream == triStateBoolFalse {
args = append(args, "NOMKSTREAM")
}
Expand All @@ -63,7 +62,7 @@ func (xao *XAddOptions) ToArgs() ([]string, error) {
} else {
args = append(args, "*")
}
return args, err
return args, nil
}

// Optional arguments for `XTrim` and `XAdd` in [StreamCommands]
Expand Down Expand Up @@ -115,6 +114,39 @@ func (xto *XTrimOptions) ToArgs() ([]string, error) {
if xto.limit > 0 {
args = append(args, "LIMIT", utils.IntToString(xto.limit))
}
var err error
return args, err
return args, nil
}

// Optional arguments for `XRead` in [StreamCommands]
type XReadOptions struct {
count, block int64
}

// Create new empty `XReadOptions`
func NewXReadOptions() *XReadOptions {
return &XReadOptions{-1, -1}
}

// The maximal number of elements requested. Equivalent to `COUNT` in the Valkey API.
func (xro *XReadOptions) SetCount(count int64) *XReadOptions {
xro.count = count
return xro
}

// If set, the request will be blocked for the set amount of milliseconds or until the server has
// the required number of entries. A value of `0` will block indefinitely. Equivalent to `BLOCK` in the Valkey API.
func (xro *XReadOptions) SetBlock(block int64) *XReadOptions {
xro.block = block
return xro
}

func (xro *XReadOptions) ToArgs() ([]string, error) {
args := []string{}
if xro.count >= 0 {
args = append(args, "COUNT", utils.IntToString(xro.count))
}
if xro.block >= 0 {
args = append(args, "BLOCK", utils.IntToString(xro.block))
}
return args, nil
}
104 changes: 104 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "C"

import (
"fmt"
"reflect"
"unsafe"
)

Expand Down Expand Up @@ -434,3 +435,106 @@ func convertToResultStringArray(input []interface{}) ([]Result[string], error) {
}
return result, nil
}

// get type of T
func getType[T any]() reflect.Type {
var zero [0]T
return reflect.TypeOf(zero).Elem()
}

// convert (typecast) untyped response into a typed value
// for example, an arbitrary array `[]interface{}` into `[]string`
type responseConverter interface {
convert(data interface{}) (interface{}, error)
}

// convert maps, T - type of the value, key is string
type mapConverter[T any] struct {
next responseConverter
}

func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
result := make(map[string]T)

for key, value := range data.(map[string]interface{}) {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
}
result[key] = valueT
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
}
result[key] = valueT
}
}

return result, nil
}

// convert arrays, T - type of the value
type arrayConverter[T any] struct {
next responseConverter
}

func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {
arrData := data.([]interface{})
result := make([]T, 0, len(arrData))
for _, value := range arrData {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
}
result = append(result, valueT)
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
}
result = append(result, valueT)
}
}

return result, nil
}

// TODO: convert sets

func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) {
data, err := parseMap(response)
if err != nil {
return nil, err
}
if data == nil {
return nil, nil
}

converters := mapConverter[map[string][][]string]{
mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{},
},
},
}
Comment on lines +524 to +530
Copy link
Collaborator

@umit umit Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yury-Fridlyand I think that we can make the method chain here more readable as shown below.

converter := NewConverterBuilder().
        WithMapConverter[map[string][][]string]().
        WithMapConverter[[][]string]().
        WithArrayConverter[[]string]().
        WithArrayConverter[string]().
        Build()

res, err := converter.convert(data1)
if err != nil {
    return nil, err
}
  
// Pseudo ConverterBuilder
type ConverterBuilder struct {
    converter responseConverter
}

func NewConverterBuilder() *ConverterBuilder {
    return &ConverterBuilder{}
}

func (b *ConverterBuilder) WithMapConverter[T any]() *ConverterBuilder {
    b.converter = mapConverter[T]{next: b.converter}
    return b
}

func (b *ConverterBuilder) WithArrayConverter[T any]() *ConverterBuilder {
    b.converter = arrayConverter[T]{next: b.converter}
    return b
}

func (b *ConverterBuilder) Build() responseConverter {
    return b.converter
}


res, err := converters.convert(data)
if err != nil {
return nil, err
}
if result, ok := res.(map[string]map[string][][]string); ok {
return result, nil
}
return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}
52 changes: 52 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,56 @@ type StreamCommands interface {
//
// [valkey.io]: https://valkey.io/commands/xadd/
XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error)

// Reads entries from the given streams.
//
// Note:
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
// keysAndIds - A map of keys and entry IDs to read from.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
// result, err := client.XRead({"stream1": "0-0", "stream2": "0-1"})
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {"0-1": {{"field1", "value1"}}, "0-2": {{"field2", "value2"}, {"field2", "value3"}}},
// "stream2": {},
// }
//
// [valkey.io]: https://valkey.io/commands/xread/
XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error)

// Reads entries from the given streams.
//
// Note:
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
// keysAndIds - A map of keys and entry IDs to read from.
// options - Options detailing how to read the stream.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
// options := options.NewXReadOptions().SetBlock(100500)
// result, err := client.XReadWithOptions({"stream1": "0-0", "stream2": "0-1"}, options)
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {"0-1": {{"field1", "value1"}}, "0-2": {{"field2", "value2"}, {"field2", "value3"}}},
// "stream2": {},
// }
//
// [valkey.io]: https://valkey.io/commands/xread/
XReadWithOptions(keysAndIds map[string]string, options *options.XReadOptions) (map[string]map[string][][]string, error)
}
76 changes: 76 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4016,6 +4016,82 @@ func (suite *GlideTestSuite) TestXAddWithOptions() {
})
}

func (suite *GlideTestSuite) TestXRead() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key1 := "{xread}" + uuid.NewString()
key2 := "{xread}" + uuid.NewString()
key3 := "{xread}" + uuid.NewString()

// key does not exist
read, err := client.XRead(map[string]string{key1: "0-0"})
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

res, err := client.XAddWithOptions(
key1,
[][]string{{"k1_field1", "k1_value1"}, {"k1_field1", "k1_value2"}},
options.NewXAddOptions().SetId("0-1"),
)
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())

res, err = client.XAddWithOptions(key2, [][]string{{"k2_field1", "k2_value1"}}, options.NewXAddOptions().SetId("2-0"))
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())

// reading ID which does not exist yet
read, err = client.XRead(map[string]string{key1: "100-500"})
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

read, err = client.XRead(map[string]string{key1: "0-0", key2: "0-0"})
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key1: {
"0-1": {{"k1_field1", "k1_value1"}, {"k1_field1", "k1_value2"}},
},
key2: {
"2-0": {{"k2_field1", "k2_value1"}},
},
}, read)

// Key exists, but it is not a stream
client.Set(key3, "xread")
_, err = client.XRead(map[string]string{key1: "0-0", key3: "0-0"})
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

// ensure that commands doesn't time out even if timeout > request timeout
var testClient api.BaseClient
if _, ok := client.(api.GlideClient); ok {
testClient = suite.client(api.NewGlideClientConfiguration().
WithAddress(&suite.standaloneHosts[0]).
WithUseTLS(suite.tls))
} else {
testClient = suite.clusterClient(api.NewGlideClusterClientConfiguration().
WithAddress(&suite.clusterHosts[0]).
WithUseTLS(suite.tls))
}
read, err = testClient.XReadWithOptions(map[string]string{key1: "0-1"}, options.NewXReadOptions().SetBlock(1000))
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

// with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
finished := make(chan bool)
go func() {
testClient.XReadWithOptions(map[string]string{key1: "0-1"}, options.NewXReadOptions().SetBlock(0))
finished <- true
}()
select {
case <-finished:
assert.Fail(suite.T(), "Infinite block finished")
case <-time.After(3 * time.Second):
}
testClient.Close()
})
}

func (suite *GlideTestSuite) TestZAddAndZAddIncr() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down
2 changes: 1 addition & 1 deletion go/utils/transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func MapToString(parameter map[string]string) []string {
return flat
}

// Flattens a map[string, V] to a value-key string array
// Flattens a map[string, V] to a value-key string array like { value1, key1, value2, key2..}
func ConvertMapToValueKeyStringArray[V any](args map[string]V) []string {
result := make([]string, 0, len(args)*2)
for key, value := range args {
Expand Down
Loading