Skip to content

Commit

Permalink
feat(kernelLogWatcher): enable revive kmsg parser if channel closed
Browse files Browse the repository at this point in the history
  • Loading branch information
daveoy committed Jan 9, 2025
1 parent 053539e commit f66aa56
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ import (
"k8s.io/node-problem-detector/pkg/util/tomb"
)

const (
reviveRetries = 10
reviveDuration = 5 * time.Second
)

type kernelLogWatcher struct {
cfg types.WatcherConfig
startTime time.Time
logCh chan *logtypes.Log
tomb *tomb.Tomb

kmsgParser kmsgparser.Parser
kmsgParser kmsgparser.Parser
reviveCount int
}

// NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg
Expand All @@ -55,22 +61,17 @@ func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher {
startTime: startTime,
tomb: tomb.NewTomb(),
// Arbitrary capacity
logCh: make(chan *logtypes.Log, 100),
logCh: make(chan *logtypes.Log, 100),
reviveCount: 0,
}
}

var _ types.WatcherCreateFunc = NewKmsgWatcher

func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) {
if k.kmsgParser == nil {
// nil-check to make mocking easier
parser, err := kmsgparser.NewParser()
if err != nil {
return nil, fmt.Errorf("failed to create kmsg parser: %v", err)
}
k.kmsgParser = parser
if err := k.SetKmsgParser(); err != nil {
return nil, err
}

go k.watchLoop()
return k.logCh, nil
}
Expand Down Expand Up @@ -99,6 +100,9 @@ func (k *kernelLogWatcher) watchLoop() {
return
case msg, ok := <-kmsgs:
if !ok {
if val, ok := k.cfg.PluginConfig["revive"]; ok && val == "true" {
k.reviveMyself()
}
klog.Error("Kmsg channel closed")
return
}
Expand All @@ -120,3 +124,35 @@ func (k *kernelLogWatcher) watchLoop() {
}
}
}

// create a new kmsg parser and sets it to the watcher.
func (k *kernelLogWatcher) SetKmsgParser() error {
parser, err := kmsgparser.NewParser()
if err != nil {
return fmt.Errorf("failed to create kmsg parser: %v", err)
}
k.kmsgParser = parser
return nil
}

// revive ourselves if the kmsg channel is closed
// close the old kmsg parser and create a new one
// enter the watch loop again
func (k *kernelLogWatcher) reviveMyself() {
// if k.reviveCount >= reviveRetries {
// klog.Errorf("Failed to revive kmsg parser after %d retries", reviveRetries)
// return
// }
// klog.Infof("Reviving kmsg parser, attempt %d of %d", k.reviveCount, reviveRetries)
klog.Infof("Reviving kmsg parser, attempt %d", k.reviveCount)
if err := k.kmsgParser.Close(); err != nil {
klog.Errorf("Failed to close kmsg parser: %v", err)
}
time.Sleep(reviveDuration)
if err := k.SetKmsgParser(); err != nil {
klog.Errorf("Failed to revive kmsg parser: %v", err)
return
}
k.reviveCount++
k.watchLoop()
}

0 comments on commit f66aa56

Please sign in to comment.