diff --git a/cmd/catalog/main.go b/cmd/catalog/main.go index d282dfebb3..b82f1689cb 100644 --- a/cmd/catalog/main.go +++ b/cmd/catalog/main.go @@ -7,6 +7,9 @@ import ( "os" "time" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1" "github.com/sirupsen/logrus" k8sscheme "k8s.io/client-go/kubernetes/scheme" @@ -88,6 +91,14 @@ func (o *options) run(ctx context.Context, logger *logrus.Logger) error { if o.setWorkloadUserID { workloadUserID = defaultWorkLoadUserID } + + // the scheme is used by the catalog operator to create + // a validatingroundtripper that ensures that all created + // resources are appropriately labeled + scheme := k8sscheme.Scheme + _ = apiextensionsv1.AddToScheme(scheme) // required by opClient + _ = apiregistrationv1.AddToScheme(scheme) // required by opClient + // TODO(tflannag): Use options pattern for catalog operator // Create a new instance of the operator. op, err := catalog.NewOperator( @@ -100,7 +111,7 @@ func (o *options) run(ctx context.Context, logger *logrus.Logger) error { o.opmImage, o.utilImage, o.catalogNamespace, - k8sscheme.Scheme, + scheme, o.installPlanTimeout, o.bundleUnpackTimeout, workloadUserID, diff --git a/cmd/olm/main.go b/cmd/olm/main.go index 6d76606dc3..715ae9aea0 100644 --- a/cmd/olm/main.go +++ b/cmd/olm/main.go @@ -141,7 +141,7 @@ func main() { config := mgr.GetConfig() // create a config that validates we're creating objects with labels - validatingConfig := validatingroundtripper.Wrap(config) + validatingConfig := validatingroundtripper.Wrap(config, mgr.GetScheme()) versionedConfigClient, err := configclientset.NewForConfig(config) if err != nil { diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..09c28a7319 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -149,7 +149,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } // create a config that validates we're creating objects with labels - validatingConfig := validatingroundtripper.Wrap(config) + validatingConfig := validatingroundtripper.Wrap(config, scheme) // Create a new client for dynamic types (CRs) dynamicClient, err := dynamic.NewForConfig(validatingConfig) diff --git a/pkg/controller/operators/validatingroundtripper/validating_round_tripper.go b/pkg/controller/operators/validatingroundtripper/validating_round_tripper.go index c9c1cbd395..333bc7740e 100644 --- a/pkg/controller/operators/validatingroundtripper/validating_round_tripper.go +++ b/pkg/controller/operators/validatingroundtripper/validating_round_tripper.go @@ -2,9 +2,13 @@ package validatingroundtripper import ( "fmt" + "io" "net/http" "os" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/yaml" @@ -13,23 +17,69 @@ import ( type validatingRoundTripper struct { delegate http.RoundTripper + codecs serializer.CodecFactory +} + +func (rt *validatingRoundTripper) decodeYAMLOrJSON(body io.Reader) (*unstructured.Unstructured, error) { + dec := yaml.NewYAMLOrJSONDecoder(body, 10) + unstructuredObject := &unstructured.Unstructured{} + if err := dec.Decode(unstructuredObject); err != nil { + return nil, fmt.Errorf("error decoding yaml/json object to an unstructured object: %w", err) + } + return unstructuredObject, nil +} + +func (rt *validatingRoundTripper) decodeProtobuf(body io.Reader) (*unstructured.Unstructured, error) { + data, err := io.ReadAll(body) + if err != nil { + return nil, fmt.Errorf("failed to read request body: %w", err) + } + + decoder := rt.codecs.UniversalDeserializer() + obj, _, err := decoder.Decode(data, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to decode protobuf data: %w", err) + } + + unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, fmt.Errorf("failed to convert object to unstructured: %w", err) + } + + return &unstructured.Unstructured{Object: unstructuredObj}, nil +} + +func (rt *validatingRoundTripper) decodeRequestBody(req *http.Request) (*unstructured.Unstructured, error) { + b, err := req.GetBody() + if err != nil { + panic(fmt.Errorf("failed to get request body: %w", err)) + } + defer b.Close() + + switch req.Header.Get("Content-Type") { + case "application/vnd.kubernetes.protobuf": + return rt.decodeProtobuf(b) + default: + return rt.decodeYAMLOrJSON(b) + } } func (rt *validatingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { if req.Method == "POST" { - b, err := req.GetBody() + unstructuredObject, err := rt.decodeRequestBody(req) + if err != nil { - panic(err) - } - dec := yaml.NewYAMLOrJSONDecoder(b, 10) - unstructuredObject := &unstructured.Unstructured{} - if err := dec.Decode(unstructuredObject); err != nil { - panic(fmt.Errorf("error decoding object to an unstructured object: %w", err)) + return nil, err } + gvk := unstructuredObject.GroupVersionKind() if gvk.Kind != "Event" { - if labels := unstructuredObject.GetLabels(); labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue { - panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s", gvk.Kind, gvk.Group, gvk.Version, unstructuredObject.GetNamespace(), unstructuredObject.GetName(), install.OLMManagedLabelKey, install.OLMManagedLabelValue)) + labels := unstructuredObject.GetLabels() + if labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue { + panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s", + gvk.Kind, gvk.Group, gvk.Version, + unstructuredObject.GetNamespace(), unstructuredObject.GetName(), + install.OLMManagedLabelKey, install.OLMManagedLabelValue)) } } } @@ -40,14 +90,17 @@ var _ http.RoundTripper = (*validatingRoundTripper)(nil) // Wrap is meant to be used in developer environments and CI to make it easy to find places // where we accidentally create Kubernetes objects without our management label. -func Wrap(cfg *rest.Config) *rest.Config { +func Wrap(cfg *rest.Config, scheme *runtime.Scheme) *rest.Config { if _, set := os.LookupEnv("CI"); !set { return cfg } cfgCopy := *cfg cfgCopy.Wrap(func(rt http.RoundTripper) http.RoundTripper { - return &validatingRoundTripper{delegate: rt} + return &validatingRoundTripper{ + delegate: rt, + codecs: serializer.NewCodecFactory(scheme), + } }) return &cfgCopy } diff --git a/pkg/lib/operatorclient/client.go b/pkg/lib/operatorclient/client.go index cc89369fb7..b53d2da7d4 100644 --- a/pkg/lib/operatorclient/client.go +++ b/pkg/lib/operatorclient/client.go @@ -172,25 +172,25 @@ func NewClientFromConfig(kubeconfig string, logger *logrus.Logger) ClientInterfa } func NewClientFromRestConfig(config *rest.Config) (client ClientInterface, err error) { - kubernetes, err := kubernetes.NewForConfig(config) + k8s, err := kubernetes.NewForConfig(config) if err != nil { return } - apiextensions, err := apiextensions.NewForConfig(config) + apiext, err := apiextensions.NewForConfig(config) if err != nil { return } - apiregistration, err := apiregistration.NewForConfig(config) + apireg, err := apiregistration.NewForConfig(config) if err != nil { return } client = &Client{ - kubernetes, - apiextensions, - apiregistration, + k8s, + apiext, + apireg, } return