Skip to content

Commit

Permalink
Add a warning system (#321)
Browse files Browse the repository at this point in the history
See [edgedb issue #7822](edgedb/edgedb#7822).
  • Loading branch information
fmoor authored Oct 8, 2024
1 parent 8354817 commit 58b34e7
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 72 deletions.
12 changes: 12 additions & 0 deletions export.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ type (
// UUID is a universally unique identifier
// https://www.edgedb.com/docs/stdlib/uuid
UUID = edgedbtypes.UUID

// WarningHandler takes a slice of edgedb.Error that represent warnings and
// optionally returns an error. This can be used to log warnings, increment
// metrics, promote warnings to errors by returning them etc.
WarningHandler = edgedb.WarningHandler
)

var (
Expand All @@ -292,6 +297,9 @@ var (
// from a [time.Duration] represented as nanoseconds.
DurationFromNanoseconds = edgedbtypes.DurationFromNanoseconds

// LogWarnings is an edgedb.WarningHandler that logs warnings.
LogWarnings = edgedb.LogWarnings

// NewDateDuration returns a new DateDuration
NewDateDuration = edgedbtypes.NewDateDuration

Expand Down Expand Up @@ -439,4 +447,8 @@ var (

// ParseUUID parses s into a UUID or returns an error.
ParseUUID = edgedbtypes.ParseUUID

// WarningsAsErrors is an edgedb.WarningHandler that returns warnings as
// errors.
WarningsAsErrors = edgedb.WarningsAsErrors
)
2 changes: 1 addition & 1 deletion internal/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *protocolConnection) cacheTypeIDs(q *query, ids idPair) {

func (c *protocolConnection) cacheCapabilities0pX(
q *query,
headers header.Header,
headers header.Header0pX,
) {
if capabilities, ok := headers[header.Capabilities]; ok {
x := binary.BigEndian.Uint64(capabilities)
Expand Down
49 changes: 43 additions & 6 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Client struct {
cfg *connConfig
cacheCollection
state map[string]interface{}

warningHandler WarningHandler
}

// CreateClient returns a new client. The client connects lazily. Call
Expand All @@ -81,6 +83,11 @@ func CreateClientDSN(_ context.Context, dsn string, opts Options) (*Client, erro
return nil, err
}

warningHandler := LogWarnings
if opts.WarningHandler != nil {
warningHandler = opts.WarningHandler
}

False := false
p := &Client{
isClosed: &False,
Expand All @@ -98,7 +105,8 @@ func CreateClientDSN(_ context.Context, dsn string, opts Options) (*Client, erro
outCodecCache: cache.New(1_000),
capabilitiesCache: cache.New(1_000),
},
state: make(map[string]interface{}),
state: make(map[string]interface{}),
warningHandler: warningHandler,
}

return p, nil
Expand Down Expand Up @@ -326,6 +334,7 @@ func (p *Client) Execute(
copyState(p.state),
nil,
true,
p.warningHandler,
)
if err != nil {
return err
Expand All @@ -347,7 +356,8 @@ func (p *Client) Query(
return err
}

err = runQuery(ctx, conn, "Query", cmd, out, args, p.state)
err = runQuery(
ctx, conn, "Query", cmd, out, args, p.state, p.warningHandler)
return firstError(err, p.release(conn, err))
}

Expand All @@ -366,7 +376,16 @@ func (p *Client) QuerySingle(
return err
}

err = runQuery(ctx, conn, "QuerySingle", cmd, out, args, p.state)
err = runQuery(
ctx,
conn,
"QuerySingle",
cmd,
out,
args,
p.state,
p.warningHandler,
)
return firstError(err, p.release(conn, err))
}

Expand All @@ -382,7 +401,16 @@ func (p *Client) QueryJSON(
return err
}

err = runQuery(ctx, conn, "QueryJSON", cmd, out, args, p.state)
err = runQuery(
ctx,
conn,
"QueryJSON",
cmd,
out,
args,
p.state,
p.warningHandler,
)
return firstError(err, p.release(conn, err))
}

Expand All @@ -400,7 +428,16 @@ func (p *Client) QuerySingleJSON(
return err
}

err = runQuery(ctx, conn, "QuerySingleJSON", cmd, out, args, p.state)
err = runQuery(
ctx,
conn,
"QuerySingleJSON",
cmd,
out,
args,
p.state,
p.warningHandler,
)
return firstError(err, p.release(conn, err))
}

Expand All @@ -424,6 +461,6 @@ func (p *Client) Tx(ctx context.Context, action TxBlock) error {
return err
}

err = conn.tx(ctx, action, p.state)
err = conn.tx(ctx, action, p.state, p.warningHandler)
return firstError(err, p.release(conn, err))
}
14 changes: 7 additions & 7 deletions internal/client/granularflow0pX.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (c *protocolConnection) prepare0pX(r *buff.Reader, q *query) error {

w := buff.NewWriter(c.writeMemory[:0])
w.BeginMessage(uint8(Parse))
writeHeaders(w, headers)
writeHeaders0pX(w, headers)
w.PushUint8(uint8(q.fmt))
w.PushUint8(uint8(q.expCard))
w.PushUint32(0) // no statement name
Expand All @@ -194,7 +194,7 @@ func (c *protocolConnection) prepare0pX(r *buff.Reader, q *query) error {
for r.Next(done.Chan) {
switch Message(r.MsgType) {
case ParseComplete:
c.cacheCapabilities0pX(q, decodeHeaders(r))
c.cacheCapabilities0pX(q, decodeHeaders0pX(r))
r.Discard(1) // cardinality
ids := idPair{in: r.PopUUID(), out: r.PopUUID()}
c.cacheTypeIDs(q, ids)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *protocolConnection) execute0pX(
) error {
w := buff.NewWriter(c.writeMemory[:0])
w.BeginMessage(uint8(Execute0pX))
writeHeaders(w, q.headers0pX())
writeHeaders0pX(w, q.headers0pX())
w.PushUint32(0) // no statement name
if e := cdcs.in.Encode(w, q.args, codecs.Path("args"), true); e != nil {
return &invalidArgumentError{msg: e.Error()}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (c *protocolConnection) optimistic0pX(

w := buff.NewWriter(c.writeMemory[:0])
w.BeginMessage(uint8(Execute))
writeHeaders(w, headers)
writeHeaders0pX(w, headers)
w.PushUint8(uint8(q.fmt))
w.PushUint8(uint8(q.expCard))
w.PushString(q.cmd)
Expand Down Expand Up @@ -396,7 +396,7 @@ func (c *protocolConnection) optimistic0pX(
decodeCommandCompleteMsg0pX(r)
case CommandDataDescription:
var (
headers header.Header
headers header.Header0pX
e error
)

Expand Down Expand Up @@ -482,8 +482,8 @@ func decodeDataMsg(
func (c *protocolConnection) decodeCommandDataDescriptionMsg0pX(
r *buff.Reader,
q *query,
) (*CommandDescription, header.Header, error) {
headers := decodeHeaders(r)
) (*CommandDescription, header.Header0pX, error) {
headers := decodeHeaders0pX(r)
card := r.PopUint8()

var (
Expand Down
14 changes: 7 additions & 7 deletions internal/client/granularflow1pX.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ func (c *protocolConnection) decodeCommandDataDescriptionMsg1pX(
r *buff.Reader,
q *query,
) (*CommandDescription, error) {
discardHeaders(r)
c.cacheCapabilities1pX(q, r.PopUint64())
_, err := decodeHeaders1pX(r, q.warningHandler)
if err != nil {
return nil, err
}

var (
err error
descs CommandDescription
)
c.cacheCapabilities1pX(q, r.PopUint64())

var descs CommandDescription
descs.Card = Cardinality(r.PopUint8())
id := r.PopUUID()
descs.In, err = descriptor.Pop(
Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *protocolConnection) decodeCommandCompleteMsg1pX(
q *query,
r *buff.Reader,
) error {
discardHeaders(r)
discardHeaders0pX(r)
c.cacheCapabilities1pX(q, r.PopUint64())
r.Discard(int(r.PopUint32())) // discard command status
if r.PopUUID() == descriptor.IDZero {
Expand Down
18 changes: 9 additions & 9 deletions internal/client/granularflow2pX.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ func (c *protocolConnection) decodeCommandDataDescriptionMsg2pX(
r *buff.Reader,
q *query,
) (*CommandDescriptionV2, error) {
discardHeaders(r)
c.cacheCapabilities1pX(q, r.PopUint64())
_, err := decodeHeaders2pX(r, q.warningHandler)
if err != nil {
return nil, err
}

var (
err error
descs CommandDescriptionV2
)
c.cacheCapabilities1pX(q, r.PopUint64())

var descs CommandDescriptionV2
descs.Card = Cardinality(r.PopUint8())
id := r.PopUUID()
descs.In, err = descriptor.PopV2(
Expand Down Expand Up @@ -231,9 +231,9 @@ func (c *protocolConnection) execute2pX(
err = wrapAll(err, e)
}
case CommandDataDescription:
descs, e := c.decodeCommandDataDescriptionMsg1pX(r, q)
descs, e := c.decodeCommandDataDescriptionMsg2pX(r, q)
err = wrapAll(err, e)
cdcs, e = c.codecsFromDescriptors1pX(q, descs)
cdcs, e = c.codecsFromDescriptors2pX(q, descs)
err = wrapAll(err, e)
case Data:
val, ok, e := decodeDataMsg(r, q, cdcs)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *protocolConnection) decodeCommandCompleteMsg2pX(
q *query,
r *buff.Reader,
) error {
discardHeaders(r)
discardHeaders0pX(r)
c.cacheCapabilities1pX(q, r.PopUint64())
r.Discard(int(r.PopUint32())) // discard command status
if r.PopUUID() == descriptor.IDZero {
Expand Down
17 changes: 17 additions & 0 deletions internal/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type Options struct {

// SecretKey is used to connect to cloud instances.
SecretKey string

// WarningHandler is invoked when EdgeDB returns warnings. Defaults to
// edgedb.LogWarnings.
WarningHandler WarningHandler
}

// TLSOptions contains the parameters needed to configure TLS on EdgeDB
Expand Down Expand Up @@ -512,3 +516,16 @@ func (p Client) WithoutGlobals(globals ...string) *Client { // nolint:gocritic
p.state = state
return &p
}

// WithWarningHandler sets the warning handler for the returned client. If
// warningHandler is nil edgedb.LogWarnings is used.
func (p Client) WithWarningHandler( // nolint:gocritic
warningHandler WarningHandler,
) *Client {
if warningHandler == nil {
warningHandler = LogWarnings
}

p.warningHandler = warningHandler
return &p
}
Loading

0 comments on commit 58b34e7

Please sign in to comment.