From 61360214b0713f03f95e3478da7653ab0929ff2d Mon Sep 17 00:00:00 2001 From: Igor Eulalio Date: Mon, 9 Dec 2024 18:19:48 -0300 Subject: [PATCH] add logs using proper plugin, finish configuration Signed-off-by: Igor Eulalio --- plugins/k8saudit-aks/falco_k8s_audit.yaml | 0 .../pkg/k8sauditaks/k8sauditaks.go | 44 +++++++++---------- shared/go/azure/eventhub/processor.go | 14 +++--- 3 files changed, 28 insertions(+), 30 deletions(-) create mode 100644 plugins/k8saudit-aks/falco_k8s_audit.yaml diff --git a/plugins/k8saudit-aks/falco_k8s_audit.yaml b/plugins/k8saudit-aks/falco_k8s_audit.yaml new file mode 100644 index 00000000..e69de29b diff --git a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go index d4d54339..d7da998d 100644 --- a/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go +++ b/plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go @@ -3,7 +3,6 @@ package k8sauditaks import ( "context" "encoding/json" - "fmt" "log" "os" "regexp" @@ -41,7 +40,7 @@ type PluginConfig struct { RateLimitBurst int `json:"rate_limit_burst" jsonschema:"title=rate_limit_burst,description=The rate limit burst of events to read from EventHub"` } -func (k *Plugin) Info() *plugins.Info { +func (p *Plugin) Info() *plugins.Info { return &plugins.Info{ ID: 21, Name: pluginName, @@ -72,6 +71,8 @@ func (p *PluginConfig) Reset() { return } p.RateLimitEventsPerSecond = rateLimitEventsPerSecond + } else { + p.RateLimitEventsPerSecond = 100 } if i := os.Getenv("RATE_LIMIT_BURST"); i != "" { rateLimitBurst, err := strconv.Atoi(i) @@ -79,15 +80,17 @@ func (p *PluginConfig) Reset() { return } p.RateLimitBurst = rateLimitBurst + } else { + p.RateLimitBurst = 200 } } -func (k *Plugin) Init(cfg string) error { +func (p *Plugin) Init(cfg string) error { // read configuration - k.Plugin.Config.Reset() - k.Config.Reset() + p.Plugin.Config.Reset() + p.Config.Reset() - err := json.Unmarshal([]byte(cfg), &k.Config) + err := json.Unmarshal([]byte(cfg), &p.Config) if err != nil { return err } @@ -97,7 +100,7 @@ func (k *Plugin) Init(cfg string) error { return err } - k.Logger = log.New(os.Stderr, "["+pluginName+"] ", log.LstdFlags|log.LUTC|log.Lmsgprefix) + p.Logger = log.New(os.Stderr, "["+pluginName+"] ", log.LstdFlags|log.LUTC|log.Lmsgprefix) return nil } @@ -124,24 +127,27 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) { } func (p *Plugin) Open(_ string) (source.Instance, error) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, _ := context.WithCancel(context.Background()) checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil) if err != nil { p.Logger.Printf("error opening connection to blob storage: %v", err) return nil, err } + p.Logger.Printf("opened connection to blob storage") checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil) if err != nil { p.Logger.Printf("error opening blob checkpoint connection: %v", err) return nil, err } + p.Logger.Printf("opened blob checkpoint connection") consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString( p.Config.EventHubNamespaceConnectionString, p.Config.EventHubName, azeventhubs.DefaultConsumerGroup, nil, ) + p.Logger.Printf("opened consumer client") if err != nil { p.Logger.Printf("error creating consumer client: %v", err) return nil, err @@ -160,25 +166,22 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { RateLimiter: rateLimiter, } - fmt.Println("Plugin initialization complete.") + p.Logger.Printf("created eventhub processor") eventsC := make(chan falcoeventhub.Record) pushEventC := make(chan source.PushEvent) - // Start processing partition clients go func() { for { - partitionClient := processor.NextPartitionClient(ctx) + partitionClient := processor.NextPartitionClient(context.Background()) if partitionClient == nil { break } - // Capture the partitionClient variable - go func(pc *azeventhubs.ProcessorPartitionClient) { - //defer pc.Close(ctx) - if err := falcoEventHubProcessor.Process(pc, eventsC); err != nil { + go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) { + if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil { p.Logger.Printf("error processing partition client: %v", err) } - }(partitionClient) + }(partitionClient, eventsC) } }() @@ -200,14 +203,10 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { p.Logger.Println(j.Err) continue } - select { - case pushEventC <- *j: - case <-ctx.Done(): - return - } + pushEventC <- *j } case <-ctx.Done(): - p.Logger.Println("context done") + p.Logger.Println("context done in eventsC") return } } @@ -223,7 +222,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) { return source.NewPushInstance( pushEventC, source.WithInstanceClose(func() { - cancel() // Close consumerClient when the context is canceled if err := consumerClient.Close(context.Background()); err != nil { p.Logger.Printf("error closing consumer client: %v", err) diff --git a/shared/go/azure/eventhub/processor.go b/shared/go/azure/eventhub/processor.go index 96a2ba98..54225c4f 100644 --- a/shared/go/azure/eventhub/processor.go +++ b/shared/go/azure/eventhub/processor.go @@ -32,10 +32,13 @@ func (p *Processor) Process( defer closePartitionResources(partitionClient) for { - receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute) + ctx := context.Background() + + receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute) + fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID()) events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) + fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID()) receiveCtxCancel() - if err != nil && !errors.Is(err, context.DeadlineExceeded) { return err } @@ -45,22 +48,19 @@ func (p *Processor) Process( if err != nil { return err } - for _, record := range eventData.Records { ctx := context.Background() err := p.RateLimiter.Wait(ctx) if err != nil { - fmt.Printf("Error ocurred while waiting for rate limiter: %v\n", err) continue } - fmt.Printf("Received record: %v\n", record) recordChan <- record - fmt.Printf("Sent record: %v\n", record) } - if err := partitionClient.UpdateCheckpoint(context.TODO(), event, nil); err != nil { + if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil { return err } + fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID()) } } }