Skip to content

Commit

Permalink
technical debt
Browse files Browse the repository at this point in the history
  • Loading branch information
e154 committed Dec 19, 2023
1 parent bb94f04 commit 5909eb8
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 75 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# The program complex **Smart House**

[Project site](https://e154.github.io/smart-home/) |
[Mobile Gate](https://github.com/e154/smart-home-gate/) |
[Node](https://github.com/e154/smart-home-node/)

[![Go Report Card](https://goreportcard.com/badge/github.com/e154/smart-home)](https://goreportcard.com/report/github.com/e154/smart-home)
Expand Down
1 change: 0 additions & 1 deletion system/gate/client/gate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (g *GateClient) initWspServer() {
if !g.inProcess.CompareAndSwap(false, true) {
return
}
defer g.inProcess.Store(false)

if g.proxy != nil {
g.proxy.Shutdown()
Expand Down
4 changes: 2 additions & 2 deletions system/gate/client/wsp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func NewClient(cfg *Config, api *api.Api, stream *stream.Stream, adaptors *adapt

// Start the Proxy
func (c *Client) Start(ctx context.Context) {
log.Info("Start")
if !c.isStarted.CompareAndSwap(false, true) {
return
}
defer c.isStarted.Store(true)
for _, target := range c.cfg.Targets {
pool := NewPool(c, target, c.cfg.SecretKey, c.api, c.stream, c.adaptors, c.jwtManager)
c.pools[target] = pool
Expand All @@ -80,10 +80,10 @@ func (c *Client) Start(ctx context.Context) {

// Shutdown the Proxy
func (c *Client) Shutdown() {
log.Info("Shutdown")
if !c.isStarted.CompareAndSwap(true, false) {
return
}
defer c.isStarted.Store(false)
for _, pool := range c.pools {
pool.Shutdown()
}
Expand Down
10 changes: 3 additions & 7 deletions system/gate/client/wsp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewConnection(pool *Pool,

// Connect to the IsolatorServer using a HTTP websocket
func (c *Connection) Connect(ctx context.Context) (err error) {
//log.Infof("Connecting to %s", c.pool.target)
log.Infof("Connecting to %s", c.pool.target)

// Create a new TCP(/TLS) connection ( no use of net.http )
c.ws, _, err = c.pool.client.dialer.DialContext(
Expand All @@ -96,7 +96,7 @@ func (c *Connection) Connect(ctx context.Context) (err error) {
return err
}

go c.serve(ctx)
c.serve(ctx)

return
}
Expand All @@ -109,15 +109,14 @@ func (c *Connection) Connect(ctx context.Context) (err error) {
// As in the server code there is no buffering of HTTP request/response body
// As is the server if any error occurs the connection is closed/throwed
func (c *Connection) serve(ctx context.Context) {
defer c.Close()

// Keep connection alive
go func() {
timer := time.NewTicker(time.Second * 30)
defer timer.Stop()
for {
select {
case t := <- timer.C:
case t := <-timer.C:
err := c.ws.WriteControl(websocket.PingMessage, []byte{}, t.Add(time.Second))
if err != nil {
c.Close()
Expand Down Expand Up @@ -273,10 +272,7 @@ func (c *Connection) error(msg string) (err error) {

// Close close the ws/tcp connection and remove it from the pool
func (c *Connection) Close() {
c.pool.lock.Lock()
defer c.pool.lock.Unlock()

c.pool.remove(c)
if c.ws != nil {
c.ws.Close()
}
Expand Down
88 changes: 43 additions & 45 deletions system/gate/client/wsp/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"github.com/google/uuid"

"github.com/e154/smart-home/adaptors"
"github.com/e154/smart-home/api"
m "github.com/e154/smart-home/models"
Expand All @@ -36,7 +38,7 @@ type Pool struct {
target string
secretKey string

connections []*Connection
connections sync.Map
lock sync.RWMutex

done chan struct{}
Expand All @@ -59,7 +61,7 @@ func NewPool(client *Client, target string,
return &Pool{
client: client,
target: target,
connections: make([]*Connection, 0),
connections: sync.Map{},
secretKey: secretKey,
done: make(chan struct{}),
api: api,
Expand All @@ -71,23 +73,35 @@ func NewPool(client *Client, target string,

// Start connect to the remote Server
func (p *Pool) Start(ctx context.Context) {
log.Info("Start")
p.connector(ctx)
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

L:
for {
select {
case <-p.done:
break L
return
case <-ticker.C:
p.connector(ctx)
}
}
}()
}

// Shutdown close all connection in the pool
func (p *Pool) Shutdown() {
log.Info("Shutdown")
close(p.done)
p.connections.Range(func(key, value interface{}) bool {
connection := value.(*Connection)
connection.Close()
p.connections.Delete(key)
return true
})
}

// The garbage collector
func (p *Pool) connector(ctx context.Context) {
p.lock.Lock()
Expand All @@ -104,66 +118,50 @@ func (p *Pool) connector(ctx context.Context) {
// Create enough connection to fill the pool
toCreate := p.client.cfg.PoolIdleSize - poolSize.idle

if toCreate < 0 {
toCreate = 0
}

// Create only one connection if the pool is empty
if poolSize.total == 0 {
toCreate = 1
}

// Ensure to open at most PoolMaxSize connections
if poolSize.total+toCreate > p.client.cfg.PoolMaxSize {
toCreate = p.client.cfg.PoolMaxSize - poolSize.total
if poolSize.total+toCreate >= p.client.cfg.PoolMaxSize {
toCreate = 0
}

if toCreate < 0 {
toCreate = 0
}

if toCreate == 0 {
return
}

// Try to reach ideal p size
for i := 0; i < toCreate; i++ {
conn := NewConnection(p, p.api, p.stream)
p.connections = append(p.connections, conn)
connection := NewConnection(p, p.api, p.stream)
id := uuid.NewString()
p.connections.Store(id, connection)

go func() {
err := conn.Connect(ctx)
err := connection.Connect(ctx)
if err != nil {
//log.Errorf("Unable to connect to %s : %s", p.target, err)

p.lock.Lock()
defer p.lock.Unlock()
p.remove(conn)
log.Errorf("Unable to connect to %s : %s", p.target, err)
}
p.connections.Delete(id)
}()
}
}

// Add a connection to the pool
func (p *Pool) add(conn *Connection) {
p.connections = append(p.connections, conn)
}

// Remove a connection from the pool
func (p *Pool) remove(conn *Connection) {
// This trick uses the fact that a slice shares the same backing array and capacity as the original,
// so the storage is reused for the filtered slice. Of course, the original contents are modified.

var filtered []*Connection // == nil
for _, c := range p.connections {
if conn != c {
filtered = append(filtered, c)
}
}
p.connections = filtered
}

// Shutdown close all connection in the pool
func (p *Pool) Shutdown() {
close(p.done)
for _, conn := range p.connections {
conn.Close()
}
}

// Size return the current state of the pool
func (p *Pool) Size() (poolSize *PoolSize) {
poolSize = &PoolSize{}
poolSize.total = len(p.connections)
for _, connection := range p.connections {
p.connections.Range(func(key, value interface{}) bool {
poolSize.total++
connection := value.(*Connection)
switch connection.status {
case CONNECTING:
poolSize.connecting++
Expand All @@ -172,8 +170,8 @@ func (p *Pool) Size() (poolSize *PoolSize) {
case RUNNING:
poolSize.running++
}
}

return true
})
return
}

Expand Down
22 changes: 3 additions & 19 deletions system/gate/server/wsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,33 +101,17 @@ func NewServer(config *Config) (server *Server) {
// Start Server HTTP server
func (s *Server) Start() {
go func() {
L:
for {
select {
case <-s.done:
break L
return
case <-time.After(5 * time.Second):
s.clean()
}
}
}()

//r := http.NewServeMux()
// TODO: I want to detach the handler function from the Server struct,
// but it is tightly coupled to the internal state of the Server.
//r.HandleFunc("/gate/register", s.Register)
//r.HandleFunc("/gate/request", s.Request)
//r.HandleFunc("/gate/status", s.status)

// Dispatch connection from available pools to clients requests
// in a separate thread from the server thread.
go s.dispatchConnections()

//s.server = &http.Server{
// Addr: s.Config.GetAddr(),
// Handler: r,
//}
//go s.server.ListenAndServe()
}

// clean removes empty Pools which has no connection.
Expand Down Expand Up @@ -155,7 +139,7 @@ func (s *Server) clean() {
busy += ps.Busy
}

log.Infof("%d pools, %d idle, %d busy", len(s.pools), idle, busy)
//log.Infof("%d pools, %d idle, %d busy", len(s.pools), idle, busy)
}

// Dispatch connection from available pools to clients requests
Expand Down Expand Up @@ -198,7 +182,7 @@ func (s *Server) dispatchConnections() {
break
}
s.lock.RUnlock()
connection, ok := <- pool.idle
connection, ok := <-pool.idle
if !ok {
continue // a pool has been removed, try again
}
Expand Down

0 comments on commit 5909eb8

Please sign in to comment.