From 9f180e648dd1c727e0f0c2d9f7db9207cdeb96b9 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Sat, 7 Dec 2019 22:30:06 +0100 Subject: [PATCH] k8s store: patch pod annotations Use k8s client patch method to update pod annotations (in some versions replacing the whole pod object will return an error). --- go.mod | 1 + go.sum | 2 + internal/store/k8s.go | 93 +++++++++++++++++++------------------------ 3 files changed, 45 insertions(+), 51 deletions(-) diff --git a/go.mod b/go.mod index ea6e79dcd..4ecb5d6e3 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/docker/libkv v0.2.1 github.com/emicklei/go-restful v2.5.0+incompatible // indirect github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305 // indirect + github.com/evanphx/json-patch v4.5.0+incompatible github.com/go-openapi/jsonpointer v0.0.0-20170102174223-779f45308c19 // indirect github.com/go-openapi/jsonreference v0.0.0-20161105162150-36d33bfe519e // indirect github.com/go-openapi/spec v0.0.0-20180131233152-f3499b5df538 // indirect diff --git a/go.sum b/go.sum index 90ab614ff..8be37d3bb 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/docker/leadership v0.1.0/go.mod h1:6yL2hg00l43fYEJagcF7eIS4PootU7TAO1 github.com/docker/libkv v0.2.1/go.mod h1:r5hEwHwW8dr0TFBYGCarMNbrQOiwL1xoqDYZ/JqoTK0= github.com/emicklei/go-restful v2.5.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305/go.mod h1:qr0VowGBT4CS4Q8vFF8BSeKz34PuqKGxs/L0IAQA9DQ= +github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= +github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= diff --git a/internal/store/k8s.go b/internal/store/k8s.go index 42df81ebb..661c712ad 100644 --- a/internal/store/k8s.go +++ b/internal/store/k8s.go @@ -23,10 +23,12 @@ import ( "github.com/sorintlab/stolon/internal/cluster" "github.com/sorintlab/stolon/internal/util" + jsonpatch "github.com/evanphx/json-patch" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -72,6 +74,43 @@ func (s *KubeStore) labelSelector(componentLabel ComponentLabelValue) labels.Sel return labels.SelectorFromSet(selector) } +func (s *KubeStore) patchKubeStatusAnnotation(annotationData []byte) error { + podsClient := s.client.CoreV1().Pods(s.namespace) + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + pod, err := podsClient.Get(s.podName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get latest version of pod: %v", err) + } + + oldPodJSON, err := json.Marshal(pod) + if err != nil { + return fmt.Errorf("failed to marshal pod: %v", err) + } + + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + pod.Annotations[util.KubeStatusAnnnotation] = string(annotationData) + + newPodJSON, err := json.Marshal(pod) + if err != nil { + return fmt.Errorf("failed to marshal pod: %v", err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldPodJSON, newPodJSON) + if err != nil { + return fmt.Errorf("failed to create pod merge patch: %v", err) + } + + _, err = podsClient.Patch(s.podName, types.MergePatchType, patchBytes) + return err + }) + if retryErr != nil { + return fmt.Errorf("update failed: %v", retryErr) + } + return nil +} + func (s *KubeStore) AtomicPutClusterData(ctx context.Context, cd *cluster.ClusterData, previous *KVPair) (*KVPair, error) { cdj, err := json.Marshal(cd) if err != nil { @@ -210,23 +249,7 @@ func (s *KubeStore) SetKeeperInfo(ctx context.Context, id string, ms *cluster.Ke if err != nil { return err } - podsClient := s.client.CoreV1().Pods(s.namespace) - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - result, err := podsClient.Get(s.podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get latest version of pod: %v", err) - } - if result.Annotations == nil { - result.Annotations = map[string]string{} - } - result.Annotations[util.KubeStatusAnnnotation] = string(msj) - _, err = podsClient.Update(result) - return err - }) - if retryErr != nil { - return fmt.Errorf("update failed: %v", retryErr) - } - return nil + return s.patchKubeStatusAnnotation(msj) } func (s *KubeStore) GetKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) { @@ -261,23 +284,7 @@ func (s *KubeStore) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInf if err != nil { return err } - podsClient := s.client.CoreV1().Pods(s.namespace) - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - result, err := podsClient.Get(s.podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get latest version of pod: %v", err) - } - if result.Annotations == nil { - result.Annotations = map[string]string{} - } - result.Annotations[util.KubeStatusAnnnotation] = string(sij) - _, err = podsClient.Update(result) - return err - }) - if retryErr != nil { - return fmt.Errorf("update failed: %v", retryErr) - } - return nil + return s.patchKubeStatusAnnotation(sij) } func (s *KubeStore) GetSentinelsInfo(ctx context.Context) (cluster.SentinelsInfo, error) { @@ -312,23 +319,7 @@ func (s *KubeStore) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl if err != nil { return err } - podsClient := s.client.CoreV1().Pods(s.namespace) - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - result, err := podsClient.Get(s.podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get latest version of pod: %v", err) - } - if result.Annotations == nil { - result.Annotations = map[string]string{} - } - result.Annotations[util.KubeStatusAnnnotation] = string(pij) - _, err = podsClient.Update(result) - return err - }) - if retryErr != nil { - return fmt.Errorf("update failed: %v", retryErr) - } - return nil + return s.patchKubeStatusAnnotation(pij) } func (s *KubeStore) GetProxiesInfo(ctx context.Context) (cluster.ProxiesInfo, error) {