Skip to content

Commit

Permalink
Merge pull request #1133 from ydb-platform/ban
Browse files Browse the repository at this point in the history
logs over query service + fix conns pessimization
  • Loading branch information
asmyasnikov authored Mar 15, 2024
2 parents 76e07ce + fd8435a commit 25b9a06
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 112 deletions.
1 change: 1 addition & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e
for _, opt := range []Option{
WithTraceDriver(log.Driver(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
WithTraceTable(log.Table(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
WithTraceQuery(log.Query(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
WithTraceScripting(log.Scripting(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
WithTraceScheme(log.Scheme(d.logger, d.loggerDetails, d.loggerOpts...)),
WithTraceCoordination(log.Coordination(d.logger, d.loggerDetails, d.loggerOpts...)),
Expand Down
10 changes: 10 additions & 0 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
Expand Down Expand Up @@ -79,6 +80,15 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
return
}

if xerrors.IsTransportError(cause,
grpcCodes.OK,
grpcCodes.Canceled,
grpcCodes.ResourceExhausted,
grpcCodes.OutOfRange,
) {
return
}

e := cc.Endpoint().Copy()

p.mtx.RLock()
Expand Down
2 changes: 1 addition & 1 deletion internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (p *Pool[PT, T]) produce(ctx context.Context) {
onDone(&ProduceDoneInfo{})
}()

p.spawn = make(chan PT, p.producersCount)
p.spawn = make(chan PT, p.maxSize)

var wg, started sync.WaitGroup
wg.Add(p.producersCount)
Expand Down
18 changes: 0 additions & 18 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
Expand Down Expand Up @@ -142,22 +140,6 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
return doTx(ctx, c.pool, op, c.config.Trace(), opts...)
}

func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
response, err := client.DeleteSession(ctx,
&Ydb_Query.DeleteSessionRequest{
SessionId: sessionID,
},
)
if err != nil {
return xerrors.WithStackTrace(xerrors.Transport(err))
}
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
return xerrors.WithStackTrace(xerrors.FromOperation(response))
}

return nil
}

func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client, err error) {
onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID(""))
defer func() {
Expand Down
6 changes: 3 additions & 3 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ func TestCreateSession(t *testing.T) {
attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{
Status: Ydb.StatusIds_SUCCESS,
}, nil).AnyTimes()
attachStream.EXPECT().CloseSend().Return(nil)
service := NewMockQueryServiceClient(ctrl)
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
Status: Ydb.StatusIds_SUCCESS,
SessionId: "test",
}, nil)
service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil)
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{
Status: Ydb.StatusIds_SUCCESS,
}, nil)
t.Log("execute")
attached := 0
s, err := createSession(ctx, service, withSessionTrace(
Expand Down Expand Up @@ -101,7 +103,6 @@ func TestCreateSession(t *testing.T) {
ctrl := gomock.NewController(t)
attachStream := NewMockQueryService_AttachSessionClient(ctrl)
attachStream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")).AnyTimes()
attachStream.EXPECT().CloseSend().Return(nil)
service := NewMockQueryServiceClient(ctrl)
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
Status: Ydb.StatusIds_SUCCESS,
Expand Down Expand Up @@ -141,7 +142,6 @@ func TestCreateSession(t *testing.T) {
attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{
Status: Ydb.StatusIds_UNAVAILABLE,
}, nil).AnyTimes()
attachStream.EXPECT().CloseSend().Return(nil)
service := NewMockQueryServiceClient(ctrl)
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
Status: Ydb.StatusIds_SUCCESS,
Expand Down
60 changes: 35 additions & 25 deletions internal/query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"context"
"io"
"sync/atomic"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
Expand Down Expand Up @@ -170,28 +170,14 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
onDone(finalErr)
}()

attachCtx, cancelAttach := xcontext.WithCancel(context.Background())
defer func() {
if finalErr != nil {
cancelAttach()
}
}()

attach, err := s.grpcClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
attach, err := s.grpcClient.AttachSession(context.Background(), &Ydb_Query.AttachSessionRequest{
SessionId: s.id,
})
if err != nil {
return xerrors.WithStackTrace(
xerrors.Transport(err),
)
}

defer func() {
if finalErr != nil {
_ = attach.CloseSend()
}
}()

state, err := attach.Recv()
if err != nil {
return xerrors.WithStackTrace(xerrors.Transport(err))
Expand All @@ -202,13 +188,21 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
}

go func() {
defer func() {
_ = s.closeOnce(ctx)
}()

for {
if !s.IsAlive() {
return
}
recv, recvErr := attach.Recv()
if recvErr != nil || recv.GetStatus() != Ydb.StatusIds_SUCCESS {
if recvErr != nil {
if xerrors.Is(recvErr, io.EOF) {
s.setStatus(statusClosed)
} else {
s.setStatus(statusError)
}

return
}
if recv.GetStatus() != Ydb.StatusIds_SUCCESS {
s.setStatus(statusError)

return
Expand All @@ -227,16 +221,32 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
}()
}

err = attach.CloseSend()

cancelAttach()
if err = deleteSession(ctx, s.grpcClient, s.id); err != nil {
return xerrors.WithStackTrace(err)
}

return err
return nil
})

return nil
}

func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
response, err := client.DeleteSession(ctx,
&Ydb_Query.DeleteSessionRequest{
SessionId: sessionID,
},
)
if err != nil {
return xerrors.WithStackTrace(xerrors.Transport(err))
}
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
return xerrors.WithStackTrace(xerrors.FromOperation(response))
}

return nil
}

func (s *Session) IsAlive() bool {
switch s.status() {
case statusIdle, statusInUse:
Expand Down
7 changes: 4 additions & 3 deletions log/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
latencyField(start),
)
} else {
l.Log(WithLevel(ctx, WARN), "intermediate fail",
l.Log(WithLevel(ctx, DEBUG), "intermediate fail",
Error(info.Error),
Stringer("endpoint", endpoint),
String("method", method),
Expand Down Expand Up @@ -300,10 +300,10 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
}
ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "ban")
endpoint := info.Endpoint
cause := info.Cause
l.Log(ctx, "start",
Stringer("endpoint", endpoint),
NamedError("cause", info.Cause),
versionField(),
NamedError("cause", cause),
)
start := time.Now()

Expand All @@ -312,6 +312,7 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
Stringer("endpoint", endpoint),
latencyField(start),
Stringer("state", info.State),
NamedError("cause", cause),
versionField(),
)
}
Expand Down
8 changes: 4 additions & 4 deletions log/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func internalQuery(

return func(info trace.QueryNewDoneInfo) {
if info.Error == nil {
l.Log(ctx, "done",
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
)
} else {
lvl := FATAL
if !xerrors.IsYdb(info.Error) {
lvl = DEBUG
lvl = ERROR
}
l.Log(WithLevel(ctx, lvl), "failed",
latencyField(start),
Expand Down Expand Up @@ -83,7 +83,7 @@ func internalQuery(

return func(info trace.QueryPoolNewDoneInfo) {
if info.Error == nil {
l.Log(ctx, "done",
l.Log(WithLevel(ctx, INFO), "done",
latencyField(start),
Int("MinSize", info.MinSize),
Int("MaxSize", info.MaxSize),
Expand All @@ -92,7 +92,7 @@ func internalQuery(
} else {
lvl := FATAL
if !xerrors.IsYdb(info.Error) {
lvl = DEBUG
lvl = ERROR
}
l.Log(WithLevel(ctx, lvl), "failed",
latencyField(start),
Expand Down
3 changes: 2 additions & 1 deletion tests/slo/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/jinzhu/now v1.1.5 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -51,4 +52,4 @@ require (

replace github.com/ydb-platform/ydb-go-sdk/v3 => ../../.

replace xorm.io/xorm => github.com/ydb-platform/xorm v0.0.6
replace xorm.io/xorm => github.com/ydb-platform/xorm v0.0.3
Loading

0 comments on commit 25b9a06

Please sign in to comment.