diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 113d51ce97..781dd16a8e 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -88,8 +88,13 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E ) } + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(watcher) + impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.BrokerClass, func(impl *controller.Impl) controller.Options { - return controller.Options{PromoteFilterFunc: kafka.BrokerClassFilter()} + return controller.Options{ + ConfigStore: featureStore, + PromoteFilterFunc: kafka.BrokerClassFilter()} }) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) @@ -97,6 +102,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E features := feature.FromContext(ctx) caCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) { // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index 555c32e56c..e714216e5d 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -89,6 +89,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: apisconfig.FlagsConfigName, }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + }, }), env, ) diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/metrics/Metrics.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/metrics/Metrics.java index 9f0a11bc3d..6ff61f3d19 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/metrics/Metrics.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/metrics/Metrics.java @@ -62,17 +62,22 @@ public class Metrics { public static final boolean DISABLE_KAFKA_CLIENTS_METRICS = Boolean.parseBoolean(System.getenv("DISABLE_KAFKA_CLIENTS_METRICS")); - // There are different thread polls usable, mainly, each with its own drawbacks for our use case: - // - cached thread pools - // - fixed thread pools + // There are different thread polls usable, mainly, each with its own drawbacks + // for our use case: + // - cached thread pools + // - fixed thread pools // - // A cached thread might grow unbounded and since creating, updating and deleting resources - // trigger the usage of this executor, a bad actor might start continuously creating, updating + // A cached thread might grow unbounded and since creating, updating and + // deleting resources + // trigger the usage of this executor, a bad actor might start continuously + // creating, updating // and deleting resources which will cause resource exhaustion. // - // A fixed thread poll doesn't give the best possible latency for every resource, but it's + // A fixed thread poll doesn't give the best possible latency for every + // resource, but it's // bounded, so we keep the resource usage under control. - // We might want to provide configs to make it bigger than a single thread but a single thread + // We might want to provide configs to make it bigger than a single thread but a + // single thread // to start with is good enough for now. public static final ExecutorService meterBinderExecutor = Executors.newSingleThreadExecutor(); @@ -80,17 +85,19 @@ public class Metrics { Runtime.getRuntime().addShutdownHook(new Thread(meterBinderExecutor::shutdown)); } - // Micrometer employs a naming convention that separates lowercase words with a '.' (dot) character. - // Different monitoring systems have different recommendations regarding naming convention, and some naming + // Micrometer employs a naming convention that separates lowercase words with a + // '.' (dot) character. + // Different monitoring systems have different recommendations regarding naming + // convention, and some naming // conventions may be incompatible for one system and not another. - // Each Micrometer implementation for a monitoring system comes with a naming convention that transforms lowercase + // Each Micrometer implementation for a monitoring system comes with a naming + // convention that transforms lowercase // dot notation names to the monitoring system’s recommended naming convention. - // Additionally, this naming convention implementation sanitizes metric names and tags of special characters that + // Additionally, this naming convention implementation sanitizes metric names + // and tags of special characters that // are disallowed by the monitoring system. - /** - * In prometheus format --> http_events_sent_total - */ + /** In prometheus format --> http_events_sent_total */ public static final String HTTP_EVENTS_SENT_COUNT = "http.events.sent"; /** @@ -221,8 +228,8 @@ public static MeterRegistry getRegistry() { * Register the given consumer to the global meter registry. * * @param consumer consumer to bind to the global registry. - * @param Record key type. - * @param Record value type. + * @param Record key type. + * @param Record value type. * @return A meter binder to close once the consumer is closed. */ public static AsyncCloseable register(final Consumer consumer) { @@ -233,8 +240,8 @@ public static AsyncCloseable register(final Consumer consumer) { * Register the given producer to the global meter registry. * * @param producer Consumer to bind to the global registry. - * @param Record key type. - * @param Record value type. + * @param Record key type. + * @param Record value type. * @return A meter binder to close once the producer is closed. */ public static AsyncCloseable register(final Producer producer) { @@ -270,7 +277,8 @@ private static AsyncCloseable register(final Supplier metric }; } catch (final RejectedExecutionException ex) { - // if this task cannot be accepted for execution when the executor has been shutdown. + // if this task cannot be accepted for execution when the executor has been + // shutdown. logger.warn("Failed to bind metrics for Kafka client", ex); } } diff --git a/test/config-transport-encryption/features.yaml b/test/config-transport-encryption/features.yaml new file mode 100644 index 0000000000..ce2d561784 --- /dev/null +++ b/test/config-transport-encryption/features.yaml @@ -0,0 +1,32 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-features + namespace: knative-eventing + labels: + knative.dev/config-propagation: original + knative.dev/config-category: eventing +data: + authentication.oidc: "disabled" + delivery-retryafter: "disabled" + delivery-timeout: "enabled" + eventtype-auto-create: "disabled" + kreference-group: "disabled" + kreference-mapping: "disabled" + new-trigger-filters: "enabled" + strict-subscriber: "disabled" + transport-encryption: "Strict" diff --git a/test/e2e_new/broker_eventing_tls_test.go b/test/e2e_new/broker_eventing_tls_test.go new file mode 100644 index 0000000000..123d9fb61d --- /dev/null +++ b/test/e2e_new/broker_eventing_tls_test.go @@ -0,0 +1,48 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "testing" + "time" + + "knative.dev/eventing-kafka-broker/test/rekt/features" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestBrokerTLSCARotation(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.Test(ctx, t, features.RotateBrokerTLSCertificates()) +} diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index 419afdd382..1aa9a5ba6d 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -48,6 +48,12 @@ go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail go_test_e2e -tags=deletecm ./test/e2e_new/... || fail_test "E2E (new deletecm) suite failed" +echo "Running E2E Reconciler Tests with strict transport encryption" + +kubectl apply -Rf "$(dirname "$0")/config-transport-encryption" + +go_test_e2e -timeout=1h ./test/e2e_new -run TLS || fail_test + if ! ${LOCAL_DEVELOPMENT}; then go_test_e2e -tags=sacura -timeout=40m ./test/e2e/... || fail_test "E2E (sacura) suite failed" fi diff --git a/test/rekt/features/broker_auth.go b/test/rekt/features/broker_auth.go index f66f8edca4..24acf2706c 100644 --- a/test/rekt/features/broker_auth.go +++ b/test/rekt/features/broker_auth.go @@ -21,15 +21,19 @@ import ( "time" "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret" + "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" + "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/resources/svc" brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" diff --git a/test/rekt/features/broker_tls.go b/test/rekt/features/broker_tls.go new file mode 100644 index 0000000000..705693197b --- /dev/null +++ b/test/rekt/features/broker_tls.go @@ -0,0 +1,111 @@ +/* + * Copyright 2023 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/reconciler-test/resources/certificate" + + testpkg "knative.dev/eventing-kafka-broker/test/pkg" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + + brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/trigger" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/resources/service" +) + +func RotateBrokerTLSCertificates() *feature.Feature { + + ingressCertificateName := "kafka-broker-ingress-server-tls" + ingressSecretName := "kafka-broker-ingress-server-tls" + + brokerName := feature.MakeRandomK8sName("broker") + triggerName := feature.MakeRandomK8sName("trigger") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + f := feature.NewFeatureNamed("Rotate Kafka Broker TLS certificate") + + brokerConfig := feature.MakeRandomK8sName("brokercfg") + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Create broker config", brokerconfigmap.Install(brokerConfig, + brokerconfigmap.WithNumPartitions(1), + brokerconfigmap.WithReplicationFactor(1), + brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersPlaintext))) + + f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{ + Certificate: types.NamespacedName{ + Namespace: system.Namespace(), + Name: ingressCertificateName, + }, + })) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS)) + f.Setup("Install broker", broker.Install(brokerName, append( + broker.WithEnvConfig(), + broker.WithConfig(brokerConfig))..., + )) + f.Setup("Broker is ready", broker.IsReady(brokerName)) + f.Setup("install trigger", func(ctx context.Context, t feature.T) { + d := service.AsDestinationRef(sink) + d.CACerts = eventshub.GetCaCerts(ctx) + trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t) + }) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + f.Setup("Broker has HTTPS address", broker.ValidateAddress(brokerName, addressable.AssertHTTPSAddress)) + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Requirement("install source", eventshub.Install(source, + eventshub.StartSenderToResourceTLS(broker.GVR(), brokerName, nil), + eventshub.InputEvent(event), + // Send multiple events so that we take into account that the certificate rotation might + // be detected by the server after some time. + eventshub.SendMultipleEvents(100, 3*time.Second), + )) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Event received", assert.OnStore(sink). + MatchReceivedEvent(cetest.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("Source match updated peer certificate", assert.OnStore(source). + MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressSecretName, "tls.crt")). + AtLeast(1), + ) + + return f +}