diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c39139c4..14144137b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added `ydb.WithTraceRetry` option * Bumped `golang.org/x/sync` to `v0.3.0` * Bumped `google.golang.org/protobuf` to `v1.31.0` * Bumped `google.golang.org/grpc` to `v1.57.1` diff --git a/config/config.go b/config/config.go index 20dc44bb6..5cf9a0a09 100644 --- a/config/config.go +++ b/config/config.go @@ -155,6 +155,12 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nol } } +func WithTraceRetry(t *trace.Retry, opts ...trace.RetryComposeOption) Option { + return func(c *Config) { + config.SetTraceRetry(&c.Common, t, opts...) + } +} + func WithUserAgent(userAgent string) Option { return func(c *Config) { c.metaOptions = append(c.metaOptions, meta.WithUserAgentOption(userAgent)) diff --git a/connection.go b/connection.go index fd2822a1f..3936360dc 100644 --- a/connection.go +++ b/connection.go @@ -138,6 +138,10 @@ type Driver struct { //nolint:maligned panicCallback func(e interface{}) } +func (d *Driver) TraceRetry() *trace.Retry { + return d.config.TraceRetry() +} + // Close closes Driver and clear resources func (d *Driver) Close(ctx context.Context) error { d.mtx.Lock() @@ -427,6 +431,7 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e WithTraceDiscovery(log.Discovery(d.logger, d.loggerDetails, d.loggerOpts...)), WithTraceTopic(log.Topic(d.logger, d.loggerDetails, d.loggerOpts...)), WithTraceDatabaseSQL(log.DatabaseSQL(d.logger, d.loggerDetails, d.loggerOpts...)), + WithTraceRetry(log.Retry(d.logger, d.loggerDetails, d.loggerOpts...)), } { if opt != nil { err = opt(ctx, d) diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 23c07b710..065b24567 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -63,26 +63,27 @@ func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, } func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) { - if err = retry.Retry(ctx, func(childCtx context.Context) (err error) { - if err = b.clusterDiscoveryAttempt(childCtx); err != nil { - if credentials.IsAccessError(err) { - return credentials.AccessError("cluster discovery failed", err, - credentials.WithEndpoint(b.driverConfig.Endpoint()), - credentials.WithDatabase(b.driverConfig.Database()), - credentials.WithCredentials(b.driverConfig.Credentials()), - ) + return retry.Retry( + xcontext.WithTraceRetry(ctx, b.driverConfig.TraceRetry()), + func(childCtx context.Context) (err error) { + if err = b.clusterDiscoveryAttempt(childCtx); err != nil { + if credentials.IsAccessError(err) { + return credentials.AccessError("cluster discovery failed", err, + credentials.WithEndpoint(b.driverConfig.Endpoint()), + credentials.WithDatabase(b.driverConfig.Database()), + credentials.WithCredentials(b.driverConfig.Credentials()), + ) + } + // if got err but parent context is not done - mark error as retryable + if ctx.Err() == nil && xerrors.IsTimeoutError(err) { + return xerrors.WithStackTrace(xerrors.Retryable(err)) + } + return xerrors.WithStackTrace(err) } - // if got err but parent context is not done - mark error as retryable - if ctx.Err() == nil && xerrors.IsTimeoutError(err) { - return xerrors.WithStackTrace(xerrors.Retryable(err)) - } - return xerrors.WithStackTrace(err) - } - return nil - }, retry.WithIdempotent(true)); err != nil { - return xerrors.WithStackTrace(err) - } - return nil + return nil + }, + retry.WithIdempotent(true), + ) } func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { diff --git a/internal/config/config.go b/internal/config/config.go index f7a094d7e..dcf89c4b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,12 +2,15 @@ package config import ( "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type Common struct { operationTimeout time.Duration operationCancelAfter time.Duration disableAutoRetry bool + traceRetry trace.Retry panicCallback func(e interface{}) } @@ -41,6 +44,10 @@ func (c *Common) OperationCancelAfter() time.Duration { return c.operationCancelAfter } +func (c *Common) TraceRetry() *trace.Retry { + return &c.traceRetry +} + // SetOperationTimeout define the maximum amount of time a YDB server will process // an operation. After timeout exceeds YDB will try to cancel operation and // regardless of the cancellation appropriate error will be returned to @@ -70,3 +77,7 @@ func SetPanicCallback(c *Common, panicCallback func(e interface{})) { func SetAutoRetry(c *Common, autoRetry bool) { c.disableAutoRetry = !autoRetry } + +func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption) { + c.traceRetry = *c.traceRetry.Compose(t, opts...) +} diff --git a/internal/coordination/client.go b/internal/coordination/client.go index a73a81d45..a08a0e85e 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -11,6 +11,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/coordination" "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" @@ -34,7 +35,7 @@ func New(cc grpc.ClientConnInterface, config config.Config) *Client { } } -func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { +func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -42,13 +43,16 @@ func (c *Client) CreateNode(ctx context.Context, path string, config coordinatio return xerrors.WithStackTrace(c.createNode(ctx, path, config)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } -func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { - _, err = c.service.CreateNode( +func (c *Client) createNode(ctx context.Context, path string, config coordination.NodeConfig) error { + _, err := c.service.CreateNode( ctx, &Ydb_Coordination.CreateNodeRequest{ Path: path, @@ -71,7 +75,7 @@ func (c *Client) createNode(ctx context.Context, path string, config coordinatio return xerrors.WithStackTrace(err) } -func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { +func (c *Client) AlterNode(ctx context.Context, path string, config coordination.NodeConfig) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -79,13 +83,18 @@ func (c *Client) AlterNode(ctx context.Context, path string, config coordination return xerrors.WithStackTrace(c.alterNode(ctx, path, config)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, + retry.WithStackTrace(), + retry.WithIdempotent(true), + ) } -func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) (err error) { - _, err = c.service.AlterNode( +func (c *Client) alterNode(ctx context.Context, path string, config coordination.NodeConfig) error { + _, err := c.service.AlterNode( ctx, &Ydb_Coordination.AlterNodeRequest{ Path: path, @@ -108,7 +117,7 @@ func (c *Client) alterNode(ctx context.Context, path string, config coordination return xerrors.WithStackTrace(err) } -func (c *Client) DropNode(ctx context.Context, path string) (err error) { +func (c *Client) DropNode(ctx context.Context, path string) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } @@ -116,13 +125,16 @@ func (c *Client) DropNode(ctx context.Context, path string) (err error) { return xerrors.WithStackTrace(c.dropNode(ctx, path)) } if !c.config.AutoRetry() { - return call(ctx) + return xerrors.WithStackTrace(call(ctx)) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } -func (c *Client) dropNode(ctx context.Context, path string) (err error) { - _, err = c.service.DropNode( +func (c *Client) dropNode(ctx context.Context, path string) error { + _, err := c.service.DropNode( ctx, &Ydb_Coordination.DropNodeRequest{ Path: path, @@ -143,22 +155,24 @@ func (c *Client) DescribeNode( ) ( entry *scheme.Entry, config *coordination.NodeConfig, - err error, + _ error, ) { if c == nil { - err = xerrors.WithStackTrace(errNilClient) - return + return nil, nil, xerrors.WithStackTrace(errNilClient) } - call := func(ctx context.Context) error { + call := func(ctx context.Context) (err error) { entry, config, err = c.describeNode(ctx, path) return xerrors.WithStackTrace(err) } if !c.config.AutoRetry() { - err = call(ctx) - return + err := call(ctx) + return entry, config, xerrors.WithStackTrace(err) } - err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) - return + err := retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) + return entry, config, xerrors.WithStackTrace(err) } // DescribeNode describes a coordination node diff --git a/internal/ratelimiter/client.go b/internal/ratelimiter/client.go index 6b51a21c5..f3fcc4c6c 100644 --- a/internal/ratelimiter/client.go +++ b/internal/ratelimiter/client.go @@ -14,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config" ratelimiterErrors "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/errors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/options" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter" "github.com/ydb-platform/ydb-go-sdk/v3/retry" @@ -57,7 +58,10 @@ func (c *Client) CreateResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) createResource( @@ -100,7 +104,10 @@ func (c *Client) AlterResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) alterResource( @@ -143,7 +150,10 @@ func (c *Client) DropResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) dropResource( @@ -169,20 +179,23 @@ func (c *Client) ListResource( coordinationNodePath string, resourcePath string, recursive bool, -) (list []string, err error) { +) (list []string, _ error) { if c == nil { return list, xerrors.WithStackTrace(errNilClient) } - call := func(ctx context.Context) error { + call := func(ctx context.Context) (err error) { list, err = c.listResource(ctx, coordinationNodePath, resourcePath, recursive) return xerrors.WithStackTrace(err) } if !c.config.AutoRetry() { - err = call(ctx) - return + err := call(ctx) + return list, err } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - return + err := retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithIdempotent(true), retry.WithStackTrace(), + ) + return list, err } func (c *Client) listResource( @@ -232,7 +245,10 @@ func (c *Client) DescribeResource( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithIdempotent(true), retry.WithStackTrace(), + ) return } @@ -295,7 +311,10 @@ func (c *Client) AcquireResource( if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace()) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), + ) } func (c *Client) acquireResource( diff --git a/internal/scheme/client.go b/internal/scheme/client.go index d345e47e2..c97e7ea0a 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -10,6 +10,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" @@ -52,7 +53,10 @@ func (c *Client) MakeDirectory(ctx context.Context, path string) (err error) { if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) makeDirectory(ctx context.Context, path string) (err error) { @@ -81,7 +85,10 @@ func (c *Client) RemoveDirectory(ctx context.Context, path string) (err error) { if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) removeDirectory(ctx context.Context, path string) (err error) { @@ -110,10 +117,13 @@ func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Direc } if !c.config.AutoRetry() { err = call(ctx) - return + return d, xerrors.WithStackTrace(err) } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - return + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithIdempotent(true), retry.WithStackTrace(), + ) + return d, err } func (c *Client) listDirectory(ctx context.Context, path string) (scheme.Directory, error) { @@ -163,11 +173,11 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace()) - if err != nil { - return e, xerrors.WithStackTrace(err) - } - return e, nil + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithIdempotent(true), retry.WithStackTrace(), + ) + return e, err } func (c *Client) describePath(ctx context.Context, path string) (e scheme.Entry, err error) { @@ -208,7 +218,10 @@ func (c *Client) ModifyPermissions(ctx context.Context, path string, opts ...sch if !c.config.AutoRetry() { return call(ctx) } - return retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) } func (c *Client) modifyPermissions(ctx context.Context, path string, opts ...scheme.PermissionsOption) (err error) { diff --git a/internal/scripting/client.go b/internal/scripting/client.go index b79c2bea1..db79685ad 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -52,8 +52,11 @@ func (c *Client) Execute( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace()) - return + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), + ) + return r, xerrors.WithStackTrace(err) } func (c *Client) execute( @@ -120,8 +123,11 @@ func (c *Client) Explain( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace(), retry.WithIdempotent(true)) - return + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), retry.WithIdempotent(true), + ) + return e, xerrors.WithStackTrace(err) } func (c *Client) explain( @@ -184,8 +190,11 @@ func (c *Client) StreamExecute( err = call(ctx) return } - err = retry.Retry(ctx, call, retry.WithStackTrace()) - return + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + call, retry.WithStackTrace(), + ) + return r, xerrors.WithStackTrace(err) } func (c *Client) streamExecute( diff --git a/internal/table/client.go b/internal/table/client.go index 63c2b5102..6534bb448 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -241,7 +241,8 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab } return s, nil } - err = retry.Retry(ctx, + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), func(ctx context.Context) (err error) { s, err = createSession(ctx) if err != nil { @@ -252,7 +253,6 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab append( []retry.Option{ retry.WithIdempotent(true), - retry.WithID("CreateSession"), retry.WithTrace(trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context) @@ -267,10 +267,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab }, retryOptions(c.config.Trace(), opts...).RetryOptions..., )..., ) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - return s, nil + return s, xerrors.WithStackTrace(err) } func (c *Client) isClosed() bool { @@ -615,35 +612,33 @@ func (c *Client) Close(ctx context.Context) (err error) { // - deadline was canceled or deadlined // - retry operation returned nil as error // Warning: if deadline without deadline or cancellation func Retry will be worked infinite -func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) (err error) { +func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - return do( - ctx, - c, - c.config, - op, - retryOptions(c.config.Trace(), opts...), + err := do( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + c, c.config, op, retryOptions(c.config.Trace(), opts...), ) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil } -func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) (err error) { +func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - err = doTx( - ctx, - c, - c.config, - op, - retryOptions(c.config.Trace(), opts...), + err := doTx( + xcontext.WithTraceRetry(ctx, c.config.TraceRetry()), + c, c.config, op, retryOptions(c.config.Trace(), opts...), ) if err != nil { return xerrors.WithStackTrace(err) diff --git a/internal/table/retry.go b/internal/table/retry.go index 764764bd6..072ac4451 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -3,7 +3,6 @@ package table import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" @@ -57,7 +56,7 @@ func doTx( defer func() { onIntermediate(err)(attempts, err) }() - err = retryBackoff(ctx, c, + return retryBackoff(ctx, c, func(ctx context.Context, s table.Session) (err error) { attempts++ @@ -108,10 +107,6 @@ func doTx( }, opts.RetryOptions..., ) - if err != nil { - return xerrors.WithStackTrace(err) - } - return nil } func do( @@ -194,9 +189,7 @@ func retryOptions(trace *trace.Table, opts ...table.Option) *table.Options { TxSettings: table.TxSettings( table.WithSerializableReadWrite(), ), - RetryOptions: []retry.Option{ - retry.WithID(stack.Record(1, stack.Lambda(false), stack.FileName(false))), - }, + RetryOptions: []retry.Option{}, } for _, opt := range opts { if opt != nil { diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 6b2c44f50..56adefd0f 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -12,6 +12,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" @@ -76,10 +78,13 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al return alterErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - return call(ctx) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.cfg.TraceRetry()), + call, retry.WithIdempotent(true), + ) } // Create new topic @@ -103,10 +108,13 @@ func (c *Client) Create( return createErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - return call(ctx) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.cfg.TraceRetry()), + call, retry.WithIdempotent(true), + ) } // Describe topic @@ -136,9 +144,12 @@ func (c *Client) Describe( var err error if c.cfg.AutoRetry() { - err = retry.Retry(ctx, call, retry.WithIdempotent(true)) + err = xerrors.WithStackTrace(call(ctx)) } else { - err = call(ctx) + err = retry.Retry( + xcontext.WithTraceRetry(ctx, c.cfg.TraceRetry()), + call, retry.WithIdempotent(true), + ) } if err != nil { @@ -166,11 +177,13 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro return removeErr } - if c.cfg.AutoRetry() { - return retry.Retry(ctx, call, retry.WithIdempotent(true)) + if !c.cfg.AutoRetry() { + return xerrors.WithStackTrace(call(ctx)) } - - return call(ctx) + return retry.Retry( + xcontext.WithTraceRetry(ctx, c.cfg.TraceRetry()), + call, retry.WithIdempotent(true), + ) } // StartReader create new topic reader and start pull messages from server diff --git a/internal/xcontext/retry.go b/internal/xcontext/retry.go new file mode 100644 index 000000000..912aa1f13 --- /dev/null +++ b/internal/xcontext/retry.go @@ -0,0 +1,22 @@ +package xcontext + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +type ( + ctxTraceRetryKey struct{} +) + +func WithTraceRetry(ctx context.Context, t *trace.Retry) context.Context { + return context.WithValue(ctx, ctxTraceRetryKey{}, TraceRetry(ctx).Compose(t)) +} + +func TraceRetry(ctx context.Context) *trace.Retry { + if traceRetry, ok := ctx.Value(ctxTraceRetryKey{}).(*trace.Retry); ok { + return traceRetry + } + return &trace.Retry{} +} diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index 9735d3109..447b6e045 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -312,6 +312,15 @@ type driverWrapper struct { c *Connector } +func (d *driverWrapper) TraceRetry() *trace.Retry { + if tracer, has := d.c.parent.(interface { + TraceRetry() *trace.Retry + }); has { + return tracer.TraceRetry() + } + return &trace.Retry{} +} + func (d *driverWrapper) Open(_ string) (driver.Conn, error) { return nil, ErrUnsupported } diff --git a/metrics/retry.go b/metrics/retry.go index a0eaca54f..8d3a059ff 100644 --- a/metrics/retry.go +++ b/metrics/retry.go @@ -4,7 +4,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -// Retry makes table.RetryTrace with New publishing -func Retry(config Config) (t trace.Retry) { +// retry makes table.RetryTrace with New publishing +func retry(config Config) (t trace.Retry) { return t } diff --git a/metrics/traces.go b/metrics/traces.go index a601b5bcc..5fc978606 100644 --- a/metrics/traces.go +++ b/metrics/traces.go @@ -18,5 +18,6 @@ func WithTraces(config Config) ydb.Option { ydb.WithTraceRatelimiter(ratelimiter(config)), ydb.WithTraceDiscovery(discovery(config)), ydb.WithTraceDatabaseSQL(databaseSQL(config)), + ydb.WithTraceRetry(retry(config)), ) } diff --git a/options.go b/options.go index 1f92d4ce0..80fba7603 100644 --- a/options.go +++ b/options.go @@ -249,7 +249,7 @@ func WithDiscoveryInterval(discoveryInterval time.Duration) Option { } } -// WithTraceDriver returns deadline which has associated Driver with it. +// WithTraceDriver appends trace.Driver into driver traces func WithTraceDriver(trace trace.Driver, opts ...trace.DriverComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { c.options = append(c.options, config.WithTrace(trace, opts...)) @@ -257,6 +257,21 @@ func WithTraceDriver(trace trace.Driver, opts ...trace.DriverComposeOption) Opti } } +// WithTraceRetry appends trace.Retry into retry traces +func WithTraceRetry(t trace.Retry, opts ...trace.RetryComposeOption) Option { + return func(ctx context.Context, c *Driver) error { + c.options = append(c.options, + config.WithTraceRetry(&t, append( + []trace.RetryComposeOption{ + trace.WithRetryPanicCallback(c.panicCallback), + }, + opts..., + )...), + ) + return nil + } +} + // WithCertificate appends certificate to TLS config root certificates func WithCertificate(cert *x509.Certificate) Option { return func(ctx context.Context, c *Driver) error { @@ -394,7 +409,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { } } -// WithTraceTable returns table trace option +// WithTraceTable appends trace.Table into table traces func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { c.tableOptions = append( diff --git a/retry/retry.go b/retry/retry.go index 2c2589a07..aba25a787 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -80,7 +80,7 @@ type traceOption struct { } func (t traceOption) ApplyRetryOption(opts *retryOptions) { - opts.trace = t.trace + opts.trace = opts.trace.Compose(t.trace) } func (t traceOption) ApplyDoOption(opts *doOptions) { @@ -221,7 +221,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (err error) { options := &retryOptions{ fastBackoff: backoff.Fast, slowBackoff: backoff.Slow, - trace: &trace.Retry{}, + trace: xcontext.TraceRetry(ctx), } for _, opt := range opts { if opt != nil { diff --git a/retry/retry_test.go b/retry/retry_test.go index f467b3dcc..bb9818775 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -121,7 +121,7 @@ func TestRetryWithCustomErrors(t *testing.T) { } { t.Run(tt.error.Error(), func(t *testing.T) { i := 0 - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { i++ if i < limit { return tt.error @@ -149,7 +149,7 @@ func TestRetryTransportDeadlineExceeded(t *testing.T) { } { counter := 0 ctx, cancel := xcontext.WithTimeout(context.Background(), time.Hour) - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { counter++ if !(counter < cancelCounterValue) { cancel() @@ -169,7 +169,7 @@ func TestRetryTransportCancelled(t *testing.T) { } { counter := 0 ctx, cancel := xcontext.WithCancel(context.Background()) - err := Retry(ctx, func(ctx context.Context) (err error) { + err := Retry(ctx, func(ctx context.Context) error { counter++ if !(counter < cancelCounterValue) { cancel() diff --git a/retry/sql.go b/retry/sql.go index f6e6dd3c9..c43bba623 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type doOptions struct { @@ -40,6 +41,13 @@ func Do(ctx context.Context, db *sql.DB, f func(ctx context.Context, cc *sql.Con options = doOptions{} attempts = 0 ) + if tracer, has := db.Driver().(interface { + TraceRetry() *trace.Retry + }); has { + options.retryOptions = append(options.retryOptions, nil) + copy(options.retryOptions[1:], options.retryOptions) + options.retryOptions[0] = WithTrace(*tracer.TraceRetry()) + } for _, opt := range opts { if opt != nil { opt.ApplyDoOption(&options) @@ -112,6 +120,7 @@ func WithTxOptions(txOptions *sql.TxOptions) txOptionsOption { func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) error, opts ...doTxOption) error { var ( options = doTxOptions{ + retryOptions: []Option{}, txOptions: &sql.TxOptions{ Isolation: sql.LevelDefault, ReadOnly: false, @@ -119,6 +128,13 @@ func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) erro } attempts = 0 ) + if tracer, has := db.Driver().(interface { + TraceRetry() *trace.Retry + }); has { + options.retryOptions = append(options.retryOptions, nil) + copy(options.retryOptions[1:], options.retryOptions) + options.retryOptions[0] = WithTrace(*tracer.TraceRetry()) + } for _, opt := range opts { if opt != nil { opt.ApplyDoTxOption(&options) diff --git a/tests/integration/with_trace_retry_test.go b/tests/integration/with_trace_retry_test.go new file mode 100644 index 000000000..0d27ecf06 --- /dev/null +++ b/tests/integration/with_trace_retry_test.go @@ -0,0 +1,98 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "database/sql" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +func TestWithTraceRetry(t *testing.T) { + ctx := xtest.Context(t) + + t.Run("table", func(t *testing.T) { + var ( + retryCalled = make(map[string]bool, 2) + scope = newScope(t) + db = scope.Driver( + ydb.WithTraceRetry(trace.Retry{ + OnRetry: func( + info trace.RetryLoopStartInfo, + ) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + retryCalled[info.ID] = true + return nil + }, + }), + ) + ) + + require.NoError(t, db.Table().Do(ctx, + func(ctx context.Context, s table.Session) error { + return nil + }, + table.WithID("db.Table().Do"), + )) + + require.NoError(t, db.Table().DoTx(ctx, + func(ctx context.Context, tx table.TransactionActor) error { + return nil + }, + table.WithID("db.Table().DoTx"), + )) + + for _, key := range []string{ + "db.Table().Do", + "db.Table().DoTx", + } { + require.True(t, retryCalled[key], key) + } + }) + + t.Run("database/sql", func(t *testing.T) { + var ( + retryCalled = make(map[string]bool, 2) + scope = newScope(t) + nativeDb = scope.Driver( + ydb.WithTraceRetry(trace.Retry{ + OnRetry: func( + info trace.RetryLoopStartInfo, + ) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { + retryCalled[info.ID] = true + return nil + }, + }), + ) + db = sql.OpenDB(ydb.MustConnector(nativeDb)) + ) + require.NoError(t, retry.Do(ctx, db, + func(ctx context.Context, cc *sql.Conn) error { + return nil + }, + retry.WithID("retry.Do"), + )) + + require.NoError(t, retry.DoTx(ctx, db, + func(ctx context.Context, tx *sql.Tx) error { + return nil + }, + retry.WithID("retry.DoTx"), + )) + + for _, key := range []string{ + "retry.Do", + "retry.DoTx", + } { + require.True(t, retryCalled[key], key) + } + }) +}