Skip to content

Commit

Permalink
Sort raw events from perf (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored Nov 27, 2023
1 parent 060114b commit 4caab58
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
60 changes: 49 additions & 11 deletions bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"encoding/binary"
"fmt"
"log"
"slices"
"strings"
"time"
"unsafe"

"github.com/cilium/ebpf"
Expand Down Expand Up @@ -201,29 +203,52 @@ func (o *Bpf) KprobeKfree() *ebpf.Program {
}

func (o *Bpf) PollSkb(ctx context.Context) (_ <-chan Skbdump, err error) {
dataReader, err := perf.NewReader(o.objs.PerfOutput, 1500*1000)
if err != nil {
log.Printf("Failed to open perf: %+v", err)
}

ch := make(chan Skbdump)
go func() {
defer close(ch)

dataReader, err := perf.NewReader(o.objs.PerfOutput, 1500*1000)
if err != nil {
log.Printf("Failed to open perf: %+v", err)
}
defer dataReader.Close()

go func() {
<-ctx.Done()
dataReader.Close()
}()

records := make(chan perf.Record)
go func() {
defer close(records)
for {
rec, err := dataReader.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
return
}
log.Printf("Failed to read ringbuf: %+v", err)
continue
}
records <- rec
}
}()

var pool []Skbdump
for {
rec, err := dataReader.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
return
var ok bool
var rec perf.Record
select {
case <-time.After(10 * time.Millisecond):
for _, skb := range pool {
ch <- skb
}
log.Printf("Failed to read ringbuf: %+v", err)
pool = nil
continue
case rec, ok = <-records:
if !ok {
return
}
}

skb := Skbdump{}
Expand All @@ -233,7 +258,20 @@ func (o *Bpf) PollSkb(ctx context.Context) (_ <-chan Skbdump, err error) {
}
copy(skb.Payload[:], rec.RawSample[unsafe.Offsetof(skb.Payload):])

ch <- skb
if len(pool) > 100 {
ch <- pool[0]
pool = pool[1:]
}

idx, _ := slices.BinarySearchFunc(pool, skb, func(a, b Skbdump) int {
if a.Meta.TimeNs < b.Meta.TimeNs {
return -1
} else if a.Meta.TimeNs > b.Meta.TimeNs {
return 1
}
return 0
})
pool = slices.Insert(pool, idx, skb)
}
}()
return ch, nil
Expand Down
5 changes: 1 addition & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func main() {

if config.Oneshot {
if ksym, _ := utils.Addr2ksym(skb.Meta.At); ksym == "kfree_skbmem" {
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
cancel()
}
}
}
Expand Down

0 comments on commit 4caab58

Please sign in to comment.