diff --git a/bkunifylogbeat.reference.yml b/bkunifylogbeat.reference.yml index fab9a43..38df6ec 100644 --- a/bkunifylogbeat.reference.yml +++ b/bkunifylogbeat.reference.yml @@ -21,6 +21,8 @@ bkunifylogbeat.buffertimeout: 2 bkunifylogbeat.eventdataid: -1 bkunifylogbeat.max_cpu_limit: -1 bkunifylogbeat.cpu_check_times: 10 +bkunifylogbeat.registry.operation_log_path: /var/lib/gse/operation.log + bkunifylogbeat.multi_config: - path: "/usr/local/gse/plugins/etc/bkunifylogbeat" file_pattern: "*.conf" diff --git a/config/config.go b/config/config.go index e794d54..da7ee3d 100644 --- a/config/config.go +++ b/config/config.go @@ -57,8 +57,9 @@ type SecConfigItem struct { // 采集状态 type Registry struct { - FlushTimeout time.Duration `config:"flush"` - GcFrequency time.Duration `config:"gc_frequency"` + FlushTimeout time.Duration `config:"flush"` + GcFrequency time.Duration `config:"gc_frequency"` + OperationLogPath string `config:"operation_log_path"` } //默认配置 diff --git a/registrar/registrar.go b/registrar/registrar.go index dc64447..c88816b 100644 --- a/registrar/registrar.go +++ b/registrar/registrar.go @@ -23,6 +23,7 @@ package registrar import ( + "bufio" "encoding/json" "fmt" "os" @@ -50,18 +51,27 @@ var ( registrarFlushed = bkmonitoring.NewInt("registrar_flushed") registrarMarshalError = bkmonitoring.NewInt("registrar_marshal_error") registrarFiles = bkmonitoring.NewInt("registrar_files", monitoring.Gauge) + operationLogFileObj *os.File + operationLogWriter *bufio.Writer ) +type operation struct { + State file.State + Time time.Time +} + // Registrar: 采集进度管理 type Registrar struct { - Channel chan []file.State - done chan struct{} - wg sync.WaitGroup - - states *file.States // Map with all file paths inside and the corresponding state - gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write - flushTimeout time.Duration - gcFrequency time.Duration + Channel chan []file.State + done chan struct{} + wg sync.WaitGroup + operationLogWG sync.WaitGroup + + states *file.States // Map with all file paths inside and the corresponding state + gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write + flushTimeout time.Duration + gcFrequency time.Duration + operationLogPath string } // New creates a new Registrar instance, updating the registry file on @@ -71,10 +81,11 @@ func New(config cfg.Registry) (*Registrar, error) { done: make(chan struct{}), wg: sync.WaitGroup{}, - states: file.NewStates(), - Channel: make(chan []file.State, 1), - flushTimeout: config.FlushTimeout, - gcFrequency: config.GcFrequency, + states: file.NewStates(), + Channel: make(chan []file.State, 1), + flushTimeout: config.FlushTimeout, + gcFrequency: config.GcFrequency, + operationLogPath: config.OperationLogPath, } return r, r.Init() } @@ -82,6 +93,13 @@ func New(config cfg.Registry) (*Registrar, error) { // Init: 采集器启动时调用,同时对原采集器采集进度迁移 func (r *Registrar) Init() error { var states []file.State + var err error + + operationLogFileObj, err = os.OpenFile(r.operationLogPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("open opeartion Log file returned error: %v. Continuing", err) + } + operationLogWriter = bufio.NewWriter(operationLogFileObj) // get time str, err := bkStorage.Get(timeKey) @@ -112,11 +130,14 @@ func (r *Registrar) Init() error { logp.L.Errorf("json unmarshal error, %s", str) return fmt.Errorf("error decoding states: %s", err) } + states = r.migrate(states) logp.L.Infof("load states: time=>%s, count=>%d, flush=>%s, gcFrequency=>%s", t, len(states), r.flushTimeout, r.gcFrequency) r.states.SetStates(ResetStates(states)) + operations := r.loadOperation() + r.migrateOperation(operations) return nil } @@ -157,10 +178,13 @@ func (r *Registrar) run() { // Writes registry on shutdown flushTicker := time.NewTicker(r.flushTimeout) gcTicker := time.NewTicker(r.gcFrequency) + operationLogSizeTimer := time.NewTicker(1 * time.Second) defer func() { flushTicker.Stop() gcTicker.Stop() + operationLogSizeTimer.Stop() + operationLogFileObj.Close() r.flushRegistry() r.wg.Done() }() @@ -170,6 +194,8 @@ func (r *Registrar) run() { case <-r.done: logp.L.Info("Ending Registrar") return + case <-operationLogSizeTimer.C: + r.operationLogFileSizeHandler() case <-flushTicker.C: r.flushRegistry() case <-gcTicker.C: @@ -187,8 +213,11 @@ func (r *Registrar) onEvents(states []file.State) { ts := time.Now() for _, s := range states { if s.Type == wineventlog.WinLogFileStateType { - r.states.UpdateWithTs(s, s.Timestamp) + stateTS := s.Timestamp + r.logOperation(s, stateTS) + r.states.UpdateWithTs(s, stateTS) } else { + r.logOperation(s, ts) r.states.UpdateWithTs(s, ts) } } @@ -197,7 +226,7 @@ func (r *Registrar) onEvents(states []file.State) { // flushRegistry writes the registry to disk. func (r *Registrar) flushRegistry() { registrarFlushed.Add(1) - + r.operationLogWG.Add(1) // First clean up states r.gcStates() states := r.GetStates() @@ -213,6 +242,10 @@ func (r *Registrar) flushRegistry() { bkStorage.Set(registrarKey, string(bytes), 0) bkStorage.Set(timeKey, time.Now().Format(time.UnixDate), 0) + + operationLogFileObj.Truncate(0) + operationLogFileObj.Seek(0, 0) + r.operationLogWG.Done() } // migrate file state @@ -256,6 +289,7 @@ func (r *Registrar) gcStates() { for _, state := range states { if state.TTL == stateNotManage { state.TTL = stateNanosecond + r.logOperation(state, time.Now()) r.states.Update(state) } } @@ -269,3 +303,65 @@ func (r *Registrar) gcStates() { r.gcRequired = false } + +func (r *Registrar) logOperation(s file.State, ts time.Time) { + r.operationLogWG.Wait() + operationLog, err := json.Marshal(operation{ + Time: ts, + State: s, + }) + + if err != nil { + logp.L.Errorf("Marshal operationLog returned error: %v. Continuing...", err) + } + + operationLogWriter.WriteString(string(operationLog)) + operationLogWriter.WriteString("\n") + operationLogWriter.Flush() +} + +func (r Registrar) loadOperation() []operation { + operationLogFileObj.Seek(0, 0) + scanner := bufio.NewScanner(operationLogFileObj) + operations := make([]operation, 0) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + var operationItem = operation{} + err := json.Unmarshal([]byte(line), &operationItem) + logp.L.Errorf("Unmarshal operationLog returned error: %v. Continuing...", err) + operations = append(operations, operationItem) + + } + return operations +} + +func (r *Registrar) migrateOperation(operations []operation) { + for _, operation := range operations { + s := operation.State + if s.IsEmpty() { + continue + } + if s.Type == wineventlog.WinLogFileStateType { + r.states.UpdateWithTs(s, s.Timestamp) + } else { + r.states.UpdateWithTs(s, operation.Time) + } + } + r.gcRequired = true + r.gcStates() + operationLogFileObj.Truncate(0) + operationLogFileObj.Seek(0, 0) +} + +func (r *Registrar) operationLogFileSizeHandler() { + fileStat, err := os.Stat(r.operationLogPath) + if err != nil { + logp.L.Errorf("Get operation Log Stat returned error: %v. Continuing...", err) + } + if fileStat.Size() == 10*1024*1024 { + r.flushRegistry() + } +} diff --git a/registrar/registrar_test.go b/registrar/registrar_test.go index d521c30..b2d4a52 100644 --- a/registrar/registrar_test.go +++ b/registrar/registrar_test.go @@ -23,6 +23,7 @@ package registrar import ( + "encoding/json" "os" "path/filepath" "testing" @@ -57,6 +58,20 @@ func TestRegistrar(t *testing.T) { } } + testOperationLogPath, err := filepath.Abs("../tests/operation.log") + if err != nil { + panic(err) + } + _, err = os.Stat(testOperationLogPath) + if err != nil { + if os.IsExist(err) { + err = os.Remove(testOperationLogPath) + if err != nil { + panic(err) + } + } + } + //Step 2: 初始化registrar err = bkStorage.Init(testRegPath, nil) if err != nil { @@ -64,8 +79,9 @@ func TestRegistrar(t *testing.T) { } registrar, err := New(cfg.Registry{ - FlushTimeout: 1 * time.Second, - GcFrequency: 1 * time.Second, + FlushTimeout: 1 * time.Second, + GcFrequency: 1 * time.Second, + OperationLogPath: testOperationLogPath, }) if err != nil { panic(err) @@ -96,3 +112,77 @@ func TestRegistrar(t *testing.T) { bkStorage.Close() os.Remove(testRegPath) } + +func TestRegistrarIO(t *testing.T) { + testRegPath, err := filepath.Abs("../tests/registrar.bkpipe.db") + + if err != nil { + panic(err) + } + // Step 1: 如果文件存在则直接删除 + _, err = os.Stat(testRegPath) + if err != nil { + if os.IsExist(err) { + err = os.Remove(testRegPath) + if err != nil { + panic(err) + } + } + } + + testOperationLogPath, err := filepath.Abs("../tests/operation.log") + if err != nil { + panic(err) + } + _, err = os.Stat(testOperationLogPath) + if err != nil { + if os.IsExist(err) { + err = os.Remove(testOperationLogPath) + if err != nil { + panic(err) + } + } + } + + //Step 2: 初始化registrar + err = bkStorage.Init(testRegPath, nil) + bkStorage.Set(timeKey, time.Now().Format(time.UnixDate), 0) + + str, _ := json.Marshal(make([]file.State, 0)) + bkStorage.Set(registrarKey, string(str), 0) + if err != nil { + panic(err) + } + + registrar, err := New(cfg.Registry{ + FlushTimeout: 1 * time.Minute, + GcFrequency: 1 * time.Minute, + OperationLogPath: testOperationLogPath, + }) + if err != nil { + panic(err) + } + err = registrar.Init() + if err != nil { + panic(err) + } + registrar.Start() + source := "/data/logs/test.log" + for i := 0; i < 5; i++ { + states := make([]file.State, 0) + for j := 0; j < 10; j++ { + //Step 3: 写入事件 + data := tests.MockLogEvent(source, "test") + states = append(states, data.GetState()) + } + registrar.Channel <- states + } + time.Sleep(10 * time.Second) + Operations := registrar.loadOperation() + assert.Equal(t, len(Operations), 50) + //Step 5: 关闭并删除文件 + registrar.Stop() + bkStorage.Close() + os.Remove(testRegPath) + os.Remove(testOperationLogPath) +}