Skip to content

Commit

Permalink
add logs using proper plugin, finish configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Igor Eulalio <[email protected]>
  • Loading branch information
IgorEulalio committed Dec 9, 2024
1 parent 3c4335b commit 6136021
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
Empty file.
44 changes: 21 additions & 23 deletions plugins/k8saudit-aks/pkg/k8sauditaks/k8sauditaks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8sauditaks
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"regexp"
Expand Down Expand Up @@ -41,7 +40,7 @@ type PluginConfig struct {
RateLimitBurst int `json:"rate_limit_burst" jsonschema:"title=rate_limit_burst,description=The rate limit burst of events to read from EventHub"`
}

func (k *Plugin) Info() *plugins.Info {
func (p *Plugin) Info() *plugins.Info {
return &plugins.Info{
ID: 21,
Name: pluginName,
Expand Down Expand Up @@ -72,22 +71,26 @@ func (p *PluginConfig) Reset() {
return
}
p.RateLimitEventsPerSecond = rateLimitEventsPerSecond
} else {
p.RateLimitEventsPerSecond = 100
}
if i := os.Getenv("RATE_LIMIT_BURST"); i != "" {
rateLimitBurst, err := strconv.Atoi(i)
if err != nil {
return
}
p.RateLimitBurst = rateLimitBurst
} else {
p.RateLimitBurst = 200
}
}

func (k *Plugin) Init(cfg string) error {
func (p *Plugin) Init(cfg string) error {
// read configuration
k.Plugin.Config.Reset()
k.Config.Reset()
p.Plugin.Config.Reset()
p.Config.Reset()

err := json.Unmarshal([]byte(cfg), &k.Config)
err := json.Unmarshal([]byte(cfg), &p.Config)
if err != nil {
return err
}
Expand All @@ -97,7 +100,7 @@ func (k *Plugin) Init(cfg string) error {
return err
}

k.Logger = log.New(os.Stderr, "["+pluginName+"] ", log.LstdFlags|log.LUTC|log.Lmsgprefix)
p.Logger = log.New(os.Stderr, "["+pluginName+"] ", log.LstdFlags|log.LUTC|log.Lmsgprefix)

return nil
}
Expand All @@ -124,24 +127,27 @@ func (p *Plugin) OpenParams() ([]sdk.OpenParam, error) {
}

func (p *Plugin) Open(_ string) (source.Instance, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, _ := context.WithCancel(context.Background())

checkClient, err := container.NewClientFromConnectionString(p.Config.BlobStorageConnectionString, p.Config.BlobStorageContainerName, nil)
if err != nil {
p.Logger.Printf("error opening connection to blob storage: %v", err)
return nil, err
}
p.Logger.Printf("opened connection to blob storage")
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
p.Logger.Printf("error opening blob checkpoint connection: %v", err)
return nil, err
}
p.Logger.Printf("opened blob checkpoint connection")
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(
p.Config.EventHubNamespaceConnectionString,
p.Config.EventHubName,
azeventhubs.DefaultConsumerGroup,
nil,
)
p.Logger.Printf("opened consumer client")
if err != nil {
p.Logger.Printf("error creating consumer client: %v", err)
return nil, err
Expand All @@ -160,25 +166,22 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {
RateLimiter: rateLimiter,
}

fmt.Println("Plugin initialization complete.")
p.Logger.Printf("created eventhub processor")

eventsC := make(chan falcoeventhub.Record)
pushEventC := make(chan source.PushEvent)

// Start processing partition clients
go func() {
for {
partitionClient := processor.NextPartitionClient(ctx)
partitionClient := processor.NextPartitionClient(context.Background())
if partitionClient == nil {
break
}
// Capture the partitionClient variable
go func(pc *azeventhubs.ProcessorPartitionClient) {
//defer pc.Close(ctx)
if err := falcoEventHubProcessor.Process(pc, eventsC); err != nil {
go func(pc *azeventhubs.ProcessorPartitionClient, ec chan<- falcoeventhub.Record) {
if err := falcoEventHubProcessor.Process(partitionClient, eventsC); err != nil {
p.Logger.Printf("error processing partition client: %v", err)
}
}(partitionClient)
}(partitionClient, eventsC)
}
}()

Expand All @@ -200,14 +203,10 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {
p.Logger.Println(j.Err)
continue
}
select {
case pushEventC <- *j:
case <-ctx.Done():
return
}
pushEventC <- *j
}
case <-ctx.Done():
p.Logger.Println("context done")
p.Logger.Println("context done in eventsC")
return
}
}
Expand All @@ -223,7 +222,6 @@ func (p *Plugin) Open(_ string) (source.Instance, error) {
return source.NewPushInstance(
pushEventC,
source.WithInstanceClose(func() {
cancel()
// Close consumerClient when the context is canceled
if err := consumerClient.Close(context.Background()); err != nil {
p.Logger.Printf("error closing consumer client: %v", err)
Expand Down
14 changes: 7 additions & 7 deletions shared/go/azure/eventhub/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ func (p *Processor) Process(
defer closePartitionResources(partitionClient)

for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
ctx := context.Background()

receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Minute)
fmt.Printf("Receiving events on partitionId %v\n", partitionClient.PartitionID())
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
fmt.Printf("Received %d events on partitionId %v\n", len(events), partitionClient.PartitionID())
receiveCtxCancel()

if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
Expand All @@ -45,22 +48,19 @@ func (p *Processor) Process(
if err != nil {
return err
}

for _, record := range eventData.Records {
ctx := context.Background()
err := p.RateLimiter.Wait(ctx)
if err != nil {
fmt.Printf("Error ocurred while waiting for rate limiter: %v\n", err)
continue
}
fmt.Printf("Received record: %v\n", record)
recordChan <- record
fmt.Printf("Sent record: %v\n", record)
}

if err := partitionClient.UpdateCheckpoint(context.TODO(), event, nil); err != nil {
if err := partitionClient.UpdateCheckpoint(ctx, event, nil); err != nil {
return err
}
fmt.Printf("Updated checkpoint for partitionId %v\n", partitionClient.PartitionID())
}
}
}
Expand Down

0 comments on commit 6136021

Please sign in to comment.