Skip to content

Commit

Permalink
Merge pull request #124 from mhindery/parallel
Browse files Browse the repository at this point in the history
Startup performance improvements
  • Loading branch information
cjimti authored May 29, 2020
2 parents 504a088 + 15c7281 commit d3e72cd
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
82 changes: 46 additions & 36 deletions cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,26 +290,27 @@ Try:
}

for ii, namespace := range namespaces {
// ShortName field only use short name for the first namespace and context
fwdServiceOpts := FwdServiceOpts{
Wg: wg,
ClientSet: clientSet,
Context: ctx,
Namespace: namespace,
ListOptions: listOptions,
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: i < 1 && ii < 1,
Remote: i > 0,
IpC: byte(ipC),
IpD: ipD,
ExitOnFail: exitOnFail,
Domain: domain,
}
go fwdServiceOpts.StartListen(stopListenCh)

ipC = ipC + 1
go func(ii int, namespace string) {
// ShortName field only use short name for the first namespace and context
fwdServiceOpts := FwdServiceOpts{
Wg: wg,
ClientSet: clientSet,
Context: ctx,
Namespace: namespace,
NamespaceIPLock: &sync.Mutex{}, // For parallelization of ip handout, each namespace has its own a.b.c.* range
ListOptions: listOptions,
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: i < 1 && ii < 1,
Remote: i > 0,
IpC: byte(ipC + ii),
IpD: ipD,
ExitOnFail: exitOnFail,
Domain: domain,
}
fwdServiceOpts.StartListen(stopListenCh)
}(ii, namespace)
}
}

Expand All @@ -325,20 +326,21 @@ Try:
}

type FwdServiceOpts struct {
Wg *sync.WaitGroup
ClientSet *kubernetes.Clientset
Context string
Namespace string
ListOptions metav1.ListOptions
Hostfile *fwdport.HostFileWithLock
ClientConfig *restclient.Config
RESTClient *restclient.RESTClient
ShortName bool
Remote bool
IpC byte
IpD int
ExitOnFail bool
Domain string
Wg *sync.WaitGroup
ClientSet *kubernetes.Clientset
Context string
Namespace string
NamespaceIPLock *sync.Mutex
ListOptions metav1.ListOptions
Hostfile *fwdport.HostFileWithLock
ClientConfig *restclient.Config
RESTClient *restclient.RESTClient
ShortName bool
Remote bool
IpC byte
IpD int
ExitOnFail bool
Domain string
}

// StartListen sets up event handlers to act on service-related events.
Expand Down Expand Up @@ -372,7 +374,7 @@ func (opts *FwdServiceOpts) AddServiceHandler(obj interface{}) {

log.Debugf("Add service %s namespace %s.", svc.Name, svc.Namespace)

opts.ForwardService(svc)
go opts.ForwardService(svc)
}

// DeleteServiceHandler is the event handler for when a service gets deleted in k8s.
Expand All @@ -384,7 +386,7 @@ func (opts *FwdServiceOpts) DeleteServiceHandler(obj interface{}) {

log.Debugf("Delete service %s namespace %s.", svc.Name, svc.Namespace)

opts.UnForwardService(svc)
go opts.UnForwardService(svc)
}

// UpdateServiceHandler is the event handler to deal with service changes from k8s.
Expand All @@ -408,6 +410,11 @@ func (opts *FwdServiceOpts) ForwardService(svc *v1.Service) {

listOpts := metav1.ListOptions{LabelSelector: selector}

// Only a single pod (which will be setup for forwarding) is required in this case
if svc.Spec.ClusterIP == "None" {
listOpts.Limit = 1
}

pods, err := opts.ClientSet.CoreV1().Pods(svc.Namespace).List(listOpts)

if err != nil {
Expand Down Expand Up @@ -468,11 +475,14 @@ func (opts *FwdServiceOpts) LoopPodsToForward(pods []v1.Pod, svc *v1.Service) {
podPort := ""
svcName := ""

// Ip address handout is a critical section for synchronization, use a lock which synchronizes inside each namespace.
opts.NamespaceIPLock.Lock()
localIp, dInc, err := fwdnet.ReadyInterface(127, 1, opts.IpC, opts.IpD, podPort)
if err != nil {
log.Warnf("WARNING: error readying interface: %s\n", err)
}
opts.IpD = dInc
opts.NamespaceIPLock.Unlock()

for _, port := range svc.Spec.Ports {

Expand Down
38 changes: 27 additions & 11 deletions pkg/fwdnet/fwdnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,34 @@ import (
"os"
"os/exec"
"runtime"
"sync"

log "github.com/sirupsen/logrus"
)

var addrs []net.Addr
var initAddresses sync.Once

// getLocalListenAddrs returns the listen addresses for the lo0 interface.
// It will exit if these could not be determined.
func getLocalListenAddrs() []net.Addr {
initAddresses.Do(func() {
if addrs == nil {
iface, err := net.InterfaceByName("lo0")
if err != nil {
log.Fatalf("Could not get lo0 netInterface: %s", err)
}

addrs, err = iface.Addrs()
if err != nil {
log.Fatalf("Could not get lo0 listen addresses: %s", err)
}
}
})

return addrs
}

// ReadyInterface prepares a local IP address on
// the loopback interface.
func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, error) {
Expand All @@ -31,18 +57,8 @@ func ReadyInterface(a byte, b byte, c byte, d int, port string) (net.IP, int, er

ip = net.IPv4(a, b, c, byte(i))

iface, err := net.InterfaceByName("lo0")
if err != nil {
return net.IP{}, i, err
}

addrs, err := iface.Addrs()
if err != nil {
return net.IP{}, i, err
}

// check the addresses already assigned to the interface
for _, addr := range addrs {
for _, addr := range getLocalListenAddrs() {

// found a match
if addr.String() == ip.String()+"/8" {
Expand Down

0 comments on commit d3e72cd

Please sign in to comment.