From 57aec98276d06698e38ba979dcf2a65a3f70369e Mon Sep 17 00:00:00 2001 From: Jonathan Vuillemin Date: Wed, 5 Jun 2024 16:37:58 +0200 Subject: [PATCH] feat(fxgcppubsub): Added schemas support --- fxgcppubsub/module.go | 21 ++++++++++++++++-- fxgcppubsub/schema/registry.go | 39 ++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 fxgcppubsub/schema/registry.go diff --git a/fxgcppubsub/module.go b/fxgcppubsub/module.go index 2f38a2a..d00f11f 100644 --- a/fxgcppubsub/module.go +++ b/fxgcppubsub/module.go @@ -6,6 +6,7 @@ import ( "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub/pstest" + "github.com/ankorstore/yokai-contrib/fxgcppubsub/schema" "github.com/ankorstore/yokai/config" "go.uber.org/fx" "google.golang.org/api/option" @@ -23,6 +24,8 @@ var FxGcpPubSubModule = fx.Module( ModuleName, fx.Provide( NewFxGcpPubSubClient, + NewFxGcpPubSubSchemaClient, + schema.NewSchemaRegistry, ), ) @@ -30,6 +33,7 @@ var FxGcpPubSubModule = fx.Module( type FxGcpPubSubClientParam struct { fx.In LifeCycle fx.Lifecycle + Context context.Context Config *config.Config } @@ -43,7 +47,7 @@ func NewFxGcpPubSubClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) { } func createClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) { - client, err := pubsub.NewClient(context.Background(), p.Config.GetString("modules.gcppubsub.project.id")) + client, err := pubsub.NewClient(p.Context, p.Config.GetString("modules.gcppubsub.project.id")) if err != nil { return nil, fmt.Errorf("failed to create pubsub client: %w", err) } @@ -66,7 +70,7 @@ func createTestClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) { } client, err := pubsub.NewClient( - context.Background(), + p.Context, p.Config.GetString("modules.gcppubsub.project.id"), option.WithGRPCConn(conn), ) @@ -87,3 +91,16 @@ func createTestClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) { return client, nil } + +// FxGcpPubSubSchemaClientParam allows injection of the required dependencies in [NewFxGcpPubSubSchemaClient]. +type FxGcpPubSubSchemaClientParam struct { + fx.In + Context context.Context + Config *config.Config +} + +// NewFxGcpPubSubSchemaClient returns a [pubsub.SchemaClient]. +func NewFxGcpPubSubSchemaClient(p FxGcpPubSubSchemaClientParam) (*pubsub.SchemaClient, error) { + return pubsub.NewSchemaClient(p.Context, p.Config.GetString("modules.gcppubsub.project.id")) + +} diff --git a/fxgcppubsub/schema/registry.go b/fxgcppubsub/schema/registry.go new file mode 100644 index 0000000..168878c --- /dev/null +++ b/fxgcppubsub/schema/registry.go @@ -0,0 +1,39 @@ +package schema + +import ( + "context" + "sync" + + "cloud.google.com/go/pubsub" +) + +type SchemaRegistry struct { + client *pubsub.SchemaClient + schemas map[string]*pubsub.SchemaConfig + mutex sync.RWMutex +} + +func NewSchemaRegistry(client *pubsub.SchemaClient) *SchemaRegistry { + return &SchemaRegistry{ + client: client, + schemas: make(map[string]*pubsub.SchemaConfig), + } +} + +func (r *SchemaRegistry) Get(ctx context.Context, schemaID string) (*pubsub.SchemaConfig, error) { + r.mutex.Lock() + defer r.mutex.Unlock() + + if _, found := r.schemas[schemaID]; found { + return r.schemas[schemaID], nil + } + + schema, err := r.client.Schema(ctx, schemaID, pubsub.SchemaViewFull) + if err != nil { + return nil, err + } + + r.schemas[schemaID] = schema + + return schema, nil +}