feat: drain and volume detachment status conditions
jmdeal committed Jan 15, 2025
1 parent 380bcc9 commit 21176e1
Showing 5 changed files with 285 additions and 163 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
ConditionTypeInitialized = "Initialized"
ConditionTypeConsolidatable = "Consolidatable"
ConditionTypeDrifted = "Drifted"
ConditionTypeDrained = "Drained"
ConditionTypeVolumesDetached = "VolumesDetached"
ConditionTypeInstanceTerminating = "InstanceTerminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
ConditionTypeDisruptionReason = "DisruptionReason"
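The two new condition types, Drained and VolumesDetached, are reported on the NodeClaim status and can be read with the same StatusConditions() accessors the termination controller uses in the diff below. A minimal consumer-side sketch, assuming the v1 API package of this repository is imported as v1; the helper name is illustrative, not part of this commit:

func nodeClaimFullyDrained(nodeClaim *v1.NodeClaim) bool {
	// Mirrors the accessor pattern used in controller.go below; IsTrue() reports
	// false until the termination controller has set the condition to True.
	return nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() &&
		nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()
}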
160 changes: 106 additions & 54 deletions pkg/controllers/node/termination/controller.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou

func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.termination")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))

if !n.GetDeletionTimestamp().IsZero() {
return c.finalize(ctx, n)
@@ -92,99 +94,149 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
log.FromContext(ctx).Error(err, "failed to terminate node")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
}
}

if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are
// no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the
// instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
}

nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
if err != nil {
return reconcile.Result{}, err
}

if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}
if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining
// on nodes that are no longer alive. We do a check on the Ready condition of the node since, even
// though the CloudProvider says the instance is not around, we know that the kubelet process is still running
// if the Node Ready condition is true
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
return reconcile.Result{}, err
}
}

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
stored := nodeClaim.DeepCopy()
_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
NodesDrainedTotal.Inc(map[string]string{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
})
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
// operations on the NodeClaim.
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}

// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
volumesDetached, err := c.ensureVolumesDetached(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
for _, nodeClaim := range nodeClaims {
isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if err != nil {
// 404 = the nodeClaim no longer exists
if errors.IsNotFound(err) {
continue
if volumesDetached {
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// 409 - The nodeClaim exists, but its status has already been modified
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
// operations on the NodeClaim.
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
} else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
} else {
stored := nodeClaim.DeepCopy()
if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) || errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
// operations on the NodeClaim.
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
if client.IgnoreNotFound(err) != nil {
// 409 - The nodeClaim exists, but its status has already been modified
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
}
if !isInstanceTerminated {
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}
if err := c.removeFinalizer(ctx, node); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
for _, nodeClaim := range nodeClaims {
// If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
if nodeClaim.DeletionTimestamp.IsZero() {
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return client.IgnoreNotFound(err)
}
}
func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool {
if nodeTerminationTime == nil {
return false
}
return nil
return !c.clock.Now().Before(*nodeTerminationTime)
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
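Each condition update in the new controller code above follows one pattern: snapshot the NodeClaim, set the condition, patch the status with an optimistic lock, and requeue so the next reconcile reads its own write. A condensed sketch of that pattern, assuming the imports already present in controller.go; the helper name and signature are hypothetical, not part of this commit:

// patchCondition is a hypothetical helper that captures the update pattern used
// above for the Drained and VolumesDetached conditions.
func (c *Controller) patchCondition(ctx context.Context, nodeClaim *v1.NodeClaim, set func(*v1.NodeClaim) bool) (requeue bool, err error) {
	stored := nodeClaim.DeepCopy()
	if !set(nodeClaim) {
		// The condition is already in the desired state; nothing to patch.
		return false, nil
	}
	// The optimistic lock makes the patch fail with a conflict instead of
	// clobbering a concurrent status update; conflicts and deletions requeue.
	if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) || errors.IsNotFound(err) {
			return true, nil
		}
		return false, err
	}
	// Requeue after a successful patch so subsequent operations read our own write.
	return true, nil
}

A caller would wrap the SetTrue/SetFalse call in the closure, e.g. func(nc *v1.NodeClaim) bool { return nc.StatusConditions().SetTrue(v1.ConditionTypeDrained) }, and requeue whenever the helper reports it.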
