Skip to content

Commit

Permalink
fixes of query service client internals
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Apr 9, 2024
1 parent 5d5dff4 commit 902c990
Show file tree
Hide file tree
Showing 90 changed files with 36,589 additions and 2,110 deletions.
119 changes: 25 additions & 94 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
@@ -1,154 +1,85 @@
package balancers

import (
"sort"
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

// Deprecated: RoundRobin is RandomChoice now
func RoundRobin() *balancerConfig.Config {
return &balancerConfig.Config{}
return balancerConfig.New()
}

func RandomChoice() *balancerConfig.Config {
return &balancerConfig.Config{}
return balancerConfig.New()
}

func SingleConn() *balancerConfig.Config {
return &balancerConfig.Config{
SingleConn: true,
}
}

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
return "LocalDC"
return balancerConfig.New(balancerConfig.UseSingleConn())
}

// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// PreferLocalDC balancer try to autodetect local DC from client side.
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer.Filter = filterLocalDC{}
balancer.DetectLocalDC = true

return balancer
return balancer.With(
balancerConfig.FilterLocalDC(),
balancerConfig.DetectLocalDC(),
)
}

// PreferLocalDCWithFallBack creates balancer which use endpoints only in location such as initial endpoint location
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
balancer = PreferLocalDC(balancer)
balancer.AllowFallback = true

return balancer
}

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
for _, l := range locations {
if location == l {
return true
}
}

return false
}

func (locations filterLocations) String() string {
buffer := xstring.Buffer()
defer buffer.Free()

buffer.WriteString("Locations{")
for i, l := range locations {
if i != 0 {
buffer.WriteByte(',')
}
buffer.WriteString(l)
}
buffer.WriteByte('}')

return buffer.String()
return PreferLocalDC(balancer).With(balancerConfig.AllowFallback())
}

// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
if len(locations) == 0 {
panic("empty list of locations")
}
for i := range locations {
locations[i] = strings.ToUpper(locations[i])
}
sort.Strings(locations)
balancer.Filter = filterLocations(locations)

return balancer
return balancer.With(balancerConfig.FilterLocations(locations...))
}

// PreferLocationsWithFallback creates balancer which use endpoints only in selected locations
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
balancer = PreferLocations(balancer, locations...)
balancer.AllowFallback = true

return balancer
return balancer.With(
balancerConfig.FilterLocations(locations...),
balancerConfig.AllowFallback(),
)
}

type Endpoint interface {
NodeID() uint32
Address() string
Location() string

// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
// It work good only if connection url always point to local dc.
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
}

func (p filterFunc) String() string {
return "Custom"
}

// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
})

return balancer
return balancer.With(
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
return filter(c.Endpoint())
}),
)
}

// PreferWithFallback creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer = Prefer(balancer, filter)
balancer.AllowFallback = true

return balancer
return balancer.With(
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
return filter(c.Endpoint())
}),
balancerConfig.AllowFallback(),
)
}

// Default balancer used by default
func Default() *balancerConfig.Config {
return RandomChoice()
return balancerConfig.New()
}
58 changes: 28 additions & 30 deletions balancers/balancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,63 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
)

func TestPreferLocalDC(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "1"},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
}
rr := PreferLocalDC(RandomChoice())
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
require.False(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocalDCWithFallBack(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "1"},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
}
rr := PreferLocalDCWithFallBack(RandomChoice())
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
require.True(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
}

func TestPreferLocations(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "zero", State: conn.Online},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "one"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
}

rr := PreferLocations(RandomChoice(), "zero", "two")
require.False(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
require.False(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func TestPreferLocationsWithFallback(t *testing.T) {
conns := []conn.Conn{
&mock.Conn{AddrField: "1", LocationField: "zero", State: conn.Online},
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "one"},
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
conns := []conn.Info{
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
}

rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
require.True(t, rr.AllowFallback)
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
require.True(t, rr.AllowFallback())
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
}

func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
if b.Filter == nil {
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Conn) bool { return true })
}
res := make([]conn.Conn, 0, len(conns))
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Info) []conn.Info {
res := make([]conn.Info, 0, len(conns))
for _, c := range conns {
if b.Filter.Allow(info, c) {
if b.Filter().Allow(info, c) {
res = append(res, c)
}
}
Expand Down
Loading

0 comments on commit 902c990

Please sign in to comment.