Skip to content

Commit

Permalink
add protobuf object handling to validating round tripper
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 14, 2025
1 parent 616897a commit a3e3dfa
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 20 deletions.
13 changes: 12 additions & 1 deletion cmd/catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"os"
"time"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"

configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"github.com/sirupsen/logrus"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -88,6 +91,14 @@ func (o *options) run(ctx context.Context, logger *logrus.Logger) error {
if o.setWorkloadUserID {
workloadUserID = defaultWorkLoadUserID
}

// the scheme is used by the catalog operator to create
// a validatingroundtripper that ensures that all created
// resources are appropriately labeled
scheme := k8sscheme.Scheme
_ = apiextensionsv1.AddToScheme(scheme) // required by opClient
_ = apiregistrationv1.AddToScheme(scheme) // required by opClient

// TODO(tflannag): Use options pattern for catalog operator
// Create a new instance of the operator.
op, err := catalog.NewOperator(
Expand All @@ -100,7 +111,7 @@ func (o *options) run(ctx context.Context, logger *logrus.Logger) error {
o.opmImage,
o.utilImage,
o.catalogNamespace,
k8sscheme.Scheme,
scheme,
o.installPlanTimeout,
o.bundleUnpackTimeout,
workloadUserID,
Expand Down
2 changes: 1 addition & 1 deletion cmd/olm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func main() {
config := mgr.GetConfig()

// create a config that validates we're creating objects with labels
validatingConfig := validatingroundtripper.Wrap(config)
validatingConfig := validatingroundtripper.Wrap(config, mgr.GetScheme())

versionedConfigClient, err := configclientset.NewForConfig(config)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// create a config that validates we're creating objects with labels
validatingConfig := validatingroundtripper.Wrap(config)
validatingConfig := validatingroundtripper.Wrap(config, scheme)

// Create a new client for dynamic types (CRs)
dynamicClient, err := dynamic.NewForConfig(validatingConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package validatingroundtripper

import (
"fmt"
"io"
"net/http"
"os"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -13,23 +17,69 @@ import (

type validatingRoundTripper struct {
delegate http.RoundTripper
codecs serializer.CodecFactory
}

func (rt *validatingRoundTripper) decodeYAMLOrJSON(body io.Reader) (*unstructured.Unstructured, error) {
dec := yaml.NewYAMLOrJSONDecoder(body, 10)
unstructuredObject := &unstructured.Unstructured{}
if err := dec.Decode(unstructuredObject); err != nil {
return nil, fmt.Errorf("error decoding yaml/json object to an unstructured object: %w", err)
}
return unstructuredObject, nil
}

func (rt *validatingRoundTripper) decodeProtobuf(body io.Reader) (*unstructured.Unstructured, error) {
data, err := io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
}

decoder := rt.codecs.UniversalDeserializer()
obj, _, err := decoder.Decode(data, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to decode protobuf data: %w", err)
}

unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, fmt.Errorf("failed to convert object to unstructured: %w", err)
}

return &unstructured.Unstructured{Object: unstructuredObj}, nil
}

func (rt *validatingRoundTripper) decodeRequestBody(req *http.Request) (*unstructured.Unstructured, error) {
b, err := req.GetBody()
if err != nil {
panic(fmt.Errorf("failed to get request body: %w", err))
}
defer b.Close()

switch req.Header.Get("Content-Type") {
case "application/vnd.kubernetes.protobuf":
return rt.decodeProtobuf(b)
default:
return rt.decodeYAMLOrJSON(b)
}
}

func (rt *validatingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if req.Method == "POST" {
b, err := req.GetBody()
unstructuredObject, err := rt.decodeRequestBody(req)

if err != nil {
panic(err)
}
dec := yaml.NewYAMLOrJSONDecoder(b, 10)
unstructuredObject := &unstructured.Unstructured{}
if err := dec.Decode(unstructuredObject); err != nil {
panic(fmt.Errorf("error decoding object to an unstructured object: %w", err))
return nil, err
}

gvk := unstructuredObject.GroupVersionKind()
if gvk.Kind != "Event" {
if labels := unstructuredObject.GetLabels(); labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue {
panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s", gvk.Kind, gvk.Group, gvk.Version, unstructuredObject.GetNamespace(), unstructuredObject.GetName(), install.OLMManagedLabelKey, install.OLMManagedLabelValue))
labels := unstructuredObject.GetLabels()
if labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue {
panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s",
gvk.Kind, gvk.Group, gvk.Version,
unstructuredObject.GetNamespace(), unstructuredObject.GetName(),
install.OLMManagedLabelKey, install.OLMManagedLabelValue))
}
}
}
Expand All @@ -40,14 +90,17 @@ var _ http.RoundTripper = (*validatingRoundTripper)(nil)

// Wrap is meant to be used in developer environments and CI to make it easy to find places
// where we accidentally create Kubernetes objects without our management label.
func Wrap(cfg *rest.Config) *rest.Config {
func Wrap(cfg *rest.Config, scheme *runtime.Scheme) *rest.Config {
if _, set := os.LookupEnv("CI"); !set {
return cfg
}

cfgCopy := *cfg
cfgCopy.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &validatingRoundTripper{delegate: rt}
return &validatingRoundTripper{
delegate: rt,
codecs: serializer.NewCodecFactory(scheme),
}
})
return &cfgCopy
}
12 changes: 6 additions & 6 deletions pkg/lib/operatorclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,25 @@ func NewClientFromConfig(kubeconfig string, logger *logrus.Logger) ClientInterfa
}

func NewClientFromRestConfig(config *rest.Config) (client ClientInterface, err error) {
kubernetes, err := kubernetes.NewForConfig(config)
k8s, err := kubernetes.NewForConfig(config)
if err != nil {
return
}

apiextensions, err := apiextensions.NewForConfig(config)
apiext, err := apiextensions.NewForConfig(config)
if err != nil {
return
}

apiregistration, err := apiregistration.NewForConfig(config)
apireg, err := apiregistration.NewForConfig(config)
if err != nil {
return
}

client = &Client{
kubernetes,
apiextensions,
apiregistration,
k8s,
apiext,
apireg,
}

return
Expand Down

0 comments on commit a3e3dfa

Please sign in to comment.