Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Mar 31, 2024
1 parent abe3bf3 commit cae12e7
Show file tree
Hide file tree
Showing 52 changed files with 33,737 additions and 510 deletions.
21 changes: 11 additions & 10 deletions balancers/balancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
Expand All @@ -13,8 +14,8 @@ import (
func TestPreferLocalDC(t *testing.T) {
conns := []conn.Info{
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "1"},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: connectivity.Ready, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: connectivity.Ready, EndpointLocationField: "2"},
}
rr := PreferLocalDC(RandomChoice())
require.False(t, rr.AllowFallback)
Expand All @@ -24,8 +25,8 @@ func TestPreferLocalDC(t *testing.T) {
func TestPreferLocalDCWithFallBack(t *testing.T) {
conns := []conn.Info{
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "1"},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: connectivity.Ready, EndpointLocationField: "2"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: connectivity.Ready, EndpointLocationField: "2"},
}
rr := PreferLocalDCWithFallBack(RandomChoice())
require.True(t, rr.AllowFallback)
Expand All @@ -34,9 +35,9 @@ func TestPreferLocalDCWithFallBack(t *testing.T) {

func TestPreferLocations(t *testing.T) {
conns := []conn.Info{
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: conn.Online},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "one"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "two"},
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: connectivity.Ready},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: connectivity.Ready, EndpointLocationField: "one"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: connectivity.Ready, EndpointLocationField: "two"},
}

rr := PreferLocations(RandomChoice(), "zero", "two")
Expand All @@ -46,9 +47,9 @@ func TestPreferLocations(t *testing.T) {

func TestPreferLocationsWithFallback(t *testing.T) {
conns := []conn.Info{
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: conn.Online},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "one"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "two"},
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: connectivity.Ready},
&mock.ConnInfo{EndpointAddrField: "2", ConnState: connectivity.Ready, EndpointLocationField: "one"},
&mock.ConnInfo{EndpointAddrField: "3", ConnState: connectivity.Ready, EndpointLocationField: "two"},
}

rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
Expand Down
159 changes: 78 additions & 81 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"sort"
"sync/atomic"

"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"

"github.com/ydb-platform/ydb-go-sdk/v3/config"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
Expand All @@ -19,7 +21,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
Expand All @@ -40,33 +41,13 @@ type Balancer struct {
discoveryRepeater repeater.Repeater
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)

mu xsync.RWMutex
connectionsState *connectionsState[conn.Conn]
connections atomic.Pointer[connections[conn.Conn]]

closed chan struct{}

onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
}

func (b *Balancer) HasNode(id uint32) bool {
if b.config.SingleConn {
return true
}
b.mu.RLock()
defer b.mu.RUnlock()
if _, has := b.connectionsState.connByNodeID[id]; has {
return true
}

return false
}

func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, endpoints []endpoint.Info)) {
b.mu.WithLock(func() {
b.onApplyDiscoveredEndpoints = append(b.onApplyDiscoveredEndpoints, onApplyDiscoveredEndpoints)
})
}

func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
return retry.Retry(
repeater.WithEvent(ctx, repeater.EventInit),
Expand Down Expand Up @@ -135,37 +116,37 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
return nil
}

func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Info) (
func endpointsDiff(newestEndpoints []trace.EndpointInfo, previousEndpoints []trace.EndpointInfo) (

Check failure on line 119 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

paramTypeCombine: func(newestEndpoints []trace.EndpointInfo, previousEndpoints []trace.EndpointInfo) (
nodes []trace.EndpointInfo,
added []trace.EndpointInfo,
dropped []trace.EndpointInfo,
) {
nodes = make([]trace.EndpointInfo, 0, len(newestEndpoints))
added = make([]trace.EndpointInfo, 0, len(previousConns))
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
added = make([]trace.EndpointInfo, 0, len(previousEndpoints))
dropped = make([]trace.EndpointInfo, 0, len(previousEndpoints))
var (
newestMap = make(map[string]struct{}, len(newestEndpoints))
previousMap = make(map[string]struct{}, len(previousConns))
previousMap = make(map[string]struct{}, len(previousEndpoints))
)
sort.Slice(newestEndpoints, func(i, j int) bool {
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
})
sort.Slice(previousConns, func(i, j int) bool {
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
sort.Slice(previousEndpoints, func(i, j int) bool {
return previousEndpoints[i].Address() < previousEndpoints[j].Address()
})
for _, e := range previousConns {
previousMap[e.Endpoint().Address()] = struct{}{}
for _, e := range previousEndpoints {
previousMap[e.Address()] = struct{}{}
}
for _, e := range newestEndpoints {
nodes = append(nodes, e.Copy())
newestMap[e.Address()] = struct{}{}
if _, has := previousMap[e.Address()]; !has {
added = append(added, e.Copy())
for _, info := range newestEndpoints {
nodes = append(nodes, info)
newestMap[info.Address()] = struct{}{}
if _, has := previousMap[info.Address()]; !has {
added = append(added, info)
}
}
for _, c := range previousConns {
if _, has := newestMap[c.Endpoint().Address()]; !has {
dropped = append(dropped, c.Endpoint().Copy())
for _, info := range previousEndpoints {
if _, has := newestMap[info.Address()]; !has {
dropped = append(dropped, info)
}
}

Expand All @@ -180,41 +161,28 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
b.config.DetectLocalDC,
)
previousConns []conn.Info
)
defer func() {
nodes, added, dropped := endpointsDiff(endpoints, previousConns)
onDone(nodes, added, dropped, localDC)
}()

connections := endpointsToConnections(b.pool, endpoints)
for _, c := range connections {
if c.State() == conn.Banned {
b.pool.Unban(ctx, c)
}
c.Endpoint().Touch()
}

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)

endpointsInfo := make([]endpoint.Info, len(endpoints))
for i, e := range endpoints {
endpointsInfo[i] = e
}

b.mu.WithLock(func() {
if b.connectionsState != nil {
previousConns = make([]conn.Info, len(b.connectionsState.all))
for i := range b.connectionsState.all {
previousConns[i] = b.connectionsState.all[i]
}
}
b.connectionsState = state
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
newestConnections := newConns(connections, b.config.Filter, info, b.config.AllowFallback)
previousConnections := b.connections.Swap(newestConnections)
defer func() {
if previousConnections != nil {
nodes, added, dropped := endpointsDiff(newestConnections.all.ToTraceEndpointInfo(), previousConnections.all.ToTraceEndpointInfo())

Check failure on line 176 in internal/balancer/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

line is 133 characters (lll)
onDone(nodes, added, dropped, localDC)
} else {
nodes, added, dropped := endpointsDiff(newestConnections.all.ToTraceEndpointInfo(), nil)
onDone(nodes, added, dropped, localDC)
}
})
}()
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
onApplyDiscoveredEndpoints(ctx, newestConnections.all.ToEndpointInfo())
}
}

func (b *Balancer) Close(ctx context.Context) (err error) {
Expand All @@ -241,6 +209,44 @@ func (b *Balancer) Close(ctx context.Context) (err error) {
return nil
}

func (b *Balancer) markConnAsBad(ctx context.Context, cc conn.Conn, cause error) {
onDone := trace.DriverOnBalancerMarkConnAsBad(
b.driverConfig.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).markConnAsBad"),
cc.Endpoint(), cause,
)

if !xerrors.IsTransportError(cause,
grpcCodes.ResourceExhausted,
grpcCodes.Unavailable,
// grpcCodes.OK,
// grpcCodes.Canceled,
// grpcCodes.Unknown,
// grpcCodes.InvalidArgument,
// grpcCodes.DeadlineExceeded,
// grpcCodes.NotFound,
// grpcCodes.AlreadyExists,
// grpcCodes.PermissionDenied,
// grpcCodes.FailedPrecondition,
// grpcCodes.Aborted,
// grpcCodes.OutOfRange,
// grpcCodes.Unimplemented,
// grpcCodes.Internal,
// grpcCodes.DataLoss,
// grpcCodes.Unauthenticated,
) {
return
}

newestConns, changed := b.connections.Load().withBadConn(cc)

if changed {
b.connections.Store(newestConns)
}

onDone(newestConns.prefer.ToTraceEndpointInfo(), newestConns.fallback.ToTraceEndpointInfo())
}

func New(
ctx context.Context,
driverConfig *config.Config,
Expand Down Expand Up @@ -353,10 +359,8 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
}

defer func() {
if err == nil {
b.pool.Unban(ctx, cc)
} else if xerrors.MustBanConn(err, b.driverConfig.ExcludeGRPCCodesForPessimization()...) {
b.pool.Ban(ctx, cc, err)
if err != nil {
b.markConnAsBad(ctx, cc, err)
}
}()

Expand All @@ -383,13 +387,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
return nil
}

func (b *Balancer) connections() *connectionsState[conn.Conn] {
b.mu.RLock()
defer b.mu.RUnlock()

return b.connectionsState
}

func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
onDone := trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(), &ctx,
Expand All @@ -408,17 +405,17 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
}

var (
state = b.connections()
connections = b.connections.Load()
failedCount int
)

defer func() {
if failedCount*2 > state.PreferredCount() && b.discoveryRepeater != nil {
if failedCount*2 > connections.PreferredCount() && b.discoveryRepeater != nil {
b.discoveryRepeater.Force()
}
}()

c, failedCount = state.GetConnection(ctx)
c, failedCount = connections.GetConn(ctx)
if c == nil {
return nil, xerrors.WithStackTrace(
fmt.Errorf("cannot get connection from Balancer after %d attempts: %w", failedCount, ErrNoEndpoints),
Expand All @@ -429,10 +426,10 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
conns := make([]conn.Conn, 0, len(endpoints))
connections := make([]conn.Conn, 0, len(endpoints))
for _, e := range endpoints {
conns = append(conns, p.Get(e))
connections = append(connections, p.Get(e))
}

return conns
return connections
}
Loading

0 comments on commit cae12e7

Please sign in to comment.