Skip to content

Commit

Permalink
feat(fxgcppubsub): Added schemas support
Browse files Browse the repository at this point in the history
  • Loading branch information
ekkinox committed Jun 5, 2024
1 parent 10a1f03 commit 57aec98
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
21 changes: 19 additions & 2 deletions fxgcppubsub/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"github.com/ankorstore/yokai-contrib/fxgcppubsub/schema"
"github.com/ankorstore/yokai/config"
"go.uber.org/fx"
"google.golang.org/api/option"
Expand All @@ -23,13 +24,16 @@ var FxGcpPubSubModule = fx.Module(
ModuleName,
fx.Provide(
NewFxGcpPubSubClient,
NewFxGcpPubSubSchemaClient,
schema.NewSchemaRegistry,
),
)

// FxGcpPubSubClientParam allows injection of the required dependencies in [NewFxGcpPubSubClient].
type FxGcpPubSubClientParam struct {
fx.In
LifeCycle fx.Lifecycle
Context context.Context
Config *config.Config
}

Expand All @@ -43,7 +47,7 @@ func NewFxGcpPubSubClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) {
}

func createClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) {
client, err := pubsub.NewClient(context.Background(), p.Config.GetString("modules.gcppubsub.project.id"))
client, err := pubsub.NewClient(p.Context, p.Config.GetString("modules.gcppubsub.project.id"))
if err != nil {
return nil, fmt.Errorf("failed to create pubsub client: %w", err)
}
Expand All @@ -66,7 +70,7 @@ func createTestClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) {
}

client, err := pubsub.NewClient(
context.Background(),
p.Context,
p.Config.GetString("modules.gcppubsub.project.id"),
option.WithGRPCConn(conn),
)
Expand All @@ -87,3 +91,16 @@ func createTestClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) {

return client, nil
}

// FxGcpPubSubSchemaClientParam allows injection of the required dependencies in [NewFxGcpPubSubSchemaClient].
type FxGcpPubSubSchemaClientParam struct {
fx.In
Context context.Context
Config *config.Config
}

// NewFxGcpPubSubSchemaClient returns a [pubsub.SchemaClient].
func NewFxGcpPubSubSchemaClient(p FxGcpPubSubSchemaClientParam) (*pubsub.SchemaClient, error) {
return pubsub.NewSchemaClient(p.Context, p.Config.GetString("modules.gcppubsub.project.id"))

}
39 changes: 39 additions & 0 deletions fxgcppubsub/schema/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package schema

import (
"context"
"sync"

"cloud.google.com/go/pubsub"
)

type SchemaRegistry struct {
client *pubsub.SchemaClient
schemas map[string]*pubsub.SchemaConfig
mutex sync.RWMutex
}

func NewSchemaRegistry(client *pubsub.SchemaClient) *SchemaRegistry {
return &SchemaRegistry{
client: client,
schemas: make(map[string]*pubsub.SchemaConfig),
}
}

func (r *SchemaRegistry) Get(ctx context.Context, schemaID string) (*pubsub.SchemaConfig, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

if _, found := r.schemas[schemaID]; found {
return r.schemas[schemaID], nil
}

schema, err := r.client.Schema(ctx, schemaID, pubsub.SchemaViewFull)
if err != nil {
return nil, err
}

r.schemas[schemaID] = schema

return schema, nil
}

0 comments on commit 57aec98

Please sign in to comment.