Skip to content

Commit

Permalink
Allow to prefix provisioningClassName to filter provisioning requests
Browse files Browse the repository at this point in the history
  • Loading branch information
macsko committed Jan 8, 2025
1 parent a587c55 commit 001b922
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 42 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ type AutoscalingOptions struct {
DynamicResourceAllocationEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
// ProvisioningClassPrefix is the prefix of provisioningClassName that will be filtered by processors.
// Only ProvisioningRequests with this prefix in their class will be processed by this CA.
ProvisioningClassPrefix string
}

// KubeClientOptions specify options for kube client
Expand Down
6 changes: 4 additions & 2 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ var (
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
provisioningClassPrefix = flag.String("provisioning-class-prefix", "", "Prefix of provisioningClassName that will be filtered by processors. Only ProvisioningRequests with this prefix in their class will be processed by this CA.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -465,6 +466,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
ProvisioningClassPrefix: *provisioningClassPrefix,
}
}

Expand Down Expand Up @@ -540,7 +542,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
return nil, nil, err
}

ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing)
ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.ProvisioningClassPrefix)
if err != nil {
return nil, nil, err
}
Expand All @@ -559,7 +561,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot

scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client)
provreqProcesor := provreq.NewProvReqProcessor(client, opts.ProvisioningClassPrefix)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})

podListProcessor.AddProcessor(provreqProcesor)
Expand Down
39 changes: 22 additions & 17 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ProvisioningRequestPodsInjector struct {
client *provreqclient.ProvisioningRequestClient
lastProvisioningRequestProcessTime time.Time
checkCapacityBatchProcessing bool
provisioningClassPrefix string
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
Expand Down Expand Up @@ -96,6 +97,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool,
shouldMarkAsAccepted func(*provreqwrapper.ProvisioningRequest) bool,
) ([]*apiv1.Pod, error) {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
Expand All @@ -117,16 +119,13 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
// It will be marked later, in parallel, during processing the requests.
if pr.Spec.ProvisioningClassName == v1.ProvisioningClassCheckCapacity && p.checkCapacityBatchProcessing {
p.UpdateLastProcessTime()
if shouldMarkAsAccepted(pr) {
if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
}
if err := p.MarkAsAccepted(pr); err != nil {
continue
}

p.UpdateLastProcessTime()
return podsFromProvReq, nil
}
return nil, nil
Expand All @@ -139,6 +138,10 @@ type ProvisioningRequestWithPods struct {
Pods []*apiv1.Pod
}

func (p *ProvisioningRequestPodsInjector) matchesCheckCapacityClass(provisioningClassName string) bool {
return provisioningClassName == v1.ProvisioningClassCheckCapacity+p.provisioningClassPrefix
}

// GetCheckCapacityBatch returns up to the requested number of ProvisioningRequestWithPods.
// We do not mark the PRs as accepted here.
// If we fail to get the pods for a PR, we mark the PR as failed and issue an update.
Expand All @@ -152,7 +155,7 @@ func (p *ProvisioningRequestPodsInjector) GetCheckCapacityBatch(maxPrs int) ([]P
if len(prsWithPods) >= maxPrs {
break
}
if pr.Spec.ProvisioningClassName != v1.ProvisioningClassCheckCapacity {
if !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName) {
continue
}
if !p.IsAvailableForProvisioning(pr) {
Expand All @@ -177,13 +180,14 @@ func (p *ProvisioningRequestPodsInjector) Process(
) ([]*apiv1.Pod, error) {
podsFromProvReq, err := p.GetPodsFromNextRequest(
func(pr *provreqwrapper.ProvisioningRequest) bool {
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
if !found {
klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
}
return found
})

return provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, p.provisioningClassPrefix)
},
func(pr *provreqwrapper.ProvisioningRequest) bool {
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
// It will be marked later, in parallel, during processing the requests.
return !p.checkCapacityBatchProcessing || !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName)
},
)
if err != nil {
return unschedulablePods, err
}
Expand All @@ -195,7 +199,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool) (*ProvisioningRequestPodsInjector, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool, provisioningClassPrefix string) (*ProvisioningRequestPodsInjector, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
Expand All @@ -208,6 +212,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT
clock: clock.RealClock{},
lastProvisioningRequestProcessTime: time.Now(),
checkCapacityBatchProcessing: checkCapacityBatchProcessing,
provisioningClassPrefix: provisioningClassPrefix,
}, nil
}

Expand Down
25 changes: 23 additions & 2 deletions cluster-autoscaler/processors/provreq/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
LastTransitionTime: metav1.NewTime(minAgo),
}

provisioningClassPrefix := "test-"

podsA := 10
newProvReqA := testProvisioningRequestWithCondition("new", podsA, v1.ProvisioningClassCheckCapacity)
newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, v1.ProvisioningClassCheckCapacity, accepted)
newProvReqAPrefixed := testProvisioningRequestWithCondition("new-prefixed", podsA, provisioningClassPrefix+v1.ProvisioningClassCheckCapacity)

podsB := 20
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, v1.ProvisioningClassBestEffortAtomicScaleUp, notProvisioned, accepted)
Expand All @@ -83,6 +86,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
provReqs []*provreqwrapper.ProvisioningRequest
existingUnsUnschedulablePodCount int
checkCapacityBatchProcessing bool
provisioningClassPrefix string
wantUnscheduledPodCount int
wantUpdatedConditionName string
}{
Expand All @@ -92,7 +96,6 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqA.Name,
},

{
name: "New check capacity ProvisioningRequest with batch processing, pods are injected and Accepted condition is not added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
Expand All @@ -106,6 +109,24 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newAcceptedProvReqA.Name,
},
{
name: "New ProvisioningRequest with not matching custom prefix, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed},
},
{
name: "New ProvisioningRequest with not matching custom prefix, pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB},
provisioningClassPrefix: provisioningClassPrefix,
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqAPrefixed.Name,
},
{
name: "New check capacity ProvisioningRequest with not matching custom prefix, pods are injected and Accepted condition is not added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB},
provisioningClassPrefix: provisioningClassPrefix,
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqAPrefixed.Name,
},
{
name: "Provisioned=False, pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
Expand Down Expand Up @@ -140,7 +161,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing}
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing, tc.provisioningClassPrefix}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
Expand Down
17 changes: 9 additions & 8 deletions cluster-autoscaler/processors/provreq/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ type injector interface {
}

type provReqProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
provisioningClassPrefix string
}

// NewProvReqProcessor return ProvisioningRequestProcessor.
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator()}
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, provisioningClassPrefix string) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(), provisioningClassPrefix: provisioningClassPrefix}
}

// Refresh implements loop.Observer interface and will be run at the start
Expand All @@ -84,7 +85,7 @@ func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningReques
if len(expiredProvReq) >= p.maxUpdated {
break
}
if ok, found := provisioningrequest.SupportedProvisioningClasses[provReq.Spec.ProvisioningClassName]; !ok || !found {
if !provisioningrequest.SupportedProvisioningClass(provReq.Spec.ProvisioningClassName, p.provisioningClassPrefix) {
continue
}
conditions := provReq.Status.Conditions
Expand Down Expand Up @@ -144,7 +145,7 @@ func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if !conditions.ShouldCapacityBeBooked(provReq) {
if !conditions.ShouldCapacityBeBooked(provReq, p.provisioningClassPrefix) {
continue
}
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/processors/provreq/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestRefresh(t *testing.T) {
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1.ProvisioningClassCheckCapacity

processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil, ""}
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})

assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestDeleteOldProvReqs(t *testing.T) {

client := provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr, oldFailedPr, oldExpiredPr)

processor := provReqProcessor{func() time.Time { return now }, 1, client, nil}
processor := provReqProcessor{func() time.Time { return now }, 1, client, nil, ""}
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr, oldFailedPr, oldExpiredPr})

_, err := client.ProvisioningRequestNoCache(oldFailedPr.Namespace, oldFailedPr.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (o *bestEffortAtomicProvClass) Provision(
return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
}
prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods)
prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassBestEffortAtomicScaleUp)
prs = provreqclient.FilterOutProvisioningClass(prs, o.context.ProvisioningClassPrefix+v1.ProvisioningClassBestEffortAtomicScaleUp)
if len(prs) == 0 {
return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (o *checkCapacityProvClass) getProvisioningRequestsAndPods(unschedulablePod
if !o.isBatchEnabled() {
klog.Info("Processing single provisioning request (non-batch)")
prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods)
prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassCheckCapacity)
prs = provreqclient.FilterOutProvisioningClass(prs, o.context.ProvisioningClassPrefix+v1.ProvisioningClassCheckCapacity)
if len(prs) == 0 {
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

func TestBookCapacity(t *testing.T) {
tests := []struct {
name string
prConditions []metav1.Condition
want bool
name string
prConditions []metav1.Condition
provisioningClassPrefix string
want bool
}{
{
name: "BookingExpired",
Expand Down Expand Up @@ -77,6 +78,21 @@ func TestBookCapacity(t *testing.T) {
},
want: true,
},
{
name: "Capacity found and provisioned but prefix not matching",
prConditions: []metav1.Condition{
{
Type: v1.Provisioned,
Status: metav1.ConditionTrue,
},
{
Type: v1.Provisioned,
Status: metav1.ConditionTrue,
},
},
provisioningClassPrefix: "test-",
want: false,
},
{
name: "Capacity is not found",
prConditions: []metav1.Condition{
Expand All @@ -100,7 +116,7 @@ func TestBookCapacity(t *testing.T) {
Conditions: test.prConditions,
},
}, nil)
got := ShouldCapacityBeBooked(pr)
got := ShouldCapacityBeBooked(pr, test.provisioningClassPrefix)
if got != test.want {
t.Errorf("Want: %v, got: %v", test.want, got)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package conditions
import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -59,8 +59,8 @@ const (
)

// ShouldCapacityBeBooked returns whether capacity should be booked.
func ShouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
if ok, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]; !ok || !found {
func ShouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest, provisioningClassPrefix string) bool {
if !provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, provisioningClassPrefix) {
return false
}
conditions := pr.Status.Conditions
Expand Down
14 changes: 13 additions & 1 deletion cluster-autoscaler/provisioningrequest/supported_classes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,24 @@ limitations under the License.
package provisioningrequest

import (
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"strings"

v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
)

// SupportedProvisioningClasses is a set of ProvisioningRequest classes
// supported by Cluster Autoscaler.
// This map is exported for testing purposes.
// Checking the support should be done using SupportedProvisioningClass.
var SupportedProvisioningClasses = map[string]bool{
v1.ProvisioningClassCheckCapacity: true,
v1.ProvisioningClassBestEffortAtomicScaleUp: true,
}

// SupportedProvisioningClass verifies if the provisioningClassName with the given provisioningClassPrefix is supported.
func SupportedProvisioningClass(provisioningClassName string, provisioningClassPrefix string) bool {
if !strings.HasPrefix(provisioningClassName, provisioningClassPrefix) {
return false
}
return SupportedProvisioningClasses[provisioningClassName[len(provisioningClassPrefix):]]
}

0 comments on commit 001b922

Please sign in to comment.