Skip to content

Commit

Permalink
Refactoring for better tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Nov 8, 2023
1 parent 36f3718 commit b0d13e3
Show file tree
Hide file tree
Showing 42 changed files with 1,197 additions and 382 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added context to some internal methods for better tracing
* Added `trace.FunctionID` helper and `FunctionID` field to trace start info's
* Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on `ydb.Open` step

## v3.54.1
Expand Down
6 changes: 1 addition & 5 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,7 @@ func connect(ctx context.Context, d *Driver) error {
}

onDone := trace.DriverOnInit(
d.config.Trace(),
&ctx,
d.config.Endpoint(),
d.config.Database(),
d.config.Secure(),
d.config.Trace(), &ctx, trace.FunctionID(2), d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
defer func() {
onDone(err)
Expand Down
18 changes: 7 additions & 11 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
var (
address = "ydb:///" + b.driverConfig.Endpoint()
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
b.driverConfig.Trace(), &ctx, address,
b.driverConfig.Trace(), &ctx, trace.FunctionID(0), address,
)
endpoints []endpoint.Endpoint
localDC string
Expand Down Expand Up @@ -127,9 +127,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
onDone := trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(),
&ctx,
b.balancerConfig.DetectlocalDC,
b.driverConfig.Trace(), &ctx, trace.FunctionID(0), b.balancerConfig.DetectlocalDC,
)
defer func() {
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
Expand Down Expand Up @@ -163,8 +161,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end

func (b *Balancer) Close(ctx context.Context) (err error) {
onDone := trace.DriverOnBalancerClose(
b.driverConfig.Trace(),
&ctx,
b.driverConfig.Trace(), &ctx, trace.FunctionID(0),
)
defer func() {
onDone(err)
Expand All @@ -189,8 +186,7 @@ func New(
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(),
&ctx,
driverConfig.Trace(), &ctx, trace.FunctionID(0),
)
discoveryConfig = discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
Expand Down Expand Up @@ -235,7 +231,8 @@ func New(
}
// run background discovering
if d := discoveryConfig.Interval(); d > 0 {
b.discoveryRepeater = repeater.New(d, b.clusterDiscoveryAttempt,
b.discoveryRepeater = repeater.New(xcontext.WithoutDeadline(ctx),
d, b.clusterDiscoveryAttempt,
repeater.WithName("discovery"),
repeater.WithTrace(b.driverConfig.Trace()),
)
Expand Down Expand Up @@ -320,8 +317,7 @@ func (b *Balancer) connections() *connectionsState {

func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
onDone := trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(),
&ctx,
b.driverConfig.Trace(), &ctx, trace.FunctionID(0),
)
defer func() {
if err == nil {
Expand Down
50 changes: 19 additions & 31 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Conn interface {
Ping(ctx context.Context) error
IsState(states ...State) bool
GetState() State
SetState(State) State
Unban() State
SetState(context.Context, State) State
Unban(context.Context) State
}

type conn struct {
Expand Down Expand Up @@ -93,9 +93,7 @@ func (c *conn) IsState(states ...State) bool {

func (c *conn) park(ctx context.Context) (err error) {
onDone := trace.DriverOnConnPark(
c.config.Trace(),
&ctx,
c.Endpoint(),
c.config.Trace(), &ctx, trace.FunctionID(0), c.Endpoint(),
)
defer func() {
onDone(err)
Expand All @@ -112,7 +110,7 @@ func (c *conn) park(ctx context.Context) (err error) {
return nil
}

err = c.close()
err = c.close(ctx)

if err != nil {
return c.wrapError(err)
Expand All @@ -135,20 +133,20 @@ func (c *conn) Endpoint() endpoint.Endpoint {
return nil
}

func (c *conn) SetState(s State) State {
return c.setState(s)
func (c *conn) SetState(ctx context.Context, s State) State {
return c.setState(ctx, s)
}

func (c *conn) setState(s State) State {
func (c *conn) setState(ctx context.Context, s State) State {
if state := State(c.state.Swap(uint32(s))); state != s {
trace.DriverOnConnStateChange(
c.config.Trace(), c.endpoint.Copy(), state,
c.config.Trace(), &ctx, trace.FunctionID(0), c.endpoint.Copy(), state,
)(s)
}
return s
}

func (c *conn) Unban() State {
func (c *conn) Unban(ctx context.Context) State {
var newState State
c.mtx.RLock()
cc := c.cc
Expand All @@ -159,7 +157,7 @@ func (c *conn) Unban() State {
newState = Offline
}

c.setState(newState)
c.setState(ctx, newState)
return newState
}

Expand All @@ -186,9 +184,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
}

onDone := trace.DriverOnConnDial(
c.config.Trace(),
&ctx,
c.endpoint.Copy(),
c.config.Trace(), &ctx, trace.FunctionID(0), c.endpoint.Copy(),
)

defer func() {
Expand Down Expand Up @@ -221,7 +217,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
}

c.cc = cc
c.setState(Online)
c.setState(ctx, Online)

return c.cc, nil
}
Expand All @@ -243,13 +239,13 @@ func isAvailable(raw *grpc.ClientConn) bool {
}

// conn must be locked
func (c *conn) close() (err error) {
func (c *conn) close(ctx context.Context) (err error) {
if c.cc == nil {
return nil
}
err = c.cc.Close()
c.cc = nil
c.setState(Offline)
c.setState(ctx, Offline)
return c.wrapError(err)
}

Expand All @@ -268,19 +264,17 @@ func (c *conn) Close(ctx context.Context) (err error) {
}

onDone := trace.DriverOnConnClose(
c.config.Trace(),
&ctx,
c.Endpoint(),
c.config.Trace(), &ctx, trace.FunctionID(0), c.Endpoint(),
)
defer func() {
onDone(err)
}()

c.closed = true

err = c.close()
err = c.close(ctx)

c.setState(Destroyed)
c.setState(ctx, Destroyed)

for _, onClose := range c.onClose {
onClose(c)
Expand All @@ -301,10 +295,7 @@ func (c *conn) Invoke(
issues []trace.Issue
useWrapping = UseWrapping(ctx)
onDone = trace.DriverOnConnInvoke(
c.config.Trace(),
&ctx,
c.endpoint,
trace.Method(method),
c.config.Trace(), &ctx, trace.FunctionID(0), c.endpoint, trace.Method(method),
)
cc *grpc.ClientConn
md = metadata.MD{}
Expand Down Expand Up @@ -386,10 +377,7 @@ func (c *conn) NewStream(
) (_ grpc.ClientStream, err error) {
var (
streamRecv = trace.DriverOnConnNewStream(
c.config.Trace(),
&ctx,
c.endpoint.Copy(),
trace.Method(method),
c.config.Trace(), &ctx, trace.FunctionID(0), c.endpoint.Copy(), trace.Method(method),
)
useWrapping = UseWrapping(ctx)
cc *grpc.ClientConn
Expand Down
16 changes: 5 additions & 11 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
}

trace.DriverOnConnBan(
p.config.Trace(),
&ctx,
e,
cc.GetState(),
cause,
)(cc.SetState(Banned))
p.config.Trace(), &ctx, trace.FunctionID(0), e, cc.GetState(), cause,
)(cc.SetState(ctx, Banned))
}

func (p *Pool) Allow(ctx context.Context, cc Conn) {
Expand All @@ -111,11 +107,9 @@ func (p *Pool) Allow(ctx context.Context, cc Conn) {
return
}

trace.DriverOnConnAllow(p.config.Trace(),
&ctx,
e,
cc.GetState(),
)(cc.Unban())
trace.DriverOnConnAllow(
p.config.Trace(), &ctx, trace.FunctionID(0), e, cc.GetState(),
)(cc.Unban(ctx))
}

func (p *Pool) Take(context.Context) error {
Expand Down
6 changes: 4 additions & 2 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ type Client struct {
// Discover cluster endpoints
func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, err error) {
var (
onDone = trace.DiscoveryOnDiscover(c.config.Trace(), &ctx, c.config.Endpoint(), c.config.Database())
onDone = trace.DiscoveryOnDiscover(
c.config.Trace(), &ctx, trace.FunctionID(0), c.config.Endpoint(), c.config.Database(),
)
request = Ydb_Discovery.ListEndpointsRequest{
Database: c.config.Database(),
}
Expand Down Expand Up @@ -97,7 +99,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e

func (c *Client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err error) {
var (
onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx)
onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, trace.FunctionID(0))
request = Ydb_Discovery.WhoAmIRequest{}
response *Ydb_Discovery.WhoAmIResponse
whoAmIResultResult Ydb_Discovery.WhoAmIResult
Expand Down
2 changes: 1 addition & 1 deletion internal/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (m *Meta) meta(ctx context.Context) (_ metadata.MD, err error) {

var token string

done := trace.DriverOnGetCredentials(m.trace, &ctx)
done := trace.DriverOnGetCredentials(m.trace, &ctx, trace.FunctionID(0))
defer func() {
done(token, err)
}()
Expand Down
6 changes: 3 additions & 3 deletions internal/mock/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (c *Conn) GetState() conn.State {
return c.State
}

func (c *Conn) SetState(state conn.State) conn.State {
func (c *Conn) SetState(ctx context.Context, state conn.State) conn.State {
c.State = state
return c.State
}

func (c *Conn) Unban() conn.State {
c.SetState(conn.Online)
func (c *Conn) Unban(ctx context.Context) conn.State {
c.SetState(ctx, conn.Online)
return conn.Online
}

Expand Down
5 changes: 3 additions & 2 deletions internal/repeater/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ func WithEvent(ctx context.Context, event Event) context.Context {

// New creates and begins to execute task periodically.
func New(
ctx context.Context,
interval time.Duration,
task func(ctx context.Context) (err error),
opts ...option,
) *repeater {
ctx, cancel := xcontext.WithCancel(context.Background())
ctx, cancel := xcontext.WithCancel(ctx)

r := &repeater{
interval: interval,
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *repeater) wakeUp(ctx context.Context, e Event) (err error) {

ctx = WithEvent(ctx, e)

onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, r.name, e)
onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, trace.FunctionID(0), r.name, e)
defer func() {
onDone(err)

Expand Down
4 changes: 2 additions & 2 deletions internal/repeater/repeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestRepeaterNoWakeUpsAfterStop(t *testing.T) {
wakeUpDone = make(chan struct{})
)
fakeClock := clockwork.NewFakeClock()
r := New(interval, func(ctx context.Context) (err error) {
r := New(context.Background(), interval, func(ctx context.Context) (err error) {
wakeUpStart <- struct{}{}
<-wakeUpDone
return nil
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestRepeaterForceLogBackoff(t *testing.T) {
)

repeaterDone := make(chan struct{})
r := New(10*time.Minute, func(ctx context.Context) (err error) {
r := New(context.Background(), 10*time.Minute, func(ctx context.Context) (err error) {
defer func() {
repeaterDone <- struct{}{}
}()
Expand Down
Loading

0 comments on commit b0d13e3

Please sign in to comment.