Skip to content

Commit

Permalink
Merge pull request #3 from SAP/refactoring
Browse files Browse the repository at this point in the history
refactoring (towards adding non-flux sources)
  • Loading branch information
cbarbian-sap authored Jun 16, 2024
2 parents 8349cbf + 46f1df8 commit c91bcaf
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 232 deletions.
90 changes: 12 additions & 78 deletions api/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@ SPDX-License-Identifier: Apache-2.0
package v1alpha1

import (
"context"
"fmt"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

fluxsourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2"

"github.com/sap/component-operator-runtime/pkg/component"
componentoperatorruntimetypes "github.com/sap/component-operator-runtime/pkg/types"

"github.com/sap/component-operator/internal/flux"
)

// ComponentSpec defines the desired state of Component.
Expand Down Expand Up @@ -54,6 +47,18 @@ type SourceReference struct {
loaded bool `json:"-"`
}

// Initialize source reference. This is meant to be called by the reconciler.
// Other consumers should probably not (need to) call this.
func (r *SourceReference) Init(url string, revision string) {
if r.loaded {
// note: this panic indicates a programmatic error on the consumer side
panic("reference already initialized")
}
r.url = url
r.revision = revision
r.loaded = true
}

// Get the URL of a loaded source reference. Calling Url() on a not-loaded source reference will panic.
// The returned URL can be used to download a gzipped tar archive containing the templates.
// Furthermore, the URL will be unique for the archive's content.
Expand Down Expand Up @@ -220,77 +225,6 @@ func (c *Component) GetStatus() *component.Status {
return &c.Status.Status
}

// Load component's source reference.
func LoadSourceReference(ctx context.Context, clnt client.Client, component *Component) error {
if !component.GetDeletionTimestamp().IsZero() {
return nil
}

sourceRef := &component.Spec.SourceRef
switch {
case sourceRef.FluxGitRepository != nil:
gitRepository := &fluxsourcev1beta2.GitRepository{}
if err := clnt.Get(ctx, apitypes.NamespacedName(sourceRef.FluxGitRepository.WithDefaultNamespace(component.Namespace)), gitRepository); err != nil {
if apierrors.IsNotFound(err) {
return componentoperatorruntimetypes.NewRetriableError(err, nil)
}
return err
}
if !flux.IsSourceReady(gitRepository) {
return componentoperatorruntimetypes.NewRetriableError(fmt.Errorf("source not ready"), nil)
}
sourceRef.url = gitRepository.Status.Artifact.URL
sourceRef.revision = gitRepository.Status.Artifact.Revision
case sourceRef.FluxOciRepository != nil:
ociRepository := &fluxsourcev1beta2.OCIRepository{}
if err := clnt.Get(ctx, apitypes.NamespacedName(sourceRef.FluxOciRepository.WithDefaultNamespace(component.Namespace)), ociRepository); err != nil {
if apierrors.IsNotFound(err) {
return componentoperatorruntimetypes.NewRetriableError(err, nil)
}
return err

}
if !flux.IsSourceReady(ociRepository) {
return componentoperatorruntimetypes.NewRetriableError(fmt.Errorf("source not ready"), nil)
}
sourceRef.url = ociRepository.Status.Artifact.URL
sourceRef.revision = ociRepository.Status.Artifact.Revision
case sourceRef.FluxBucket != nil:
bucket := &fluxsourcev1beta2.Bucket{}
if err := clnt.Get(ctx, apitypes.NamespacedName(sourceRef.FluxBucket.WithDefaultNamespace(component.Namespace)), bucket); err != nil {
if apierrors.IsNotFound(err) {
return componentoperatorruntimetypes.NewRetriableError(err, nil)
}
return err
}
if !flux.IsSourceReady(bucket) {
return componentoperatorruntimetypes.NewRetriableError(fmt.Errorf("source not ready"), nil)
}
sourceRef.url = bucket.Status.Artifact.URL
sourceRef.revision = bucket.Status.Artifact.Revision
case sourceRef.FluxHelmChart != nil:
helmChart := &fluxsourcev1beta2.HelmChart{}
if err := clnt.Get(ctx, apitypes.NamespacedName(sourceRef.FluxHelmChart.WithDefaultNamespace(component.Namespace)), helmChart); err != nil {
if apierrors.IsNotFound(err) {
return componentoperatorruntimetypes.NewRetriableError(err, nil)
}
return err
}
if !flux.IsSourceReady(helmChart) {
return componentoperatorruntimetypes.NewRetriableError(fmt.Errorf("source not ready"), nil)
}
sourceRef.url = helmChart.Status.Artifact.URL
sourceRef.revision = helmChart.Status.Artifact.Revision
default:
return fmt.Errorf("unable to get source; one of fluxGitRepository, fluxOciRepository, fluxBucket, fluxHelmChart must be defined")
}
sourceRef.loaded = true
if component.Spec.Revision != "" && sourceRef.revision != component.Spec.Revision {
return componentoperatorruntimetypes.NewRetriableError(fmt.Errorf("source revision (%s) does not match specified revision (%s)", sourceRef.revision, component.Spec.Revision), nil)
}
return nil
}

func equal[T comparable](x *T, y *T) bool {
return x == nil && y == nil || x != nil && y != nil && *x == *y
}
Expand Down
163 changes: 163 additions & 0 deletions internal/flux/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-operator contributors
SPDX-License-Identifier: Apache-2.0
*/

package flux

import (
"context"

"github.com/pkg/errors"

apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

fluxsourcev1 "github.com/fluxcd/source-controller/api/v1"
fluxsourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2"

operatorv1alpha1 "github.com/sap/component-operator/api/v1alpha1"
"github.com/sap/component-operator/internal/object"
)

type Source interface {
object.Object
fluxsourcev1.Source
}

const (
gitRepositoryIndexKey string = ".metadata.flux.gitRepository"
ociRepositoryIndexKey string = ".metadata.flux.ociRepository"
bucketIndexKey string = ".metadata.flux.bucket"
helmChartIndexKey string = ".metadata.flux.helmChart"
)

func SetupCache(mgr manager.Manager, blder *builder.Builder) error {
// TODO: should we pass a meaningful context?
if err := mgr.GetCache().IndexField(context.TODO(), &operatorv1alpha1.Component{}, gitRepositoryIndexKey, indexByGitRepository); err != nil {
return errors.Wrapf(err, "failed setting index field %s", gitRepositoryIndexKey)
}
if err := mgr.GetCache().IndexField(context.TODO(), &operatorv1alpha1.Component{}, ociRepositoryIndexKey, indexByOciRepository); err != nil {
return errors.Wrapf(err, "failed setting index field %s", ociRepositoryIndexKey)
}
if err := mgr.GetCache().IndexField(context.TODO(), &operatorv1alpha1.Component{}, bucketIndexKey, indexByBucket); err != nil {
return errors.Wrapf(err, "failed setting index field %s", bucketIndexKey)
}
if err := mgr.GetCache().IndexField(context.TODO(), &operatorv1alpha1.Component{}, helmChartIndexKey, indexByHelmChart); err != nil {
return errors.Wrapf(err, "failed setting index field %s", helmChartIndexKey)
}

blder.
Watches(
&fluxsourcev1beta2.GitRepository{},
newHandler(mgr.GetCache(), gitRepositoryIndexKey)).
Watches(
&fluxsourcev1beta2.OCIRepository{},
newHandler(mgr.GetCache(), ociRepositoryIndexKey)).
Watches(
&fluxsourcev1beta2.Bucket{},
newHandler(mgr.GetCache(), bucketIndexKey)).
Watches(
&fluxsourcev1beta2.HelmChart{},
newHandler(mgr.GetCache(), helmChartIndexKey))

return nil
}

func indexByGitRepository(object client.Object) []string {
component := object.(*operatorv1alpha1.Component)
if component.Spec.SourceRef.FluxGitRepository == nil {
return nil
}
return []string{component.Spec.SourceRef.FluxGitRepository.WithDefaultNamespace(component.Namespace).String()}
}

func indexByOciRepository(object client.Object) []string {
component := object.(*operatorv1alpha1.Component)
if component.Spec.SourceRef.FluxOciRepository == nil {
return nil
}
return []string{component.Spec.SourceRef.FluxOciRepository.WithDefaultNamespace(component.Namespace).String()}
}

func indexByBucket(object client.Object) []string {
component := object.(*operatorv1alpha1.Component)
if component.Spec.SourceRef.FluxBucket == nil {
return nil
}
return []string{component.Spec.SourceRef.FluxBucket.WithDefaultNamespace(component.Namespace).String()}
}

func indexByHelmChart(object client.Object) []string {
component := object.(*operatorv1alpha1.Component)
if component.Spec.SourceRef.FluxHelmChart == nil {
return nil
}
return []string{component.Spec.SourceRef.FluxHelmChart.WithDefaultNamespace(component.Namespace).String()}
}

type sourceHandler struct {
cache cache.Cache
indexKey string
}

func newHandler(cache cache.Cache, indexKey string) handler.EventHandler {
return &sourceHandler{
cache: cache,
indexKey: indexKey,
}
}

func (h *sourceHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
// new sources will never be immediately ready, so nothing has to be done here
}

func (h *sourceHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
newSource := e.ObjectNew.(Source)

if !object.IsReady(newSource) {
return
}

artifact := newSource.GetArtifact()
if artifact == nil {
return
}
newRevision := artifact.Revision
if newRevision == "" {
return
}

componentList := &operatorv1alpha1.ComponentList{}
if err := h.cache.List(ctx, componentList, client.MatchingFields{
h.indexKey: client.ObjectKeyFromObject(e.ObjectNew).String(),
}); err != nil {
// TODO
// log.Error(err, "failed to list objects for source revision change")
return
}
for _, c := range componentList.Items {
if c.IsReady() && c.Status.LastAttemptedRevision == newRevision {
continue
}
q.Add(reconcile.Request{NamespacedName: apitypes.NamespacedName{
Namespace: c.Namespace,
Name: c.Name,
}})
}
}

func (h *sourceHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
// no need to queue components if source is deleted (reconciliation of the component would anyway fail)
}

func (h *sourceHandler) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
// generic events are not expected to arrive on the watch that uses this handler, so nothing to do here
}
5 changes: 5 additions & 0 deletions internal/generator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func GetGenerator(id string, url string, path string, clnt client.Client, decryp

func downloadArchive(url string, prefix string, dir string, decryptor decrypt.Decryptor) error {
prefix = filepath.Clean(prefix)
// TODO: check that prefix is a relative path and does not contain ..

resp, err := http.Get(url)
if err != nil {
return err
Expand All @@ -129,10 +131,13 @@ func downloadArchive(url string, prefix string, dir string, decryptor decrypt.De
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("error downloading %s: %s", url, resp.Status)
}

gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return err
}
defer gzipReader.Close()

tarReader := tar.NewReader(gzipReader)
for {
header, err := tarReader.Next()
Expand Down
13 changes: 5 additions & 8 deletions internal/flux/source.go → internal/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@ SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-op
SPDX-License-Identifier: Apache-2.0
*/

package flux
package object

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

fluxsourcev1 "github.com/fluxcd/source-controller/api/v1"
)

type Source interface {
type Object interface {
client.Object
fluxsourcev1.Source
GetConditions() []metav1.Condition
}

func IsSourceReady(source Source) bool {
for _, condition := range source.GetConditions() {
func IsReady(object Object) bool {
for _, condition := range object.GetConditions() {
if condition.Type != "Ready" {
continue
}
if condition.ObservedGeneration != source.GetGeneration() {
if condition.ObservedGeneration != object.GetGeneration() {
return false
}
return condition.Status == metav1.ConditionTrue
Expand Down
Loading

0 comments on commit c91bcaf

Please sign in to comment.