Skip to content

Commit

Permalink
Fix schema getting
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Dec 26, 2024
1 parent ada7fa5 commit c9f311e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 15 deletions.
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (ds *dataSourceImpl) DescribeTable(
Logger: logger,
DataSourceInstance: request.DataSourceInstance,
TableName: request.Table,
MaxConnections: 1,
MaxConnections: 1, // single connection is enough to get metadata
}

cs, makeConnErr = ds.connectionManager.Make(params)
Expand Down
26 changes: 21 additions & 5 deletions app/server/datasource/rdbms/logging/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logging
import (
"context"
"fmt"
"sync"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -39,22 +40,37 @@ func (cm *connectionManager) Make(
return nil, fmt.Errorf("resolve YDB endpoint: %w", err)
}

// Determine how much connections we need to create
// taking into account optional limit.
totalConnections := len(response.sources)
if params.MaxConnections > 0 && params.MaxConnections < totalConnections {
totalConnections = params.MaxConnections
}

var (
group errgroup.Group
cs = make([]rdbms_utils.Connection, len(response.sources))
cs = make([]rdbms_utils.Connection, 0, totalConnections)
mutex sync.Mutex
)

for i, src := range response.sources {
i := i
// If connection limit is set, create only requested number of connections.
if i >= totalConnections {
break
}

src := src

group.Go(func() error {
var err error
cs[i], err = cm.makeConnectionFromYDBSource(params, src)
conn, err := cm.makeConnectionFromYDBSource(params, src)
if err != nil {
return fmt.Errorf("make connection from YDB source: %w", err)
}

mutex.Lock()
cs = append(cs, conn)
mutex.Unlock()

return nil
})
}
Expand All @@ -76,7 +92,7 @@ func (cm *connectionManager) makeConnectionFromYDBSource(
params *rdbms_utils.ConnectionManagerMakeParams,
src *ydbSource,
) (rdbms_utils.Connection, error) {
params.Logger.Debug("Resolved log group into YDB endpoint", src.ToZapFields()...)
params.Logger.Debug("resolved log group into YDB endpoint", src.ToZapFields()...)

// prepare new data source instance describing the underlying YDB database
ydbDataSourceInstance := &api_common.TGenericDataSourceInstance{
Expand Down
4 changes: 3 additions & 1 deletion app/server/datasource/rdbms/ydb/schema_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func (f *schemaProvider) GetSchema(
conn rdbms_utils.Connection,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TSchema, error) {
databaseName, tableName := conn.From()

var (
driver = conn.(ydbConnection).getDriver()
prefix = path.Join(driver.Name(), request.Table)
prefix = path.Join(databaseName, tableName)
desc options.Description
)

Expand Down
20 changes: 12 additions & 8 deletions common/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package common

import (
"fmt"
"strconv"
"strings"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
)
Expand All @@ -11,17 +13,19 @@ func EndpointToString(ep *api_common.TGenericEndpoint) string {
}

func StringToEndpoint(s string) (*api_common.TGenericEndpoint, error) {
var (
host string
port uint32
)
ss := strings.Split(s, ":")

if _, err := fmt.Sscanf(s, "%s:%d", &host, &port); err != nil {
return nil, fmt.Errorf("parse endpoint '%s': %w", s, err)
if len(ss) != 2 {
return nil, fmt.Errorf("invalid endpoint format: %s", s)
}

port, err := strconv.ParseUint(ss[1], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid port: %s", ss[1])
}

return &api_common.TGenericEndpoint{
Host: host,
Port: port,
Host: ss[0],
Port: uint32(port),
}, nil
}

0 comments on commit c9f311e

Please sign in to comment.