From bf834b1fd9f050f378ae6d92dcba4edace14c60d Mon Sep 17 00:00:00 2001
From: payall4u
Date: Sun, 21 Jan 2024 10:47:51 +0800
Subject: [PATCH] Bugfix: avoid ext resource less than allocated

Clamp the recommended extended-resource value in BuildNodeStatus so it is
never lower than the amount already requested by pods in the Running phase
returned by the pod lister. The previous lower bound was 0.

The per-pod allocated amount is max(sum of container requests, largest
init-container request). NodeResourceManager now takes a PodInformer and
waits for its cache to sync before running. getNode returns the lister
error to its caller instead of logging it and returning nil.

---
Review notes (not part of the commit message):
 * The second per-pod loop in getExtResourceAllocated iterated
   pod.Spec.Containers a second time, so the math.Max pass could never change
   the sum and init-container requests were ignored. It now iterates
   pod.Spec.InitContainers.
 * The post-image blob id on the node_resource_manager.go index line predates
   that one-line change.
 * Blank context lines and indentation were restored from the hunk counts and
   gofmt conventions; apply with whitespace tolerance if a context line does
   not match byte-for-byte.

 pkg/agent/agent.go                            |  2 +-
 pkg/resource/node_resource_manager.go         | 55 +++++++++++++++----
 ...urce_manger.go => pod_resource_manager.go} |  0
 3 files changed, 46 insertions(+), 11 deletions(-)
 rename pkg/resource/{pod_resource_manger.go => pod_resource_manager.go} (100%)

diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index fa236c6a2..62ae7cdab 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -120,7 +120,7 @@ func NewAgent(ctx context.Context,
 
 	if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
 		tspName := agent.CreateNodeResourceTsp()
-		nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, tspInformer, stateCollector.NodeResourceChann)
+		nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, podInformer, tspInformer, stateCollector.NodeResourceChann)
 		if err != nil {
 			return agent, err
 		}
diff --git a/pkg/resource/node_resource_manager.go b/pkg/resource/node_resource_manager.go
index 130b3a5cb..da0a7ccae 100644
--- a/pkg/resource/node_resource_manager.go
+++ b/pkg/resource/node_resource_manager.go
@@ -3,6 +3,7 @@ package resource
 import (
 	"context"
 	"fmt"
+	"k8s.io/apimachinery/pkg/labels"
 	"math"
 	"strconv"
 	"time"
@@ -55,6 +56,9 @@ type NodeResourceManager struct {
 	nodeLister corelisters.NodeLister
 	nodeSynced cache.InformerSynced
 
+	podLister corelisters.PodLister
+	podSynced cache.InformerSynced
+
 	tspLister predictionlisters.TimeSeriesPredictionLister
 	tspSynced cache.InformerSynced
 
@@ -71,7 +75,7 @@ type NodeResourceManager struct {
 	tspName string
 }
 
-func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer,
+func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, podInformer coreinformers.PodInformer,
 	tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) (*NodeResourceManager, error) {
 	reserveCpuPercent, err := utils.ParsePercentage(nodeResourceReserved[v1.ResourceCPU.String()])
 	if err != nil {
@@ -92,6 +96,8 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeRes
 		client:     client,
 		nodeLister: nodeInformer.Lister(),
 		nodeSynced: nodeInformer.Informer().HasSynced,
+		podLister:  podInformer.Lister(),
+		podSynced:  podInformer.Informer().HasSynced,
 		tspLister:  tspInformer.Lister(),
 		tspSynced:  tspInformer.Informer().HasSynced,
 		recorder:   recorder,
@@ -117,6 +123,7 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
 		stop,
 		o.tspSynced,
 		o.nodeSynced,
+		o.podSynced,
 	) {
 		return
 	}
@@ -144,7 +151,11 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
 }
 
 func (o *NodeResourceManager) UpdateNodeResource() {
-	node := o.getNode()
+	node, err := o.getNode()
+	if err != nil {
+		klog.ErrorS(err, "Get node failed")
+		return
+	}
 	if len(node.Status.Addresses) == 0 {
 		klog.Error("Node addresses is empty")
 		return
@@ -168,13 +179,33 @@ func (o *NodeResourceManager) UpdateNodeResource() {
 	}
 }
 
-func (o *NodeResourceManager) getNode() *v1.Node {
-	node, err := o.nodeLister.Get(o.nodeName)
+func (o *NodeResourceManager) getNode() (*v1.Node, error) {
+	return o.nodeLister.Get(o.nodeName)
+}
+
+func (o *NodeResourceManager) getExtResourceAllocated(extResource string) (float64, error) {
+	pods, err := o.podLister.List(labels.Everything())
 	if err != nil {
-		klog.Errorf("Failed to get node: %v", err)
-		return nil
+		return 0, err
+	}
+	allocated := 0.0
+	allocatedFromContainer := func(container *v1.Container) float64 {
+		return float64(container.Resources.Requests.Name(v1.ResourceName(extResource), resource.BinarySI).Value())
+	}
+	for _, pod := range pods {
+		if pod.Status.Phase != v1.PodRunning {
+			continue
+		}
+		var one = 0.0
+		for _, container := range pod.Spec.Containers {
+			one += allocatedFromContainer(&container)
+		}
+		for _, container := range pod.Spec.InitContainers {
+			one = math.Max(one, allocatedFromContainer(&container))
+		}
+		allocated += one
 	}
-	return node
+	return allocated, nil
 }
 
 func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPrediction, addresses []v1.NodeAddress) (bool, error) {
@@ -238,11 +269,15 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 {
 		default:
 			continue
 		}
-		if nextRecommendation < 0 {
-			nextRecommendation = 0
+		extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName))
+		extResourceAllocated, err := o.getExtResourceAllocated(extResourceName)
+		if err != nil {
+			klog.Warningf("Get allocated ext resources %s failed: %s", extResourceName, err.Error())
+		}
+		if nextRecommendation < extResourceAllocated {
+			nextRecommendation = extResourceAllocated
 		}
 		metrics.UpdateNodeResourceRecommendedValue(metrics.SubComponentNodeResource, metrics.StepGetExtResourceRecommended, string(resourceName), resourceFrom, nextRecommendation)
-		extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName))
 		resValue, exists := node.Status.Capacity[v1.ResourceName(extResourceName)]
 		if exists && resValue.Value() != 0 &&
 			math.Abs(float64(resValue.Value())-
diff --git a/pkg/resource/pod_resource_manger.go b/pkg/resource/pod_resource_manager.go
similarity index 100%
rename from pkg/resource/pod_resource_manger.go
rename to pkg/resource/pod_resource_manager.go