Skip to content

Commit

Permalink
Refactor GCC
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 20, 2025
1 parent 902265e commit 189d1ca
Show file tree
Hide file tree
Showing 17 changed files with 1,065 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/pion/interceptor

go 1.20
go 1.21

Check failure on line 3 in go.mod

View workflow job for this annotation

GitHub Actions / lint / Metadata

Invalid Go version

Found 1.21. Expected 1.20

require (
github.com/pion/logging v0.2.2
Expand Down
21 changes: 21 additions & 0 deletions pkg/bwe/acknowledgment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package bwe

import (
"fmt"
"time"

"github.com/pion/rtcp"
)

type acknowledgment struct {
seqNr int64
size uint16
departure time.Time
arrived bool
arrival time.Time
ecn rtcp.ECN
}

func (a acknowledgment) String() string {
return fmt.Sprintf("seq=%v, departure=%v, arrival=%v", a.seqNr, a.departure, a.arrival)

Check warning on line 20 in pkg/bwe/acknowledgment.go

View check run for this annotation

Codecov / codecov/patch

pkg/bwe/acknowledgment.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}
43 changes: 43 additions & 0 deletions pkg/bwe/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package bwe

import "time"

type arrivalGroup []acknowledgment

type arrivalGroupAccumulator struct {
next arrivalGroup
burstInterval time.Duration
}

func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
return &arrivalGroupAccumulator{
next: make([]acknowledgment, 0),
burstInterval: 5 * time.Millisecond,
}
}

func (a *arrivalGroupAccumulator) onPacketAcked(ack acknowledgment) arrivalGroup {
if len(a.next) == 0 {
a.next = append(a.next, ack)
return nil
}

if ack.departure.Sub(a.next[0].departure) < a.burstInterval {
a.next = append(a.next, ack)
return nil
}

interDepartureTime := ack.departure.Sub(a.next[0].departure)
interArrivalTime := ack.arrival.Sub(a.next[len(a.next)-1].arrival)
interGroupDelay := interArrivalTime - interDepartureTime

if interArrivalTime < a.burstInterval && interGroupDelay < 0 {
a.next = append(a.next, ack)
return nil
}

Check warning on line 37 in pkg/bwe/arrival_group_accumulator.go

View check run for this annotation

Codecov / codecov/patch

pkg/bwe/arrival_group_accumulator.go#L35-L37

Added lines #L35 - L37 were not covered by tests

group := make(arrivalGroup, len(a.next))
copy(group, a.next)
a.next = arrivalGroup{ack}
return group
}
204 changes: 204 additions & 0 deletions pkg/bwe/arrival_group_accumulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package bwe

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestArrivalGroupAccumulator(t *testing.T) {
triggerNewGroupElement := acknowledgment{
departure: time.Time{}.Add(time.Second),
arrival: time.Time{}.Add(time.Second),
}
cases := []struct {
name string
log []acknowledgment
exp []arrivalGroup
}{
{
name: "emptyCreatesNoGroups",
log: []acknowledgment{},
exp: []arrivalGroup{},
},
{
name: "createsSingleElementGroup",
log: []acknowledgment{
{
departure: time.Time{},
arrival: time.Time{}.Add(time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{{
{
departure: time.Time{},
arrival: time.Time{}.Add(time.Millisecond),
},
},
},
},
{
name: "createsTwoElementGroup",
log: []acknowledgment{
{
arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(20 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{{
{
departure: time.Time{},
arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(20 * time.Millisecond),
},
}},
},
{
name: "createsTwoArrivalGroups",
log: []acknowledgment{
{
departure: time.Time{},
arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(20 * time.Millisecond),
},
{
departure: time.Time{}.Add(9 * time.Millisecond),
arrival: time.Time{}.Add(30 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(20 * time.Millisecond),
},
},
{
{
departure: time.Time{}.Add(9 * time.Millisecond),
arrival: time.Time{}.Add(30 * time.Millisecond),
},
},
},
},
{
name: "ignoresOutOfOrderPackets",
log: []acknowledgment{
{
departure: time.Time{},
arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
departure: time.Time{}.Add(6 * time.Millisecond),
arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
departure: time.Time{}.Add(8 * time.Millisecond),
arrival: time.Time{}.Add(30 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
departure: time.Time{},
arrival: time.Time{}.Add(15 * time.Millisecond),
},
},
{
{
departure: time.Time{}.Add(6 * time.Millisecond),
arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
departure: time.Time{}.Add(8 * time.Millisecond),
arrival: time.Time{}.Add(30 * time.Millisecond),
},
},
},
},
{
name: "newGroupBecauseOfInterDepartureTime",
log: []acknowledgment{
{
seqNr: 0,
departure: time.Time{},
arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
seqNr: 1,
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
seqNr: 2,
departure: time.Time{}.Add(6 * time.Millisecond),
arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
seqNr: 3,
departure: time.Time{}.Add(9 * time.Millisecond),
arrival: time.Time{}.Add(10 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
seqNr: 0,
departure: time.Time{},
arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
seqNr: 1,
departure: time.Time{}.Add(3 * time.Millisecond),
arrival: time.Time{}.Add(4 * time.Millisecond),
},
},
{
{
seqNr: 2,
departure: time.Time{}.Add(6 * time.Millisecond),
arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
seqNr: 3,
departure: time.Time{}.Add(9 * time.Millisecond),
arrival: time.Time{}.Add(10 * time.Millisecond),
},
},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
aga := newArrivalGroupAccumulator()
received := []arrivalGroup{}
for _, ack := range tc.log {
next := aga.onPacketAcked(ack)
if next != nil {
received = append(received, next)
}
}
assert.Equal(t, tc.exp, received)
})
}
}
51 changes: 51 additions & 0 deletions pkg/bwe/delay_rate_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bwe

import (
"log"
"time"
)

type DelayRateController struct {
aga *arrivalGroupAccumulator
last arrivalGroup
kf *kalman
od *overuseDetector
rc *rateController
latest usage
}

func NewDelayRateController(initialRate int) *DelayRateController {
return &DelayRateController{
aga: newArrivalGroupAccumulator(),
last: []acknowledgment{},
kf: newKalman(),
od: newOveruseDetector(),
rc: newRateController(initialRate),
}

Check warning on line 24 in pkg/bwe/delay_rate_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/bwe/delay_rate_controller.go#L17-L24

Added lines #L17 - L24 were not covered by tests
}

func (c *DelayRateController) OnPacketAcked(ack acknowledgment) {
next := c.aga.onPacketAcked(ack)
if next == nil {
return
}
if len(next) == 0 {
// ignore empty groups, should never occur
return
}
if len(c.last) == 0 {
c.last = next
return
}
interArrivalTime := next[len(next)-1].arrival.Sub(c.last[len(c.last)-1].arrival)
interDepartureTime := next[0].departure.Sub(c.last[0].departure)
interGroupDelay := interArrivalTime - interDepartureTime
estimate := c.kf.updateEstimate(interGroupDelay)
c.latest = c.od.update(ack.arrival, estimate)
c.last = next
log.Printf("interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v", interArrivalTime.Nanoseconds(), interDepartureTime.Nanoseconds(), interGroupDelay.Nanoseconds(), estimate.Nanoseconds(), c.od.delayThreshold.Nanoseconds())

Check warning on line 46 in pkg/bwe/delay_rate_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/bwe/delay_rate_controller.go#L27-L46

Added lines #L27 - L46 were not covered by tests
}

func (c *DelayRateController) Update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int {
return c.rc.update(ts, c.latest, lastDeliveryRate, rtt)

Check warning on line 50 in pkg/bwe/delay_rate_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/bwe/delay_rate_controller.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}
Loading

0 comments on commit 189d1ca

Please sign in to comment.