From 9f6f344c6fea3628a7c7e6eebeeffb93a4fdd8cc Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 10 Nov 2023 12:36:46 +0300 Subject: [PATCH] added `Added` and `Dropped` fields into balancer update done info --- internal/balancer/balancer.go | 69 ++++++++++++---- internal/balancer/balancer_test.go | 125 +++++++++++++++++++++++++++++ internal/balancer/local_dc_test.go | 6 +- log/driver.go | 2 + trace/driver.go | 2 + trace/driver_gtrace.go | 6 +- 6 files changed, 190 insertions(+), 20 deletions(-) create mode 100644 internal/balancer/balancer_test.go diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 8f348cd2e..c68207049 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -3,8 +3,8 @@ package balancer import ( "context" "fmt" - "google.golang.org/grpc" + "sort" "github.com/ydb-platform/ydb-go-sdk/v3/config" balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config" @@ -33,7 +33,7 @@ type discoveryClient interface { type Balancer struct { driverConfig *config.Config - balancerConfig balancerConfig.Config + config balancerConfig.Config pool *conn.Pool discoveryClient discoveryClient discoveryRepeater repeater.Repeater @@ -46,7 +46,7 @@ type Balancer struct { } func (b *Balancer) HasNode(id uint32) bool { - if b.balancerConfig.SingleConn { + if b.config.SingleConn { return true } b.mu.RLock() @@ -114,7 +114,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { return xerrors.WithStackTrace(err) } - if b.balancerConfig.DetectLocalDC { + if b.config.DetectLocalDC { localDC, err = b.localDCDetector(ctx, endpoints) if err != nil { return xerrors.WithStackTrace(err) @@ -126,16 +126,52 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { return nil } +func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Conn) ( + nodes []trace.EndpointInfo, + added []trace.EndpointInfo, + dropped []trace.EndpointInfo, +) { + nodes = make([]trace.EndpointInfo, 0, len(newestEndpoints)) + added = make([]trace.EndpointInfo, 0, len(previousConns)) + dropped = make([]trace.EndpointInfo, 0, len(previousConns)) + var ( + newestMap = make(map[string]struct{}, len(newestEndpoints)) + previousMap = make(map[string]struct{}, len(previousConns)) + ) + sort.Slice(newestEndpoints, func(i, j int) bool { + return newestEndpoints[i].Address() < newestEndpoints[j].Address() + }) + sort.Slice(previousConns, func(i, j int) bool { + return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address() + }) + for _, e := range previousConns { + previousMap[e.Endpoint().Address()] = struct{}{} + } + for _, e := range newestEndpoints { + nodes = append(nodes, e.Copy()) + newestMap[e.Address()] = struct{}{} + if _, has := previousMap[e.Address()]; !has { + added = append(added, e.Copy()) + } + } + for _, c := range previousConns { + if _, has := newestMap[c.Endpoint().Address()]; !has { + dropped = append(dropped, c.Endpoint().Copy()) + } + } + return nodes, added, dropped +} + func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) { - onDone := trace.DriverOnBalancerUpdate( - b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectLocalDC, + var ( + onDone = trace.DriverOnBalancerUpdate( + b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.config.DetectLocalDC, + ) + previousConns []conn.Conn ) defer func() { - nodes := make([]trace.EndpointInfo, 0, len(endpoints)) - for _, e := range endpoints { - nodes = append(nodes, e.Copy()) - } - onDone(nodes, localDC, nil) + nodes, added, dropped := endpointsDiff(endpoints, previousConns) + onDone(nodes, added, dropped, localDC, nil) }() connections := endpointsToConnections(b.pool, endpoints) @@ -145,7 +181,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end } info := balancerConfig.Info{SelfLocation: localDC} - state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback) + state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback) endpointsInfo := make([]endpoint.Info, len(endpoints)) for i, e := range endpoints { @@ -153,6 +189,9 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end } b.mu.WithLock(func() { + if b.connectionsState != nil { + previousConns = b.connectionsState.all + } b.connectionsState = state for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints { onApplyDiscoveredEndpoints(ctx, endpointsInfo) @@ -216,12 +255,12 @@ func New( b.discoveryClient = d if config := driverConfig.Balancer(); config == nil { - b.balancerConfig = balancerConfig.Config{} + b.config = balancerConfig.Config{} } else { - b.balancerConfig = *config + b.config = *config } - if b.balancerConfig.SingleConn { + if b.config.SingleConn { b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{ endpoint.New(driverConfig.Endpoint()), }, "") diff --git a/internal/balancer/balancer_test.go b/internal/balancer/balancer_test.go new file mode 100644 index 000000000..356952f38 --- /dev/null +++ b/internal/balancer/balancer_test.go @@ -0,0 +1,125 @@ +package balancer + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/mock" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +func TestEndpointsDiff(t *testing.T) { + for _, tt := range []struct { + newestEndpoints []endpoint.Endpoint + previousConns []conn.Conn + nodes []trace.EndpointInfo + added []trace.EndpointInfo + dropped []trace.EndpointInfo + }{ + { + newestEndpoints: []endpoint.Endpoint{ + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + &mock.Endpoint{AddrField: "2"}, + &mock.Endpoint{AddrField: "0"}, + }, + previousConns: []conn.Conn{ + &mock.Conn{AddrField: "2"}, + &mock.Conn{AddrField: "1"}, + &mock.Conn{AddrField: "0"}, + &mock.Conn{AddrField: "3"}, + }, + nodes: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "0"}, + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "2"}, + &mock.Endpoint{AddrField: "3"}, + }, + added: []trace.EndpointInfo{}, + dropped: []trace.EndpointInfo{}, + }, + { + newestEndpoints: []endpoint.Endpoint{ + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + &mock.Endpoint{AddrField: "2"}, + &mock.Endpoint{AddrField: "0"}, + }, + previousConns: []conn.Conn{ + &mock.Conn{AddrField: "1"}, + &mock.Conn{AddrField: "0"}, + &mock.Conn{AddrField: "3"}, + }, + nodes: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "0"}, + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "2"}, + &mock.Endpoint{AddrField: "3"}, + }, + added: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "2"}, + }, + dropped: []trace.EndpointInfo{}, + }, + { + newestEndpoints: []endpoint.Endpoint{ + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + &mock.Endpoint{AddrField: "0"}, + }, + previousConns: []conn.Conn{ + &mock.Conn{AddrField: "1"}, + &mock.Conn{AddrField: "2"}, + &mock.Conn{AddrField: "0"}, + &mock.Conn{AddrField: "3"}, + }, + nodes: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "0"}, + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + }, + added: []trace.EndpointInfo{}, + dropped: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "2"}, + }, + }, + { + newestEndpoints: []endpoint.Endpoint{ + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + &mock.Endpoint{AddrField: "0"}, + }, + previousConns: []conn.Conn{ + &mock.Conn{AddrField: "4"}, + &mock.Conn{AddrField: "7"}, + &mock.Conn{AddrField: "8"}, + }, + nodes: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "0"}, + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + }, + added: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "0"}, + &mock.Endpoint{AddrField: "1"}, + &mock.Endpoint{AddrField: "3"}, + }, + dropped: []trace.EndpointInfo{ + &mock.Endpoint{AddrField: "4"}, + &mock.Endpoint{AddrField: "7"}, + &mock.Endpoint{AddrField: "8"}, + }, + }, + } { + t.Run(xtest.CurrentFileLine(), func(t *testing.T) { + nodes, added, dropped := endpointsDiff(tt.newestEndpoints, tt.previousConns) + require.Equal(t, tt.nodes, nodes) + require.Equal(t, tt.added, added) + require.Equal(t, tt.dropped, dropped) + }) + } +} diff --git a/internal/balancer/local_dc_test.go b/internal/balancer/local_dc_test.go index 26eaf38a6..2eab1e9a8 100644 --- a/internal/balancer/local_dc_test.go +++ b/internal/balancer/local_dc_test.go @@ -134,9 +134,9 @@ func TestLocalDCDiscovery(t *testing.T) { config.WithBalancer(balancers.PreferLocalDC(balancers.Default())), ) r := &Balancer{ - driverConfig: cfg, - balancerConfig: *cfg.Balancer(), - pool: conn.NewPool(context.Background(), cfg), + driverConfig: cfg, + config: *cfg.Balancer(), + pool: conn.NewPool(context.Background(), cfg), discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{ &mock.Endpoint{AddrField: "a:123", LocationField: "a"}, &mock.Endpoint{AddrField: "b:234", LocationField: "b"}, diff --git a/log/driver.go b/log/driver.go index 7ceb14631..8292f31bc 100644 --- a/log/driver.go +++ b/log/driver.go @@ -430,6 +430,8 @@ func internalDriver(l *wrapper, d trace.Detailer) (t trace.Driver) { //nolint:go l.Log(ctx, "done", latencyField(start), Stringer("endpoints", endpoints(info.Endpoints)), + Stringer("added", endpoints(info.Added)), + Stringer("dropped", endpoints(info.Dropped)), String("detectedLocalDC", info.LocalDC), ) } diff --git a/trace/driver.go b/trace/driver.go index 0ec751cf5..45b9f14fc 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -165,6 +165,8 @@ type ( } DriverBalancerUpdateDoneInfo struct { Endpoints []EndpointInfo + Added []EndpointInfo + Dropped []EndpointInfo LocalDC string // Deprecated: this field always nil Error error diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index c2dcb59ba..b51e2f07e 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -1664,15 +1664,17 @@ func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call res(p) } } -func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, localDC string, _ error) { +func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string, _ error) { var p DriverBalancerUpdateStartInfo p.Context = c p.Call = call p.NeedLocalDC = needLocalDC res := t.onBalancerUpdate(p) - return func(endpoints []EndpointInfo, localDC string, e error) { + return func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string, e error) { var p DriverBalancerUpdateDoneInfo p.Endpoints = endpoints + p.Added = added + p.Dropped = dropped p.LocalDC = localDC p.Error = e res(p)