From d7ed86d282585c9c948628baa7c966891c95bc76 Mon Sep 17 00:00:00 2001 From: Jonathan Vuillemin Date: Mon, 10 Jun 2024 14:32:48 +0200 Subject: [PATCH] feat(fxgcppubsub): Added Avro and Protobuf schemas support --- fxgcppubsub/.golangci.yml | 2 - fxgcppubsub/healthcheck/subscription_test.go | 151 ------------------- fxgcppubsub/healthcheck/topic_test.go | 133 ---------------- fxgcppubsub/module.go | 4 + fxgcppubsub/module_test.go | 46 +++--- fxgcppubsub/schema/registry.go | 2 - fxgcppubsub/subscription/option.go | 1 - fxgcppubsub/topic/option.go | 1 - 8 files changed, 22 insertions(+), 318 deletions(-) delete mode 100644 fxgcppubsub/healthcheck/subscription_test.go delete mode 100644 fxgcppubsub/healthcheck/topic_test.go diff --git a/fxgcppubsub/.golangci.yml b/fxgcppubsub/.golangci.yml index beb045a..3499e07 100644 --- a/fxgcppubsub/.golangci.yml +++ b/fxgcppubsub/.golangci.yml @@ -10,7 +10,6 @@ linters: - bodyclose - containedctx - contextcheck - - cyclop - decorder - dogsled - durationcheck @@ -18,7 +17,6 @@ linters: - errchkjson - errname - errorlint - - exhaustive - forbidigo - forcetypeassert - gocognit diff --git a/fxgcppubsub/healthcheck/subscription_test.go b/fxgcppubsub/healthcheck/subscription_test.go deleted file mode 100644 index cdd5451..0000000 --- a/fxgcppubsub/healthcheck/subscription_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package healthcheck_test - -import ( - "context" - "testing" - - "cloud.google.com/go/pubsub" - "github.com/ankorstore/yokai-contrib/fxgcppubsub" - fxgcppubsubhealthcheck "github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck" - "github.com/ankorstore/yokai/config" - "github.com/ankorstore/yokai/fxconfig" - "github.com/stretchr/testify/assert" - "go.uber.org/fx" - "go.uber.org/fx/fxtest" -) - -func TestWithExistingSubscriptions(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - topic, err := client.CreateTopic(context.Background(), "topic1") - assert.NoError(t, err) - - for _, subscription := range conf.GetStringSlice("modules.gcppubsub.healthcheck.subscriptions") { - _, err = client.CreateSubscription(context.Background(), subscription, pubsub.SubscriptionConfig{ - Topic: topic, - }) - assert.NoError(t, err) - } - - p := fxgcppubsubhealthcheck.NewGcpPubSubSubscriptionsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.SubscriptionsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.True(t, checkResult.Success) - assert.Equal( - t, - "subscription subscription1 exists, subscription subscription2 exists, subscription subscription3 exists", - checkResult.Message, - ) -} - -func TestWithMissingSubscriptions(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - topic, err := client.CreateTopic(context.Background(), "topic1") - assert.NoError(t, err) - - for _, subscription := range conf.GetStringSlice("modules.gcppubsub.healthcheck.subscriptions") { - if subscription != "subscription2" { - _, err = client.CreateSubscription(context.Background(), subscription, pubsub.SubscriptionConfig{ - Topic: topic, - }) - assert.NoError(t, err) - } - } - - p := fxgcppubsubhealthcheck.NewGcpPubSubSubscriptionsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.SubscriptionsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.False(t, checkResult.Success) - assert.Equal( - t, - "subscription subscription1 exists, subscription subscription2 does not exist, subscription subscription3 exists", - checkResult.Message, - ) -} - -func TestWithEmptySubscriptions(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - // empty the subscriptions list - t.Setenv("MODULES_GCPPUBSUB_HEALTHCHECK_SUBSCRIPTIONS", " ") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - p := fxgcppubsubhealthcheck.NewGcpPubSubSubscriptionsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.SubscriptionsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.True(t, checkResult.Success) -} - -func TestWithFailingSubscriptions(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart().RequireStop() - - p := fxgcppubsubhealthcheck.NewGcpPubSubSubscriptionsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.SubscriptionsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - assert.False(t, checkResult.Success) - assert.Contains(t, checkResult.Message, "subscription subscription1 error: rpc error") - assert.Contains(t, checkResult.Message, "subscription subscription2 error: rpc error") - assert.Contains(t, checkResult.Message, "subscription subscription3 error: rpc error") -} diff --git a/fxgcppubsub/healthcheck/topic_test.go b/fxgcppubsub/healthcheck/topic_test.go deleted file mode 100644 index e9f9e93..0000000 --- a/fxgcppubsub/healthcheck/topic_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package healthcheck_test - -import ( - "context" - "testing" - - "cloud.google.com/go/pubsub" - "github.com/ankorstore/yokai-contrib/fxgcppubsub" - fxgcppubsubhealthcheck "github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck" - "github.com/ankorstore/yokai/config" - "github.com/ankorstore/yokai/fxconfig" - "github.com/stretchr/testify/assert" - "go.uber.org/fx" - "go.uber.org/fx/fxtest" -) - -func TestWithExistingTopics(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - for _, topic := range conf.GetStringSlice("modules.gcppubsub.healthcheck.topics") { - _, err := client.CreateTopic(context.Background(), topic) - assert.NoError(t, err) - } - - p := fxgcppubsubhealthcheck.NewGcpPubSubTopicsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.TopicsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.True(t, checkResult.Success) - assert.Equal(t, "topic topic1 exists, topic topic2 exists, topic topic3 exists", checkResult.Message) -} - -func TestWithMissingTopics(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - for _, topic := range conf.GetStringSlice("modules.gcppubsub.healthcheck.topics") { - if topic != "topic2" { - _, err := client.CreateTopic(context.Background(), topic) - assert.NoError(t, err) - } - } - - p := fxgcppubsubhealthcheck.NewGcpPubSubTopicsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.TopicsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.False(t, checkResult.Success) - assert.Equal(t, "topic topic1 exists, topic topic2 does not exist, topic topic3 exists", checkResult.Message) -} - -func TestWithEmptyTopics(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - // empty the topics list - t.Setenv("MODULES_GCPPUBSUB_HEALTHCHECK_TOPICS", " ") - - var client *pubsub.Client - var conf *config.Config - - app := fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart() - - p := fxgcppubsubhealthcheck.NewGcpPubSubTopicsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.TopicsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - app.RequireStop() - - assert.True(t, checkResult.Success) -} - -func TestWithFailingTopics(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "../testdata/config") - t.Setenv("GCP_PROJECT_ID", "test-project") - - var client *pubsub.Client - var conf *config.Config - - fxtest.New( - t, - fx.NopLogger, - fxgcppubsub.FxGcpPubSubModule, - fxconfig.FxConfigModule, - fx.Populate(&client, &conf), - ).RequireStart().RequireStop() - - p := fxgcppubsubhealthcheck.NewGcpPubSubTopicsProbe(conf, client) - assert.Equal(t, fxgcppubsubhealthcheck.TopicsProbeName, p.Name()) - checkResult := p.Check(context.Background()) - - assert.False(t, checkResult.Success) - assert.Contains(t, checkResult.Message, "topic topic1 error: rpc error") - assert.Contains(t, checkResult.Message, "topic topic2 error: rpc error") - assert.Contains(t, checkResult.Message, "topic topic3 error: rpc error") -} diff --git a/fxgcppubsub/module.go b/fxgcppubsub/module.go index a50d867..6bae93d 100644 --- a/fxgcppubsub/module.go +++ b/fxgcppubsub/module.go @@ -44,6 +44,8 @@ func NewFxGcpPubSubTestServer() *pstest.Server { } // FxGcpPubSubClientParam allows injection of the required dependencies in [NewFxGcpPubSubClient]. +// +//nolint:containedctx type FxGcpPubSubClientParam struct { fx.In LifeCycle fx.Lifecycle @@ -84,6 +86,8 @@ func NewFxGcpPubSubClient(p FxGcpPubSubClientParam) (*pubsub.Client, error) { } // FxGcpPubSubSchemaClientParam allows injection of the required dependencies in [NewFxGcpPubSubSchemaClient]. +// +//nolint:containedctx type FxGcpPubSubSchemaClientParam struct { fx.In LifeCycle fx.Lifecycle diff --git a/fxgcppubsub/module_test.go b/fxgcppubsub/module_test.go index b12746d..4b35c7a 100644 --- a/fxgcppubsub/module_test.go +++ b/fxgcppubsub/module_test.go @@ -14,11 +14,16 @@ import ( ) func TestFxGcpPubSubModule(t *testing.T) { + ctx := context.Background() + app := fxtest.New( t, fx.NopLogger, fxgcppubsub.FxGcpPubSubModule, fxconfig.FxConfigModule, + fx.Supply( + fx.Annotate(ctx, fx.As(new(context.Context))), + ), ) app.RequireStart() @@ -34,6 +39,8 @@ func TestFxGcpPubSubClient(t *testing.T) { t.Setenv("GCP_PROJECT_ID", "project-test") t.Setenv("PUBSUB_EMULATOR_HOST", "localhost") + ctx := context.Background() + var conf *config.Config var client *pubsub.Client @@ -41,15 +48,18 @@ func TestFxGcpPubSubClient(t *testing.T) { fx.NopLogger, fxconfig.FxConfigModule, fxgcppubsub.FxGcpPubSubModule, + fx.Supply( + fx.Annotate(ctx, fx.As(new(context.Context))), + ), fx.Populate(&conf, &client), ) - err := app.Start(context.Background()) + err := app.Start(ctx) assert.NoError(t, err, "failed to create pubsub.Client") assert.NotNil(t, client) assert.Equal(t, "project-test", client.Project()) - err = app.Stop(context.Background()) + err = app.Stop(ctx) assert.NoError(t, err, "failed to close pubsub.Client") } @@ -57,6 +67,8 @@ func TestFxGcpPubSubClientWithoutProjectId(t *testing.T) { t.Setenv("APP_ENV", "dev") t.Setenv("APP_CONFIG_PATH", "testdata/config") + ctx := context.Background() + var conf *config.Config var client *pubsub.Client @@ -64,34 +76,12 @@ func TestFxGcpPubSubClientWithoutProjectId(t *testing.T) { fx.NopLogger, fxconfig.FxConfigModule, fxgcppubsub.FxGcpPubSubModule, + fx.Supply( + fx.Annotate(ctx, fx.As(new(context.Context))), + ), fx.Populate(&conf, &client), ) - err := app.Start(context.Background()) + err := app.Start(ctx) assert.Contains(t, err.Error(), "failed to create pubsub client: pubsub: projectID string is empty") } - -func TestNewFxGcpPubSubForTestClient(t *testing.T) { - t.Setenv("APP_ENV", "test") - t.Setenv("APP_CONFIG_PATH", "testdata/config") - t.Setenv("GCP_PROJECT_ID", "project-test") - - var conf *config.Config - var client *pubsub.Client - - app := fxtest.New( - t, - fx.NopLogger, - fxconfig.FxConfigModule, - fxgcppubsub.FxGcpPubSubModule, - fx.Populate(&conf, &client), - ) - - app.RequireStart() - assert.NoError(t, app.Err(), "failed to create pubsub.Client") - assert.NotNil(t, client) - assert.Equal(t, "project-test", client.Project()) - - app.RequireStop() - assert.NoError(t, app.Err(), "failed to close pubsub.Client") -} diff --git a/fxgcppubsub/schema/registry.go b/fxgcppubsub/schema/registry.go index 2ff6bbb..343c938 100644 --- a/fxgcppubsub/schema/registry.go +++ b/fxgcppubsub/schema/registry.go @@ -2,7 +2,6 @@ package schema import ( "context" - "fmt" "strings" "sync" @@ -36,7 +35,6 @@ func (r *SchemaRegistry) Get(ctx context.Context, schemaID string) (*pubsub.Sche schema, err := r.client.Schema(ctx, schemaID, pubsub.SchemaViewFull) if err != nil { - fmt.Printf("error: %s\n", err) return nil, err } diff --git a/fxgcppubsub/subscription/option.go b/fxgcppubsub/subscription/option.go index 25981de..82eb2a1 100644 --- a/fxgcppubsub/subscription/option.go +++ b/fxgcppubsub/subscription/option.go @@ -14,7 +14,6 @@ func DefaultSubscribeOptions() Options { return Options{ Settings: pubsub.DefaultReceiveSettings, } - } type SubscribeOption func(o *Options) diff --git a/fxgcppubsub/topic/option.go b/fxgcppubsub/topic/option.go index c36b809..cf532b9 100644 --- a/fxgcppubsub/topic/option.go +++ b/fxgcppubsub/topic/option.go @@ -14,7 +14,6 @@ func DefaultPublishOptions() Options { return Options{ Settings: pubsub.DefaultPublishSettings, } - } type PublishOption func(o *Options)