Skip to content

Commit

Permalink
Merge pull request #751 from sgotti/k8s_store_use_patch_for_pod_annot…
Browse files Browse the repository at this point in the history
…ations

k8s store: patch pod annotations
  • Loading branch information
sgotti authored Jan 20, 2020
2 parents 467a27d + 9f180e6 commit 2ae5e0f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 51 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/docker/libkv v0.2.1
github.com/emicklei/go-restful v2.5.0+incompatible // indirect
github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/go-openapi/jsonpointer v0.0.0-20170102174223-779f45308c19 // indirect
github.com/go-openapi/jsonreference v0.0.0-20161105162150-36d33bfe519e // indirect
github.com/go-openapi/spec v0.0.0-20180131233152-f3499b5df538 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/docker/leadership v0.1.0/go.mod h1:6yL2hg00l43fYEJagcF7eIS4PootU7TAO1
github.com/docker/libkv v0.2.1/go.mod h1:r5hEwHwW8dr0TFBYGCarMNbrQOiwL1xoqDYZ/JqoTK0=
github.com/emicklei/go-restful v2.5.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305/go.mod h1:qr0VowGBT4CS4Q8vFF8BSeKz34PuqKGxs/L0IAQA9DQ=
github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M=
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down
93 changes: 42 additions & 51 deletions internal/store/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/util"

jsonpatch "github.com/evanphx/json-patch"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -72,6 +74,43 @@ func (s *KubeStore) labelSelector(componentLabel ComponentLabelValue) labels.Sel
return labels.SelectorFromSet(selector)
}

func (s *KubeStore) patchKubeStatusAnnotation(annotationData []byte) error {
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}

oldPodJSON, err := json.Marshal(pod)
if err != nil {
return fmt.Errorf("failed to marshal pod: %v", err)
}

if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.KubeStatusAnnnotation] = string(annotationData)

newPodJSON, err := json.Marshal(pod)
if err != nil {
return fmt.Errorf("failed to marshal pod: %v", err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldPodJSON, newPodJSON)
if err != nil {
return fmt.Errorf("failed to create pod merge patch: %v", err)
}

_, err = podsClient.Patch(s.podName, types.MergePatchType, patchBytes)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
}

func (s *KubeStore) AtomicPutClusterData(ctx context.Context, cd *cluster.ClusterData, previous *KVPair) (*KVPair, error) {
cdj, err := json.Marshal(cd)
if err != nil {
Expand Down Expand Up @@ -210,23 +249,7 @@ func (s *KubeStore) SetKeeperInfo(ctx context.Context, id string, ms *cluster.Ke
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(msj)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(msj)
}

func (s *KubeStore) GetKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
Expand Down Expand Up @@ -261,23 +284,7 @@ func (s *KubeStore) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInf
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(sij)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(sij)
}

func (s *KubeStore) GetSentinelsInfo(ctx context.Context) (cluster.SentinelsInfo, error) {
Expand Down Expand Up @@ -312,23 +319,7 @@ func (s *KubeStore) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(pij)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(pij)
}

func (s *KubeStore) GetProxiesInfo(ctx context.Context) (cluster.ProxiesInfo, error) {
Expand Down

0 comments on commit 2ae5e0f

Please sign in to comment.