Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement power states as machine stage events #843

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,978 changes: 1,026 additions & 952 deletions client/api/omni/specs/omni.pb.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion client/api/omni/specs/omni.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ message RedactedClusterMachineConfigSpec {

// CompressedData contains the config bytes. It is only set if the config is compressed. Otherwise, the uncompressed data field is set instead.
//
// Deprecated: use accessor methods GetUncompressedData/SetUncompressedData to manage this field.
bytes compressed_data = 2;
}

Expand Down Expand Up @@ -502,6 +502,7 @@ message ClusterMachineStatusSpec {
BEFORE_DESTROY = 9;
DESTROYING = 5;
POWERING_ON = 10;
POWERED_OFF = 11;
}

Stage stage = 2;
Expand Down Expand Up @@ -829,7 +830,14 @@ message MachineLabelsSpec {}

// MachineStatusSnapshotSpec describes latest known status of MachineStatus Talos resource.
//
// In addition to the Talos machine status event, it carries an optional power stage,
// which is reported separately by the power stage watcher.
message MachineStatusSnapshotSpec {
// PowerStage is the power-related stage of the machine.
enum PowerStage {
// POWER_STAGE_NONE means no power stage is set; consumers do not override the machine stage in this case.
POWER_STAGE_NONE = 0;
// POWER_STAGE_POWERED_OFF is reported for a powered off machine which has no matching ClusterMachine.
POWER_STAGE_POWERED_OFF = 1;
// POWER_STAGE_POWERING_ON is reported for a powered off machine which has a matching ClusterMachine.
POWER_STAGE_POWERING_ON = 2;
}

// machine_status is the Talos machine status event; it is not populated in snapshots that only carry a power stage.
machine.MachineStatusEvent machine_status = 1;
// power_stage is the power stage of the machine, POWER_STAGE_NONE when unset.
PowerStage power_stage = 2;
}

enum ConditionType {
Expand Down
31 changes: 31 additions & 0 deletions client/api/omni/specs/omni_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/pkg/template/operations/internal/statustree/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func clusterMachineStageString(phase specs.ClusterMachineStatusSpec_Stage) strin
c = color.HiRedString
case specs.ClusterMachineStatusSpec_RUNNING:
c = color.GreenString
case specs.ClusterMachineStatusSpec_POWERED_OFF:
c = color.WhiteString
default:
c = fmt.Sprintf
}
Expand Down
8 changes: 8 additions & 0 deletions frontend/src/api/omni/specs/omni.pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export enum ClusterMachineStatusSpecStage {
BEFORE_DESTROY = 9,
DESTROYING = 5,
POWERING_ON = 10,
POWERED_OFF = 11,
}

export enum ClusterStatusSpecPhase {
Expand Down Expand Up @@ -117,6 +118,12 @@ export enum TalosUpgradeStatusSpecPhase {
InstallingExtensions = 5,
}

// MachineStatusSnapshotSpecPowerStage mirrors the PowerStage enum nested in the
// MachineStatusSnapshotSpec protobuf message (same names and numeric values).
export enum MachineStatusSnapshotSpecPowerStage {
// No power stage is set.
POWER_STAGE_NONE = 0,
// The machine is powered off.
POWER_STAGE_POWERED_OFF = 1,
// The machine is being powered on.
POWER_STAGE_POWERING_ON = 2,
}

export enum ControlPlaneStatusSpecConditionStatus {
Unknown = 0,
Ready = 1,
Expand Down Expand Up @@ -570,6 +577,7 @@ export type MachineLabelsSpec = {

// MachineStatusSnapshotSpec mirrors the MachineStatusSnapshotSpec protobuf message.
// Both fields are optional.
export type MachineStatusSnapshotSpec = {
// Talos machine status event.
machine_status?: MachineMachine.MachineStatusEvent
// Power stage of the machine.
power_stage?: MachineStatusSnapshotSpecPowerStage
}

export type ControlPlaneStatusSpecCondition = {
Expand Down
98 changes: 98 additions & 0 deletions internal/backend/powerstage/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

// Package powerstage provides a power stage watcher that produces MachineStatusSnapshot events containing power stage information.
package powerstage

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/infra"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
)

// Watcher watches the infra.MachineStatus resources and sends MachineStatusSnapshot resources with the power stage information to the snapshot channel.
type Watcher struct {
// state is used to watch infra.MachineStatus resources and to look up ClusterMachine resources.
state state.State
// snapshotCh is the send-only channel the power stage snapshots are written to.
snapshotCh chan<- *omni.MachineStatusSnapshot
// logger is used to log the watcher lifecycle and the sent snapshots.
logger *zap.Logger
}

// NewWatcher builds a Watcher which reads from the given state, publishes
// power stage snapshots to snapshotCh and logs via logger.
func NewWatcher(state state.State, snapshotCh chan<- *omni.MachineStatusSnapshot, logger *zap.Logger) *Watcher {
	watcher := new(Watcher)

	watcher.state = state
	watcher.snapshotCh = snapshotCh
	watcher.logger = logger

	return watcher
}

// Run runs the Watcher.
func (watcher *Watcher) Run(ctx context.Context) error {
eventCh := make(chan safe.WrappedStateEvent[*infra.MachineStatus])

if err := safe.StateWatchKind[*infra.MachineStatus](ctx, watcher.state, infra.NewMachineStatus("").Metadata(), eventCh); err != nil {
return err
}

for {
var event safe.WrappedStateEvent[*infra.MachineStatus]

select {
case <-ctx.Done():
watcher.logger.Info("power status watcher stopped")

return nil
case event = <-eventCh:
}

switch event.Type() { //nolint:exhaustive
case state.Created, state.Updated:
default: // ignore other events
continue
}

if err := event.Error(); err != nil {
return fmt.Errorf("failed to watch machine status: %w", err)
}

resource, err := event.Resource()
if err != nil {
return fmt.Errorf("failed to read resource from the event: %w", err)
}

utkuozdemir marked this conversation as resolved.
Show resolved Hide resolved
if resource.TypedSpec().Value.PowerState == specs.InfraMachineStatusSpec_POWER_STATE_OFF {
snapshot := omni.NewMachineStatusSnapshot(resources.DefaultNamespace, resource.Metadata().ID())

// find out if it is assigned to a cluster, and if so, mark it as "powering on"
if _, err = watcher.state.Get(ctx, omni.NewClusterMachine(resources.DefaultNamespace, resource.Metadata().ID()).Metadata()); err != nil {
if !state.IsNotFoundError(err) {
return err
}

snapshot.TypedSpec().Value.PowerStage = specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF
} else {
snapshot.TypedSpec().Value.PowerStage = specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERING_ON
}

select {
case <-ctx.Done():
watcher.logger.Info("power status watcher stopped before sending a snapshot")
case watcher.snapshotCh <- snapshot:
}

watcher.logger.Debug("sent power stage snapshot",
zap.String("machine_id", resource.Metadata().ID()),
zap.Stringer("power_stage", snapshot.TypedSpec().Value.PowerStage))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,14 @@ func NewClusterMachineStatusController() *ClusterMachineStatusController {
cmsVal.Stage = specs.ClusterMachineStatusSpec_CONFIGURING
}

if err = reconcilePoweringOn(ctx, r, clusterMachineStatus); err != nil {
return err
// if the machine snapshot has a power stage set (not NONE), overwrite the stage
switch statusSnapshot.TypedSpec().Value.PowerStage {
case specs.MachineStatusSnapshotSpec_POWER_STAGE_NONE:
// do not override
case specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERED_OFF:
cmsVal.Stage = specs.ClusterMachineStatusSpec_POWERED_OFF
case specs.MachineStatusSnapshotSpec_POWER_STAGE_POWERING_ON:
cmsVal.Stage = specs.ClusterMachineStatusSpec_POWERING_ON
}

clusterMachineIdentity, err := safe.ReaderGet[*omni.ClusterMachineIdentity](ctx, r, omni.NewClusterMachineIdentity(
Expand Down Expand Up @@ -269,30 +275,6 @@ func NewClusterMachineStatusController() *ClusterMachineStatusController {
)
}

// reconcilePoweringOn sets the cluster machine stage to POWERING_ON when an infra.Machine with the same ID
// exists and its infra.MachineStatus is either missing or reports a power state other than "on".
//
// It is a no-op when there is no infra.Machine with that ID. Any lookup error other than "not found" is returned.
func reconcilePoweringOn(ctx context.Context, r controller.Reader, clusterMachineStatus *omni.ClusterMachineStatus) error {
	machineID := clusterMachineStatus.Metadata().ID()

	if _, err := safe.ReaderGetByID[*infra.Machine](ctx, r, machineID); err != nil {
		if state.IsNotFoundError(err) {
			// no infra machine with this ID: nothing to overwrite
			return nil
		}

		return err
	}

	status, err := safe.ReaderGetByID[*infra.MachineStatus](ctx, r, machineID)

	switch {
	case err == nil:
	case state.IsNotFoundError(err):
		// a missing status is handled below via the nil check
	default:
		return err
	}

	poweredOn := status != nil && status.TypedSpec().Value.PowerState == specs.InfraMachineStatusSpec_POWER_STATE_ON

	// the machine exists but is not confirmed to be powered on: report it as "POWERING_ON"
	if !poweredOn {
		clusterMachineStatus.TypedSpec().Value.Stage = specs.ClusterMachineStatusSpec_POWERING_ON
	}

	return nil
}

func updateMachineProvisionStatus(ctx context.Context, r controller.Reader, machineStatus *omni.MachineStatus, cmsVal *specs.ClusterMachineStatusSpec) error {
machineRequestID, ok := machineStatus.Metadata().Labels().Get(omni.LabelMachineRequest)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@ type MachineStatusSnapshotController struct {
runner *task.Runner[snapshot.InfoChan, snapshot.CollectTaskSpec]
notifyCh chan *omni.MachineStatusSnapshot
siderolinkCh <-chan *omni.MachineStatusSnapshot
powerStageCh <-chan *omni.MachineStatusSnapshot
utkuozdemir marked this conversation as resolved.
Show resolved Hide resolved
generic.NamedController
}

// NewMachineStatusSnapshotController initializes MachineStatusSnapshotController.
func NewMachineStatusSnapshotController(siderolinkEventsCh <-chan *omni.MachineStatusSnapshot) *MachineStatusSnapshotController {
func NewMachineStatusSnapshotController(siderolinkEventsCh, powerStageEventsCh <-chan *omni.MachineStatusSnapshot) *MachineStatusSnapshotController {
return &MachineStatusSnapshotController{
NamedController: generic.NamedController{
ControllerName: MachineStatusSnapshotControllerName,
},
notifyCh: make(chan *omni.MachineStatusSnapshot),
siderolinkCh: siderolinkEventsCh,
powerStageCh: powerStageEventsCh,
runner: task.NewEqualRunner[snapshot.CollectTaskSpec](),
}
}
Expand Down Expand Up @@ -87,6 +89,10 @@ func (ctrl *MachineStatusSnapshotController) Settings() controller.QSettings {
if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
return err
}
case resource := <-ctrl.powerStageCh:
if err := ctrl.reconcileSnapshot(ctx, r, resource); err != nil {
return err
}
}
}
},
Expand Down Expand Up @@ -235,8 +241,12 @@ func (ctrl *MachineStatusSnapshotController) reconcileSnapshot(ctx context.Conte
return nil
}

if err := safe.WriterModify(ctx, r, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, snapshot.Metadata().ID()), func(m *omni.MachineStatusSnapshot) error {
m.TypedSpec().Value = snapshot.TypedSpec().Value
if err = safe.WriterModify(ctx, r, omni.NewMachineStatusSnapshot(resources.DefaultNamespace, snapshot.Metadata().ID()), func(m *omni.MachineStatusSnapshot) error {
if snapshot.TypedSpec().Value.MachineStatus != nil { // if this is a power stage snapshot, it will not contain machine status, so we preserve the existing value
m.TypedSpec().Value.MachineStatus = snapshot.TypedSpec().Value.MachineStatus
}

m.TypedSpec().Value.PowerStage = snapshot.TypedSpec().Value.PowerStage // always set the power stage

return nil
}); err != nil && !state.IsPhaseConflictError(err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (suite *MachineStatusSnapshotControllerSuite) TestReconcile() {

siderolinkEventsCh := make(chan *omni.MachineStatusSnapshot)

suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh)))
suite.Require().NoError(suite.runtime.RegisterQController(omnictrl.NewMachineStatusSnapshotController(siderolinkEventsCh, nil)))

m := omni.NewMachine(resources.DefaultNamespace, "1")
m.TypedSpec().Value.Connected = true
Expand Down
4 changes: 2 additions & 2 deletions internal/backend/runtime/omni/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ func (suite *MigrationSuite) TestSetMachineStatusSnapshotOwner() {
suite.Require().NoError(suite.state.Create(
ctx,
item,
state.WithCreateOwner(omnictrl.NewMachineStatusSnapshotController(nil).Name())),
state.WithCreateOwner(omnictrl.MachineStatusSnapshotControllerName)),
)
}

Expand All @@ -1405,7 +1405,7 @@ func (suite *MigrationSuite) TestSetMachineStatusSnapshotOwner() {
check := func(item *omni.MachineStatusSnapshot, expectedVersion int) {
result, err := safe.StateGet[*omni.MachineStatusSnapshot](ctx, suite.state, item.Metadata())
suite.Require().NoError(err)
suite.Require().Equal(omnictrl.NewMachineStatusSnapshotController(nil).Name(), result.Metadata().Owner())
suite.Require().Equal(omnictrl.MachineStatusSnapshotControllerName, result.Metadata().Owner())
suite.Require().EqualValues(result.Metadata().Version().Value(), expectedVersion)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/backend/runtime/omni/migration/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func setMachineStatusSnapshotOwner(ctx context.Context, st state.State, logger *
logger.Info("updating machine status snapshot with empty owner", zap.String("id", item.Metadata().String()))

_, err = safe.StateUpdateWithConflicts(ctx, st, item.Metadata(), func(res *omni.MachineStatusSnapshot) error {
return res.Metadata().SetOwner(omnictrl.NewMachineStatusSnapshotController(nil).Name())
return res.Metadata().SetOwner(omnictrl.MachineStatusSnapshotControllerName)
}, state.WithExpectedPhaseAny(), state.WithUpdateOwner(item.Metadata().Owner()))
if err != nil {
return err
Expand Down
Loading
Loading