diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 5f32de514af7..18c960231dd2 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -313,6 +313,9 @@ type AutoscalingOptions struct { DynamicResourceAllocationEnabled bool // ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation. ClusterSnapshotParallelism int + // ProvisioningClassPrefix is the prefix of provisioningClassName that will be filtered by processors. + // Only ProvisioningRequests with this prefix in their class will be processed by this CA. + ProvisioningClassPrefix string } // KubeClientOptions specify options for kube client diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 3395b26ac6e7..6cea3f97706f 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -284,6 +284,7 @@ var ( forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.") enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.") clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.") + provisioningClassPrefix = flag.String("provisioning-class-prefix", "", "Prefix of provisioningClassName that will be filtered by processors. Only ProvisioningRequests with this prefix in their class will be processed by this CA.") ) func isFlagPassed(name string) bool { @@ -465,6 +466,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes, DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation, ClusterSnapshotParallelism: *clusterSnapshotParallelism, + ProvisioningClassPrefix: *provisioningClassPrefix, } } @@ -540,7 +542,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot return nil, nil, err } - ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing) + ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.ProvisioningClassPrefix) if err != nil { return nil, nil, err } @@ -559,7 +561,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor := provreq.NewProvReqProcessor(client) + provreqProcesor := provreq.NewProvReqProcessor(client, opts.ProvisioningClassPrefix) opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor}) podListProcessor.AddProcessor(provreqProcesor) diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go index 5ac6203347c2..ed882e5fe0f4 100644 --- a/cluster-autoscaler/processors/provreq/injector.go +++ b/cluster-autoscaler/processors/provreq/injector.go @@ -44,6 +44,7 @@ type ProvisioningRequestPodsInjector struct { client *provreqclient.ProvisioningRequestClient lastProvisioningRequestProcessTime time.Time checkCapacityBatchProcessing bool + provisioningClassPrefix string } // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently. @@ -96,6 +97,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis // GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it. func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest( isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool, + shouldMarkAsAccepted func(*provreqwrapper.ProvisioningRequest) bool, ) ([]*apiv1.Pod, error) { provReqs, err := p.client.ProvisioningRequests() if err != nil { @@ -117,16 +119,13 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest( p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error()) continue } - // Don't mark as accepted the check capacity ProvReq when batch processing is enabled. - // It will be marked later, in parallel, during processing the requests. - if pr.Spec.ProvisioningClassName == v1.ProvisioningClassCheckCapacity && p.checkCapacityBatchProcessing { - p.UpdateLastProcessTime() + if shouldMarkAsAccepted(pr) { + if err := p.MarkAsAccepted(pr); err != nil { + continue + } return podsFromProvReq, nil } - if err := p.MarkAsAccepted(pr); err != nil { - continue - } - + p.UpdateLastProcessTime() return podsFromProvReq, nil } return nil, nil @@ -139,6 +138,10 @@ type ProvisioningRequestWithPods struct { Pods []*apiv1.Pod } +func (p *ProvisioningRequestPodsInjector) matchesCheckCapacityClass(provisioningClassName string) bool { + return provisioningClassName == v1.ProvisioningClassCheckCapacity+p.provisioningClassPrefix +} + // GetCheckCapacityBatch returns up to the requested number of ProvisioningRequestWithPods. // We do not mark the PRs as accepted here. // If we fail to get the pods for a PR, we mark the PR as failed and issue an update. @@ -152,7 +155,7 @@ func (p *ProvisioningRequestPodsInjector) GetCheckCapacityBatch(maxPrs int) ([]P if len(prsWithPods) >= maxPrs { break } - if pr.Spec.ProvisioningClassName != v1.ProvisioningClassCheckCapacity { + if !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName) { continue } if !p.IsAvailableForProvisioning(pr) { @@ -177,13 +180,14 @@ func (p *ProvisioningRequestPodsInjector) Process( ) ([]*apiv1.Pod, error) { podsFromProvReq, err := p.GetPodsFromNextRequest( func(pr *provreqwrapper.ProvisioningRequest) bool { - _, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName] - if !found { - klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name) - } - return found - }) - + return provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, p.provisioningClassPrefix) + }, + func(pr *provreqwrapper.ProvisioningRequest) bool { + // Don't mark as accepted the check capacity ProvReq when batch processing is enabled. + // It will be marked later, in parallel, during processing the requests. + return !p.checkCapacityBatchProcessing || !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName) + }, + ) if err != nil { return unschedulablePods, err } @@ -195,7 +199,7 @@ func (p *ProvisioningRequestPodsInjector) Process( func (p *ProvisioningRequestPodsInjector) CleanUp() {} // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor. -func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool) (*ProvisioningRequestPodsInjector, error) { +func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool, provisioningClassPrefix string) (*ProvisioningRequestPodsInjector, error) { client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) if err != nil { return nil, err @@ -208,6 +212,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT clock: clock.RealClock{}, lastProvisioningRequestProcessTime: time.Now(), checkCapacityBatchProcessing: checkCapacityBatchProcessing, + provisioningClassPrefix: provisioningClassPrefix, }, nil } diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go index 2e496ddfec25..ade25c7abdc0 100644 --- a/cluster-autoscaler/processors/provreq/injector_test.go +++ b/cluster-autoscaler/processors/provreq/injector_test.go @@ -66,9 +66,12 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { LastTransitionTime: metav1.NewTime(minAgo), } + provisioningClassPrefix := "test-" + podsA := 10 newProvReqA := testProvisioningRequestWithCondition("new", podsA, v1.ProvisioningClassCheckCapacity) newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, v1.ProvisioningClassCheckCapacity, accepted) + newProvReqAPrefixed := testProvisioningRequestWithCondition("new-prefixed", podsA, provisioningClassPrefix+v1.ProvisioningClassCheckCapacity) podsB := 20 notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, v1.ProvisioningClassBestEffortAtomicScaleUp, notProvisioned, accepted) @@ -83,6 +86,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { provReqs []*provreqwrapper.ProvisioningRequest existingUnsUnschedulablePodCount int checkCapacityBatchProcessing bool + provisioningClassPrefix string wantUnscheduledPodCount int wantUpdatedConditionName string }{ @@ -92,7 +96,6 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { wantUnscheduledPodCount: podsA, wantUpdatedConditionName: newProvReqA.Name, }, - { name: "New check capacity ProvisioningRequest with batch processing, pods are injected and Accepted condition is not added", provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB}, @@ -106,6 +109,24 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { wantUnscheduledPodCount: podsA, wantUpdatedConditionName: newAcceptedProvReqA.Name, }, + { + name: "New ProvisioningRequest with not matching custom prefix, no pods are injected", + provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed}, + }, + { + name: "New ProvisioningRequest with not matching custom prefix, pods are injected", + provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB}, + provisioningClassPrefix: provisioningClassPrefix, + wantUnscheduledPodCount: podsA, + wantUpdatedConditionName: newProvReqAPrefixed.Name, + }, + { + name: "New check capacity ProvisioningRequest with not matching custom prefix, pods are injected and Accepted condition is not added", + provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB}, + provisioningClassPrefix: provisioningClassPrefix, + wantUnscheduledPodCount: podsA, + wantUpdatedConditionName: newProvReqAPrefixed.Name, + }, { name: "Provisioned=False, pods are injected", provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq}, @@ -140,7 +161,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) backoffTime := lru.New(100) backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute) - injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing} + injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing, tc.provisioningClassPrefix} getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount)) if err != nil { t.Errorf("%s failed: injector.Process return error %v", tc.name, err) diff --git a/cluster-autoscaler/processors/provreq/processor.go b/cluster-autoscaler/processors/provreq/processor.go index 1463b1e9f6db..d553676240aa 100644 --- a/cluster-autoscaler/processors/provreq/processor.go +++ b/cluster-autoscaler/processors/provreq/processor.go @@ -50,15 +50,16 @@ type injector interface { } type provReqProcessor struct { - now func() time.Time - maxUpdated int - client *provreqclient.ProvisioningRequestClient - injector injector + now func() time.Time + maxUpdated int + client *provreqclient.ProvisioningRequestClient + injector injector + provisioningClassPrefix string } // NewProvReqProcessor return ProvisioningRequestProcessor. -func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor { - return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator()} +func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, provisioningClassPrefix string) *provReqProcessor { + return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(), provisioningClassPrefix: provisioningClassPrefix} } // Refresh implements loop.Observer interface and will be run at the start @@ -84,7 +85,7 @@ func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningReques if len(expiredProvReq) >= p.maxUpdated { break } - if ok, found := provisioningrequest.SupportedProvisioningClasses[provReq.Spec.ProvisioningClassName]; !ok || !found { + if !provisioningrequest.SupportedProvisioningClass(provReq.Spec.ProvisioningClassName, p.provisioningClassPrefix) { continue } conditions := provReq.Status.Conditions @@ -144,7 +145,7 @@ func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error { } podsToCreate := []*apiv1.Pod{} for _, provReq := range provReqs { - if !conditions.ShouldCapacityBeBooked(provReq) { + if !conditions.ShouldCapacityBeBooked(provReq, p.provisioningClassPrefix) { continue } pods, err := provreq_pods.PodsForProvisioningRequest(provReq) diff --git a/cluster-autoscaler/processors/provreq/processor_test.go b/cluster-autoscaler/processors/provreq/processor_test.go index 11daf0aa2590..35705931dfcb 100644 --- a/cluster-autoscaler/processors/provreq/processor_test.go +++ b/cluster-autoscaler/processors/provreq/processor_test.go @@ -155,7 +155,7 @@ func TestRefresh(t *testing.T) { additionalPr.CreationTimestamp = metav1.NewTime(weekAgo) additionalPr.Spec.ProvisioningClassName = v1.ProvisioningClassCheckCapacity - processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil} + processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil, ""} processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions) @@ -215,7 +215,7 @@ func TestDeleteOldProvReqs(t *testing.T) { client := provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr, oldFailedPr, oldExpiredPr) - processor := provReqProcessor{func() time.Time { return now }, 1, client, nil} + processor := provReqProcessor{func() time.Time { return now }, 1, client, nil, ""} processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr, oldFailedPr, oldExpiredPr}) _, err := client.ProvisioningRequestNoCache(oldFailedPr.Namespace, oldFailedPr.Name) diff --git a/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go index 34d64c4f847d..10f2b63fe801 100644 --- a/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go +++ b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go @@ -82,7 +82,7 @@ func (o *bestEffortAtomicProvClass) Provision( return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil } prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods) - prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassBestEffortAtomicScaleUp) + prs = provreqclient.FilterOutProvisioningClass(prs, o.context.ProvisioningClassPrefix+v1.ProvisioningClassBestEffortAtomicScaleUp) if len(prs) == 0 { return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go index 62fea869eb5b..baaf5ff6c35a 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go @@ -132,7 +132,7 @@ func (o *checkCapacityProvClass) getProvisioningRequestsAndPods(unschedulablePod if !o.isBatchEnabled() { klog.Info("Processing single provisioning request (non-batch)") prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods) - prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassCheckCapacity) + prs = provreqclient.FilterOutProvisioningClass(prs, o.context.ProvisioningClassPrefix+v1.ProvisioningClassCheckCapacity) if len(prs) == 0 { return nil, nil } diff --git a/cluster-autoscaler/provisioningrequest/conditions/condition_test.go b/cluster-autoscaler/provisioningrequest/conditions/condition_test.go index 081b6b78d8c7..2c31ebc1503e 100644 --- a/cluster-autoscaler/provisioningrequest/conditions/condition_test.go +++ b/cluster-autoscaler/provisioningrequest/conditions/condition_test.go @@ -20,16 +20,17 @@ import ( "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" + v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" ) func TestBookCapacity(t *testing.T) { tests := []struct { - name string - prConditions []metav1.Condition - want bool + name string + prConditions []metav1.Condition + provisioningClassPrefix string + want bool }{ { name: "BookingExpired", @@ -77,6 +78,21 @@ func TestBookCapacity(t *testing.T) { }, want: true, }, + { + name: "Capacity found and provisioned but prefix not matching", + prConditions: []metav1.Condition{ + { + Type: v1.Provisioned, + Status: metav1.ConditionTrue, + }, + { + Type: v1.Provisioned, + Status: metav1.ConditionTrue, + }, + }, + provisioningClassPrefix: "test-", + want: false, + }, { name: "Capacity is not found", prConditions: []metav1.Condition{ @@ -100,7 +116,7 @@ func TestBookCapacity(t *testing.T) { Conditions: test.prConditions, }, }, nil) - got := ShouldCapacityBeBooked(pr) + got := ShouldCapacityBeBooked(pr, test.provisioningClassPrefix) if got != test.want { t.Errorf("Want: %v, got: %v", test.want, got) } diff --git a/cluster-autoscaler/provisioningrequest/conditions/conditions.go b/cluster-autoscaler/provisioningrequest/conditions/conditions.go index 33a66d1478a5..efdfbee4d9cf 100644 --- a/cluster-autoscaler/provisioningrequest/conditions/conditions.go +++ b/cluster-autoscaler/provisioningrequest/conditions/conditions.go @@ -19,7 +19,7 @@ package conditions import ( apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" + v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/klog/v2" @@ -59,8 +59,8 @@ const ( ) // ShouldCapacityBeBooked returns whether capacity should be booked. -func ShouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { - if ok, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]; !ok || !found { +func ShouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest, provisioningClassPrefix string) bool { + if !provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, provisioningClassPrefix) { return false } conditions := pr.Status.Conditions diff --git a/cluster-autoscaler/provisioningrequest/supported_classes.go b/cluster-autoscaler/provisioningrequest/supported_classes.go index 807e15bd7124..39d049b1e140 100644 --- a/cluster-autoscaler/provisioningrequest/supported_classes.go +++ b/cluster-autoscaler/provisioningrequest/supported_classes.go @@ -17,12 +17,24 @@ limitations under the License. package provisioningrequest import ( - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" + "strings" + + v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" ) // SupportedProvisioningClasses is a set of ProvisioningRequest classes // supported by Cluster Autoscaler. +// This map is exported for testing purposes. +// Checking the support should be done using SupportedProvisioningClass. var SupportedProvisioningClasses = map[string]bool{ v1.ProvisioningClassCheckCapacity: true, v1.ProvisioningClassBestEffortAtomicScaleUp: true, } + +// SupportedProvisioningClass verifies if the provisioningClassName with the given provisioningClassPrefix is supported. +func SupportedProvisioningClass(provisioningClassName string, provisioningClassPrefix string) bool { + if !strings.HasPrefix(provisioningClassName, provisioningClassPrefix) { + return false + } + return SupportedProvisioningClasses[provisioningClassName[len(provisioningClassPrefix):]] +}