Skip to content

Commit

Permalink
feat: eventually consistency API controls
Browse files Browse the repository at this point in the history
Adds a feature used in Ory Network which enables trading faster reads for slightly outdated data.

This feature depends on Cockroach functionality and configuration, and is not possible for MySQL or PostgreSQL.
  • Loading branch information
aeneasr committed Oct 6, 2023
1 parent df80377 commit ab471ef
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 69 deletions.
6 changes: 6 additions & 0 deletions driver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/ory/x/crdbx"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -165,6 +166,7 @@ const (
ViperKeyCipherAlgorithm = "ciphers.algorithm"
ViperKeyDatabaseCleanupSleepTables = "database.cleanup.sleep.tables"
ViperKeyDatabaseCleanupBatchSize = "database.cleanup.batch_size"
ViperKeyDatabaseDefaultConsistencyLevel = "database.default_consistency_level"
ViperKeyLinkLifespan = "selfservice.methods.link.config.lifespan"
ViperKeyLinkBaseURL = "selfservice.methods.link.config.base_url"
ViperKeyCodeLifespan = "selfservice.methods.code.config.lifespan"
Expand Down Expand Up @@ -1488,3 +1490,7 @@ func (p *Config) TokenizeTemplate(ctx context.Context, key string) (_ *SessionTo

return &result, nil
}

func (p *Config) DefaultConsistencyLevel(ctx context.Context) crdbx.ConsistencyLevel {
return crdbx.ConsistencyLevelFromString(p.GetProvider(ctx).String(ViperKeyDatabaseDefaultConsistencyLevel))
}
10 changes: 10 additions & 0 deletions embedx/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,16 @@
"title": "Database related configuration",
"description": "Miscellaneous settings used in database related tasks (cleanup, etc.)",
"properties": {
"default_consistency_level": {
"type": "string",
"title": "Default Read Consistency Level",
"description": "The default consistency level to use when reading from the database. Defaults to `strong` to not break existing API contracts..",
"enum": [
"strong",
"eventual"
],
"default": "strong"
},
"cleanup": {
"type": "object",
"title": "Database cleanup settings",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ replace (
// Use the internal httpclient which can be generated in this codebase but mark it as the
// official SDK, allowing for the Ory CLI to consume Ory Kratos' CLI commands.
github.com/ory/client-go => ./internal/client-go
github.com/ory/x => ../x
)

require (
Expand Down
4 changes: 4 additions & 0 deletions identity/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package identity
import (
"context"
"encoding/json"
"github.com/ory/x/crdbx"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -149,6 +150,8 @@ type listIdentitiesParameters struct {
// required: false
// in: query
CredentialsIdentifierSimilar string `json:"preview_credentials_identifier_similar"`

crdbx.ConsistencyRequestParameters
}

// swagger:route GET /admin/identities identity listIdentities
Expand Down Expand Up @@ -177,6 +180,7 @@ func (h *Handler) list(w http.ResponseWriter, r *http.Request, _ httprouter.Para
PerPage: itemsPerPage,
CredentialsIdentifier: r.URL.Query().Get("credentials_identifier"),
CredentialsIdentifierSimilar: r.URL.Query().Get("preview_credentials_identifier_similar"),
ConsistencyLevel: crdbx.ConsistencyLevelFromRequest(r),
}
if params.CredentialsIdentifier != "" {
params.Expand = ExpandEverything
Expand Down
2 changes: 2 additions & 0 deletions identity/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package identity

import (
"context"
"github.com/ory/x/crdbx"

"github.com/ory/x/sqlxx"

Expand All @@ -18,6 +19,7 @@ type (
CredentialsIdentifierSimilar string
Page int
PerPage int
ConsistencyLevel crdbx.ConsistencyLevel
}

Pool interface {
Expand Down
162 changes: 93 additions & 69 deletions persistence/sql/identity/persister_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"fmt"
"github.com/ory/x/crdbx"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -662,96 +663,119 @@ func (p *IdentityPersister) ListIdentities(ctx context.Context, params identity.
attribute.String("network.id", p.NetworkID(ctx).String()),
)

is := make([]identity.Identity, 0)

con := p.GetConnection(ctx)
nid := p.NetworkID(ctx)
var is []identity.Identity

if err = p.Transaction(ctx, func(ctx context.Context, con *pop.Connection) error {
is = make([]identity.Identity, 0) // Make sure we reset this to 0 in case of retries.

joins := ""
wheres := ""
args := []any{nid}
identifier := params.CredentialsIdentifier
identifierOperator := "="
if identifier == "" && params.CredentialsIdentifierSimilar != "" {
identifier = params.CredentialsIdentifierSimilar
identifierOperator = "%"
switch con.Dialect.Name() {
case "postgres", "cockroach":
default:
identifier = "%" + identifier + "%"
identifierOperator = "LIKE"
if err := crdbx.SetTransactionConsistency(con, params.ConsistencyLevel, p.r.Config().DefaultConsistencyLevel(ctx)); err != nil {
return err
}
}

if len(identifier) > 0 {
// When filtering by credentials identifier, we most likely are looking for a username or email. It is therefore
// important to normalize the identifier before querying the database.
identifier = NormalizeIdentifier(identity.CredentialsTypePassword, identifier)
joins := ""
wheres := ""
args := []any{nid}
identifier := params.CredentialsIdentifier
identifierOperator := "="
if identifier == "" && params.CredentialsIdentifierSimilar != "" {
identifier = params.CredentialsIdentifierSimilar
identifierOperator = "%"
switch con.Dialect.Name() {
case "postgres", "cockroach":
default:
identifier = "%" + identifier + "%"
identifierOperator = "LIKE"
}
}

joins = `
if len(identifier) > 0 {
// When filtering by credentials identifier, we most likely are looking for a username or email. It is therefore
// important to normalize the identifier before querying the database.
identifier = NormalizeIdentifier(identity.CredentialsTypePassword, identifier)

joins = `
INNER JOIN identity_credentials ic ON ic.identity_id = identities.id
INNER JOIN identity_credential_types ict ON ict.id = ic.identity_credential_type_id
INNER JOIN identity_credential_identifiers ici ON ici.identity_credential_id = ic.id`
wheres = fmt.Sprintf(`
wheres = fmt.Sprintf(`
AND (ic.nid = ? AND ici.nid = ? AND ici.identifier %s ?)
AND ict.name IN (?, ?)`, identifierOperator)
args = append(args, nid, nid, identifier, identity.CredentialsTypeWebAuthn, identity.CredentialsTypePassword)
}
args = append(args, nid, nid, identifier, identity.CredentialsTypeWebAuthn, identity.CredentialsTypePassword)
}

// Follow up: add page token support here, will be easy.
paginator := pop.NewPaginator(params.Page+1, params.PerPage)
// Follow up: add page token support here, will be easy.
paginator := pop.NewPaginator(params.Page+1, params.PerPage)

if err := con.RawQuery(fmt.Sprintf(`SELECT DISTINCT identities.*
if err := con.RawQuery(fmt.Sprintf(`SELECT DISTINCT identities.*
FROM identities AS identities
%s
WHERE identities.nid = ?
%s
ORDER BY identities.id DESC
LIMIT %d
OFFSET %d`, joins, wheres, paginator.PerPage, paginator.Offset), args...).All(&is); err != nil {
return nil, sqlcon.HandleError(err)
}

if len(is) == 0 {
return is, nil
}
OFFSET %d`,
joins,
wheres,
paginator.PerPage,
paginator.Offset,
), args...).All(&is); err != nil {
return sqlcon.HandleError(err)
}

identitiesByID := make(map[uuid.UUID]*identity.Identity, len(is))
identityIDs := make([]any, len(is))
for k := range is {
identitiesByID[is[k].ID] = &is[k]
identityIDs[k] = is[k].ID
}
if len(is) == 0 {
return nil
}

for _, e := range params.Expand {
switch e {
case identity.ExpandFieldCredentials:
creds, err := QueryForCredentials(con,
Where{"identity_credentials.nid = ?", []any{nid}},
Where{"identity_credentials.identity_id IN (?)", identityIDs})
if err != nil {
return nil, err
}
for k := range is {
is[k].Credentials = creds[is[k].ID]
}
case identity.ExpandFieldVerifiableAddresses:
addrs := make([]identity.VerifiableAddress, 0)
if err := con.Where("nid = ?", nid).Where("identity_id IN (?)", identityIDs).Order("id").All(&addrs); err != nil {
return nil, sqlcon.HandleError(err)
}
for _, addr := range addrs {
identitiesByID[addr.IdentityID].VerifiableAddresses = append(identitiesByID[addr.IdentityID].VerifiableAddresses, addr)
}
case identity.ExpandFieldRecoveryAddresses:
addrs := make([]identity.RecoveryAddress, 0)
if err := con.Where("nid = ?", nid).Where("identity_id IN (?)", identityIDs).Order("id").All(&addrs); err != nil {
return nil, sqlcon.HandleError(err)
}
for _, addr := range addrs {
identitiesByID[addr.IdentityID].RecoveryAddresses = append(identitiesByID[addr.IdentityID].RecoveryAddresses, addr)
}
identitiesByID := make(map[uuid.UUID]*identity.Identity, len(is))
identityIDs := make([]any, len(is))
for k := range is {
identitiesByID[is[k].ID] = &is[k]
identityIDs[k] = is[k].ID
}

var eg errgroup.Group
for k := range params.Expand {
e := params.Expand[k]
eg.Go(func() error {
switch e {
case identity.ExpandFieldCredentials:
creds, err := QueryForCredentials(con,
Where{"identity_credentials.nid = ?", []any{nid}},
Where{"identity_credentials.identity_id IN (?)", identityIDs})
if err != nil {
return err
}
for k := range is {
is[k].Credentials = creds[is[k].ID]
}

case identity.ExpandFieldVerifiableAddresses:
addrs := make([]identity.VerifiableAddress, 0)
if err := con.Where("nid = ?", nid).Where("identity_id IN (?)", identityIDs).Order("id").All(&addrs); err != nil {
return sqlcon.HandleError(err)
}
for _, addr := range addrs {
identitiesByID[addr.IdentityID].VerifiableAddresses = append(identitiesByID[addr.IdentityID].VerifiableAddresses, addr)
}

case identity.ExpandFieldRecoveryAddresses:
addrs := make([]identity.RecoveryAddress, 0)
if err := con.Where("nid = ?", nid).Where("identity_id IN (?)", identityIDs).Order("id").All(&addrs); err != nil {
return sqlcon.HandleError(err)
}
for _, addr := range addrs {
identitiesByID[addr.IdentityID].RecoveryAddresses = append(identitiesByID[addr.IdentityID].RecoveryAddresses, addr)
}
}

return nil
})
}

return eg.Wait()
}); err != nil {
return nil, err
}

schemaCache := map[string]string{}
Expand Down

0 comments on commit ab471ef

Please sign in to comment.