Skip to content

Commit

Permalink
added Added and Dropped fields into balancer update done info
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Nov 10, 2023
1 parent 6373d76 commit 9f6f344
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 20 deletions.
69 changes: 54 additions & 15 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package balancer
import (
"context"
"fmt"

Check failure on line 5 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)

"google.golang.org/grpc"

Check failure on line 6 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `goimports`-ed with -local github.com/ydb-platform/ydb-go-sdk/v3 (goimports)
"sort"

Check failure on line 7 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)

"github.com/ydb-platform/ydb-go-sdk/v3/config"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
Expand Down Expand Up @@ -33,7 +33,7 @@ type discoveryClient interface {

type Balancer struct {
driverConfig *config.Config
balancerConfig balancerConfig.Config
config balancerConfig.Config
pool *conn.Pool
discoveryClient discoveryClient
discoveryRepeater repeater.Repeater
Expand All @@ -46,7 +46,7 @@ type Balancer struct {
}

func (b *Balancer) HasNode(id uint32) bool {
if b.balancerConfig.SingleConn {
if b.config.SingleConn {
return true
}
b.mu.RLock()
Expand Down Expand Up @@ -114,7 +114,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
return xerrors.WithStackTrace(err)
}

if b.balancerConfig.DetectLocalDC {
if b.config.DetectLocalDC {
localDC, err = b.localDCDetector(ctx, endpoints)
if err != nil {
return xerrors.WithStackTrace(err)
Expand All @@ -126,16 +126,52 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
return nil
}

func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Conn) (
nodes []trace.EndpointInfo,
added []trace.EndpointInfo,
dropped []trace.EndpointInfo,
) {
nodes = make([]trace.EndpointInfo, 0, len(newestEndpoints))
added = make([]trace.EndpointInfo, 0, len(previousConns))
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
var (
newestMap = make(map[string]struct{}, len(newestEndpoints))
previousMap = make(map[string]struct{}, len(previousConns))
)
sort.Slice(newestEndpoints, func(i, j int) bool {
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
})
sort.Slice(previousConns, func(i, j int) bool {
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
})
for _, e := range previousConns {
previousMap[e.Endpoint().Address()] = struct{}{}
}
for _, e := range newestEndpoints {
nodes = append(nodes, e.Copy())
newestMap[e.Address()] = struct{}{}
if _, has := previousMap[e.Address()]; !has {
added = append(added, e.Copy())
}
}
for _, c := range previousConns {
if _, has := newestMap[c.Endpoint().Address()]; !has {
dropped = append(dropped, c.Endpoint().Copy())
}
}
return nodes, added, dropped
}

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
onDone := trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectLocalDC,
var (
onDone = trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.config.DetectLocalDC,
)
previousConns []conn.Conn
)
defer func() {
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
for _, e := range endpoints {
nodes = append(nodes, e.Copy())
}
onDone(nodes, localDC, nil)
nodes, added, dropped := endpointsDiff(endpoints, previousConns)
onDone(nodes, added, dropped, localDC, nil)
}()

connections := endpointsToConnections(b.pool, endpoints)
Expand All @@ -145,14 +181,17 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
}

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)

endpointsInfo := make([]endpoint.Info, len(endpoints))
for i, e := range endpoints {
endpointsInfo[i] = e
}

b.mu.WithLock(func() {
if b.connectionsState != nil {
previousConns = b.connectionsState.all
}
b.connectionsState = state
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
Expand Down Expand Up @@ -216,12 +255,12 @@ func New(
b.discoveryClient = d

if config := driverConfig.Balancer(); config == nil {
b.balancerConfig = balancerConfig.Config{}
b.config = balancerConfig.Config{}
} else {
b.balancerConfig = *config
b.config = *config
}

if b.balancerConfig.SingleConn {
if b.config.SingleConn {
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
endpoint.New(driverConfig.Endpoint()),
}, "")
Expand Down
125 changes: 125 additions & 0 deletions internal/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package balancer

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func TestEndpointsDiff(t *testing.T) {
for _, tt := range []struct {
newestEndpoints []endpoint.Endpoint
previousConns []conn.Conn
nodes []trace.EndpointInfo
added []trace.EndpointInfo
dropped []trace.EndpointInfo
}{
{
newestEndpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
&mock.Endpoint{AddrField: "2"},
&mock.Endpoint{AddrField: "0"},
},
previousConns: []conn.Conn{
&mock.Conn{AddrField: "2"},
&mock.Conn{AddrField: "1"},
&mock.Conn{AddrField: "0"},
&mock.Conn{AddrField: "3"},
},
nodes: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "0"},
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "2"},
&mock.Endpoint{AddrField: "3"},
},
added: []trace.EndpointInfo{},
dropped: []trace.EndpointInfo{},
},
{
newestEndpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
&mock.Endpoint{AddrField: "2"},
&mock.Endpoint{AddrField: "0"},
},
previousConns: []conn.Conn{
&mock.Conn{AddrField: "1"},
&mock.Conn{AddrField: "0"},
&mock.Conn{AddrField: "3"},
},
nodes: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "0"},
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "2"},
&mock.Endpoint{AddrField: "3"},
},
added: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "2"},
},
dropped: []trace.EndpointInfo{},
},
{
newestEndpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
&mock.Endpoint{AddrField: "0"},
},
previousConns: []conn.Conn{
&mock.Conn{AddrField: "1"},
&mock.Conn{AddrField: "2"},
&mock.Conn{AddrField: "0"},
&mock.Conn{AddrField: "3"},
},
nodes: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "0"},
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
},
added: []trace.EndpointInfo{},
dropped: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "2"},
},
},
{
newestEndpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
&mock.Endpoint{AddrField: "0"},
},
previousConns: []conn.Conn{
&mock.Conn{AddrField: "4"},
&mock.Conn{AddrField: "7"},
&mock.Conn{AddrField: "8"},
},
nodes: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "0"},
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
},
added: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "0"},
&mock.Endpoint{AddrField: "1"},
&mock.Endpoint{AddrField: "3"},
},
dropped: []trace.EndpointInfo{
&mock.Endpoint{AddrField: "4"},
&mock.Endpoint{AddrField: "7"},
&mock.Endpoint{AddrField: "8"},
},
},
} {
t.Run(xtest.CurrentFileLine(), func(t *testing.T) {
nodes, added, dropped := endpointsDiff(tt.newestEndpoints, tt.previousConns)
require.Equal(t, tt.nodes, nodes)
require.Equal(t, tt.added, added)
require.Equal(t, tt.dropped, dropped)
})
}
}
6 changes: 3 additions & 3 deletions internal/balancer/local_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func TestLocalDCDiscovery(t *testing.T) {
config.WithBalancer(balancers.PreferLocalDC(balancers.Default())),
)
r := &Balancer{
driverConfig: cfg,
balancerConfig: *cfg.Balancer(),
pool: conn.NewPool(context.Background(), cfg),
driverConfig: cfg,
config: *cfg.Balancer(),
pool: conn.NewPool(context.Background(), cfg),
discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
Expand Down
2 changes: 2 additions & 0 deletions log/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ func internalDriver(l *wrapper, d trace.Detailer) (t trace.Driver) { //nolint:go
l.Log(ctx, "done",
latencyField(start),
Stringer("endpoints", endpoints(info.Endpoints)),
Stringer("added", endpoints(info.Added)),
Stringer("dropped", endpoints(info.Dropped)),
String("detectedLocalDC", info.LocalDC),
)
}
Expand Down
2 changes: 2 additions & 0 deletions trace/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type (
}
DriverBalancerUpdateDoneInfo struct {
Endpoints []EndpointInfo
Added []EndpointInfo
Dropped []EndpointInfo
LocalDC string
// Deprecated: this field always nil
Error error
Expand Down
6 changes: 4 additions & 2 deletions trace/driver_gtrace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9f6f344

Please sign in to comment.