Skip to content

Commit

Permalink
add routing balancer support
Browse files Browse the repository at this point in the history
  • Loading branch information
povsister committed Aug 2, 2024
1 parent 657cd26 commit fe18794
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 45 deletions.
9 changes: 7 additions & 2 deletions app/dnscircuit/dns_circuit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
)

type dnsCircuit struct {
dynRouter routing.RouterWithDynamicRule
expectTags map[string]bool
dynRouter routing.RouterWithDynamicRule
expectTags map[string]bool
expectBalancerTags map[string]bool

inboundTags []string
ihm inbound.Manager
Expand Down Expand Up @@ -46,6 +47,10 @@ func (s *dnsCircuit) Init(ctx context.Context, c *Config, router routing.Router,
for _, tag := range c.OutboundTags {
s.expectTags[tag] = true
}
s.expectBalancerTags = make(map[string]bool, len(c.BalancerTags))
for _, tag := range c.BalancerTags {
s.expectBalancerTags[tag] = true
}
// confirm router capable
dynRouter, ok := router.(routing.RouterWithDynamicRule)
if !ok {
Expand Down
114 changes: 72 additions & 42 deletions app/dnscircuit/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,26 @@ type obConnTrackMeta struct {
}

type connTrackDestMeta struct {
src netip.Addr
g *obDestMeta
rw sync.RWMutex
lastAccessed time.Time
domain string
outboundTag string
src netip.Addr
g *obDestMeta
rw sync.RWMutex
lastAccessed time.Time
domain string
outboundTag string
isBalancerTag bool
}

func (ock *connTrackDestMeta) getOutTag() string {
func (ock *connTrackDestMeta) getOutTag() (string, bool) {
ock.rw.RLock()
defer ock.rw.RUnlock()
return ock.outboundTag
return ock.outboundTag, ock.isBalancerTag
}

func (ock *connTrackDestMeta) setOutTag(tag string) {
func (ock *connTrackDestMeta) setOutTag(tag string, isBalancer bool) {
ock.rw.Lock()
defer ock.rw.Unlock()
ock.outboundTag = tag
ock.isBalancerTag = isBalancer
}

func (ock *connTrackDestMeta) updateLastAccessed(d string) {
Expand Down Expand Up @@ -150,11 +152,18 @@ func (s *dnsCircuit) initObserver() error {
if dynDestIPset == nil {
return newError(fmt.Sprintf("route default dest rule %s not found", dns_feature.DynamicIPSetDnsCircuitDestDefault))
}
srcConnTrackIPset := make(map[string]routing.DynamicRuleIP, len(s.expectTags))
destConnTrackIPset := make(map[string]routing.DynamicRuleIP, len(s.expectTags))
allOutboundTags := make(map[string]struct{}, len(s.expectTags)+len(s.expectBalancerTags))
for tag := range s.expectTags {
allOutboundTags[tag] = struct{}{}
}
for tag := range s.expectBalancerTags {
allOutboundTags[tag] = struct{}{}
}
srcConnTrackIPset := make(map[string]routing.DynamicRuleIP, len(allOutboundTags))
destConnTrackIPset := make(map[string]routing.DynamicRuleIP, len(allOutboundTags))
// init skip rules
rtCtx = origRtCtx
for outTag := range s.expectTags {
for outTag := range allOutboundTags {
srcIPsetRuleName := dns_feature.DynamicIPSetDNSCircuitConnTrackSrcPrefix + outTag
if srcTrackRule := s.dynRouter.GetDynamicRuleIP(srcIPsetRuleName); srcTrackRule == nil {
return newError(fmt.Sprintf("route src-track rule %s not found", srcIPsetRuleName))
Expand Down Expand Up @@ -312,8 +321,12 @@ func (s *observer) doObStatGC() {
}
s.obDestStatRw.Unlock()

type outMeta struct {
tag string
isBalancer bool
}
var (
revokeDestTracks = make(map[string][]net.IPNet)
revokeDestTracks = make(map[outMeta][]net.IPNet)
emptyClients []netip.Addr
emptyClientsIPnets []net.IPNet
)
Expand All @@ -325,10 +338,11 @@ func (s *observer) doObStatGC() {
)
for destIPNet, destMeta := range clientMeta.destMeta {
if destMeta.g.isOutdated || destMeta.durSinceLastAccess() >= s.c.inactiveClean {
tag := destMeta.getOutTag()
ips := revokeDestTracks[tag]
tag, isB := destMeta.getOutTag()
om := outMeta{tag: tag, isBalancer: isB}
ips := revokeDestTracks[om]
ips = append(ips, destIPNet.ipNet())
revokeDestTracks[tag] = ips
revokeDestTracks[om] = ips
toDeleteDest = append(toDeleteDest, destIPNet)
}
}
Expand All @@ -346,9 +360,9 @@ func (s *observer) doObStatGC() {
if len(ips) <= 0 {
continue
}
ospf.LogImportant("revoking conn-track ip rule for outbound:%s due to %s inactive: %s -> %s",
tag, s.c.inactiveClean.String(), clientAddr.String(), PrettyPrintIPNet(ips...))
s.dynDestRuleIP[dns_feature.DynamicIPSetDNSCircuitConnTrackDestPrefix+tag].RemoveIPNetConnTrack(clientAddr.AsSlice(), ips...)
ospf.LogImportant("revoking conn-track ip rule for %s:%s due to %s inactive: %s -> %s",
outName(tag.isBalancer), tag, s.c.inactiveClean.String(), clientAddr.String(), PrettyPrintIPNet(ips...))
s.dynDestRuleIP[dns_feature.DynamicIPSetDNSCircuitConnTrackDestPrefix+tag.tag].RemoveIPNetConnTrack(clientAddr.AsSlice(), ips...)
}
clear(revokeDestTracks)
}
Expand Down Expand Up @@ -453,9 +467,10 @@ var defaultMask = net.CIDRMask(32, 32)

func (s *observer) procEvent(e observedEvent) {
var (
qualifiedIP []net.IP
finalIP []net.IP
finalTag string
qualifiedIP []net.IP
finalIP []net.IP
isBalancerTag bool
finalTag string
)
pbCtx.TargetDomain = e.d
pbCtx.TargetIPs = pbCtx.TargetIPs[0:0]
Expand All @@ -469,7 +484,7 @@ func (s *observer) procEvent(e observedEvent) {
if len(qualifiedIP) <= 0 {
return
}
pickRt, err := s.c.dynRouter.PickRoute(rtCtx)
pickRt, err := s.c.dynRouter.PickRouteB(rtCtx)
if err != nil {
if errors.Is(err, common.ErrNoClue) {
finalTag = s.c.ohm.GetDefaultHandler().Tag()
Expand All @@ -482,8 +497,13 @@ func (s *observer) procEvent(e observedEvent) {
}
} else {
finalTag = pickRt.GetOutboundTag()
if bTag := pickRt.GetBalancerTag(); len(bTag) > 0 {
isBalancerTag = true
finalTag = bTag
}
}
if s.c.expectTags[finalTag] {
if (!isBalancerTag && s.c.expectTags[finalTag]) ||
(isBalancerTag && s.c.expectBalancerTags[finalTag]) {
for _, r := range qualifiedIP {
finalIP = append(finalIP, r.Mask(defaultMask))
}
Expand All @@ -496,7 +516,7 @@ func (s *observer) procEvent(e observedEvent) {
})
}
needUpdateIPNets, metas := s.doObStatUpdate(ipNets, e)
s.addConnTrack(e, ipNets, metas, finalTag)
s.addConnTrack(e, ipNets, metas, finalTag, isBalancerTag)
if len(needUpdateIPNets) > 0 {
s.c.ospf.AnnounceASBRRoute(needUpdateIPNets)
}
Expand All @@ -507,7 +527,15 @@ var (
maskHost = net.CIDRMask(32, 32)
)

func (s *observer) addConnTrack(e observedEvent, dests []net.IPNet, metas []*obDestMeta, tag string) {
func outName(isBalancerTag bool) string {
if isBalancerTag {
return "balancer"
}
return "outbound"
}

func (s *observer) addConnTrack(e observedEvent, dests []net.IPNet, metas []*obDestMeta,
tag string, isBalancerTag bool) {
newError(fmt.Sprintf("adding default dynamic dest ip rule in %s: %s",
dns_feature.DynamicIPSetDnsCircuitDestDefault, PrettyPrintIPNet(dests...))).
AtDebug().WriteToLog()
Expand All @@ -534,20 +562,21 @@ func (s *observer) addConnTrack(e observedEvent, dests []net.IPNet, metas []*obD
destM, exist := cMeta.destMeta[destK]
if !exist {
cMeta.destMeta[destK] = &connTrackDestMeta{
src: connTrackKey(e.from),
g: metas[i],
lastAccessed: time.Now(),
domain: e.d,
outboundTag: tag,
src: connTrackKey(e.from),
g: metas[i],
lastAccessed: time.Now(),
domain: e.d,
outboundTag: tag,
isBalancerTag: isBalancerTag,
}
toAddDest = append(toAddDest, dest)
} else {
destM.updateLastAccessed(e.d)
if prevOutTag := destM.getOutTag(); prevOutTag != tag {
ospf.LogImportant("updating conn-track src %s ip rule from outbound:%s to outbound:%s domain: %s",
e.from.Address.String(), prevOutTag, tag, e.d)
if prevOutTag, prevIsBalancerTag := destM.getOutTag(); prevOutTag != tag {
ospf.LogImportant("updating conn-track src %s ip rule from %s:%s to %s:%s domain: %s",
e.from.Address.String(), outName(prevIsBalancerTag), prevOutTag, outName(isBalancerTag), tag, e.d)
s.dynDestRuleIP[dns_feature.DynamicIPSetDNSCircuitConnTrackDestPrefix+prevOutTag].RemoveIPNetConnTrack(e.from.Address.IP(), destK.ipNet())
destM.setOutTag(tag)
destM.setOutTag(tag, isBalancerTag)
toAddDest = append(toAddDest, dest)
}
}
Expand All @@ -562,11 +591,12 @@ func (s *observer) addConnTrack(e observedEvent, dests []net.IPNet, metas []*obD
}
destK := kFromIPAndMask(dest.IP, dest.Mask)
cMeta.destMeta[destK] = &connTrackDestMeta{
src: connTrackKey(e.from),
g: metas[i],
lastAccessed: time.Now(),
domain: e.d,
outboundTag: tag,
src: connTrackKey(e.from),
g: metas[i],
lastAccessed: time.Now(),
domain: e.d,
outboundTag: tag,
isBalancerTag: isBalancerTag,
}
toAddDest = append(toAddDest, dest)
}
Expand All @@ -577,8 +607,8 @@ func (s *observer) addConnTrack(e observedEvent, dests []net.IPNet, metas []*obD
}

if len(toAddDest) > 0 {
ospf.LogImportant("adding conn-track ip rule for outbound:%s domain: %s : %s -> %s",
tag, e.d, e.from.Address.String(), PrettyPrintIPNet(toAddDest...))
ospf.LogImportant("adding conn-track ip rule for %s:%s domain: %s : %s -> %s",
outName(isBalancerTag), tag, e.d, e.from.Address.String(), PrettyPrintIPNet(toAddDest...))
s.dynSrcRuleIP[dns_feature.DynamicIPSetDNSCircuitConnTrackSrcPrefix+tag].AddIPNet(net.IPNet{
IP: e.from.Address.IP(), Mask: maskHost,
})
Expand Down
1 change: 1 addition & 0 deletions app/router/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type Rule struct {
Tag string
BTag string
Balancer *Balancer
Condition Condition
}
Expand Down
25 changes: 25 additions & 0 deletions app/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (r *Router) Init(ctx context.Context, config *Config, d dns.Client, ohm out
if !found {
return newError("balancer ", btag, " not found")
}
rr.BTag = btag
rr.Balancer = brule
}
r.rules = append(r.rules, rr)
Expand All @@ -83,6 +84,21 @@ func (r *Router) PickRoute(ctx routing.Context) (routing.Route, error) {
return &Route{Context: ctx, outboundTag: tag}, nil
}

func (r *Router) PickRouteB(ctx routing.Context) (routing.RouteB, error) {
rule, ctx, err := r.pickRouteInternal(ctx)
if err != nil {
return nil, err
}
tag, err := rule.GetTag()
if err != nil {
return nil, err
}
return &RouteB{Route: &Route{
Context: ctx,
outboundTag: tag,
}, balancerTag: rule.BTag}, nil
}

func (r *Router) pickRouteInternal(ctx routing.Context) (*Rule, routing.Context, error) {
// SkipDNSResolve is set from DNS module.
// the DOH remote server maybe a domain name,
Expand Down Expand Up @@ -140,6 +156,15 @@ func (r *Route) GetOutboundTag() string {
return r.outboundTag
}

type RouteB struct {
*Route
balancerTag string
}

func (r *RouteB) GetBalancerTag() string {
return r.balancerTag
}

func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
r := new(Router)
Expand Down
2 changes: 1 addition & 1 deletion features/routing/dynamic_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type RouterWithDynamicRule interface {
Router
RouterB
// GetDynamicRuleIP returns a dynamic routing rule based on IPNet.
// currently, only "dynamic-ipset:XXXX" is supported.
// Be noted that rules with same name are Singleton.
Expand Down
10 changes: 10 additions & 0 deletions features/routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ type Router interface {
PickRoute(ctx Context) (Route, error)
}

type RouterB interface {
Router
PickRouteB(ctx Context) (RouteB, error)
}

type RouteB interface {
Route
GetBalancerTag() string
}

// Route is the routing result of Router feature.
//
// v2ray:api:stable
Expand Down

0 comments on commit fe18794

Please sign in to comment.