From 5909eb85f157797b8973d75be98b977bab0daee8 Mon Sep 17 00:00:00 2001 From: Filippov Alex Date: Tue, 19 Dec 2023 12:45:22 +0530 Subject: [PATCH] technical debt --- README.md | 1 - system/gate/client/gate_client.go | 1 - system/gate/client/wsp/client.go | 4 +- system/gate/client/wsp/connection.go | 10 +--- system/gate/client/wsp/pool.go | 88 ++++++++++++++-------------- system/gate/server/wsp/server.go | 22 +------ 6 files changed, 51 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index 1f680fafb..2c77292da 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # The program complex **Smart House** [Project site](https://e154.github.io/smart-home/) | -[Mobile Gate](https://github.com/e154/smart-home-gate/) | [Node](https://github.com/e154/smart-home-node/) [![Go Report Card](https://goreportcard.com/badge/github.com/e154/smart-home)](https://goreportcard.com/report/github.com/e154/smart-home) diff --git a/system/gate/client/gate_client.go b/system/gate/client/gate_client.go index 06fc39e55..74f9652f7 100644 --- a/system/gate/client/gate_client.go +++ b/system/gate/client/gate_client.go @@ -104,7 +104,6 @@ func (g *GateClient) initWspServer() { if !g.inProcess.CompareAndSwap(false, true) { return } - defer g.inProcess.Store(false) if g.proxy != nil { g.proxy.Shutdown() diff --git a/system/gate/client/wsp/client.go b/system/gate/client/wsp/client.go index 2a1c3587b..ab88ccc04 100644 --- a/system/gate/client/wsp/client.go +++ b/system/gate/client/wsp/client.go @@ -67,10 +67,10 @@ func NewClient(cfg *Config, api *api.Api, stream *stream.Stream, adaptors *adapt // Start the Proxy func (c *Client) Start(ctx context.Context) { + log.Info("Start") if !c.isStarted.CompareAndSwap(false, true) { return } - defer c.isStarted.Store(true) for _, target := range c.cfg.Targets { pool := NewPool(c, target, c.cfg.SecretKey, c.api, c.stream, c.adaptors, c.jwtManager) c.pools[target] = pool @@ -80,10 +80,10 @@ func (c *Client) Start(ctx context.Context) { // Shutdown the Proxy func (c *Client) Shutdown() { + log.Info("Shutdown") if !c.isStarted.CompareAndSwap(true, false) { return } - defer c.isStarted.Store(false) for _, pool := range c.pools { pool.Shutdown() } diff --git a/system/gate/client/wsp/connection.go b/system/gate/client/wsp/connection.go index 87e8ccf62..81c9f9c8c 100644 --- a/system/gate/client/wsp/connection.go +++ b/system/gate/client/wsp/connection.go @@ -69,7 +69,7 @@ func NewConnection(pool *Pool, // Connect to the IsolatorServer using a HTTP websocket func (c *Connection) Connect(ctx context.Context) (err error) { - //log.Infof("Connecting to %s", c.pool.target) + log.Infof("Connecting to %s", c.pool.target) // Create a new TCP(/TLS) connection ( no use of net.http ) c.ws, _, err = c.pool.client.dialer.DialContext( @@ -96,7 +96,7 @@ func (c *Connection) Connect(ctx context.Context) (err error) { return err } - go c.serve(ctx) + c.serve(ctx) return } @@ -109,7 +109,6 @@ func (c *Connection) Connect(ctx context.Context) (err error) { // As in the server code there is no buffering of HTTP request/response body // As is the server if any error occurs the connection is closed/throwed func (c *Connection) serve(ctx context.Context) { - defer c.Close() // Keep connection alive go func() { @@ -117,7 +116,7 @@ func (c *Connection) serve(ctx context.Context) { defer timer.Stop() for { select { - case t := <- timer.C: + case t := <-timer.C: err := c.ws.WriteControl(websocket.PingMessage, []byte{}, t.Add(time.Second)) if err != nil { c.Close() @@ -273,10 +272,7 @@ func (c *Connection) error(msg string) (err error) { // Close close the ws/tcp connection and remove it from the pool func (c *Connection) Close() { - c.pool.lock.Lock() - defer c.pool.lock.Unlock() - c.pool.remove(c) if c.ws != nil { c.ws.Close() } diff --git a/system/gate/client/wsp/pool.go b/system/gate/client/wsp/pool.go index 46163aafd..23cc07cbc 100644 --- a/system/gate/client/wsp/pool.go +++ b/system/gate/client/wsp/pool.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/google/uuid" + "github.com/e154/smart-home/adaptors" "github.com/e154/smart-home/api" m "github.com/e154/smart-home/models" @@ -36,7 +38,7 @@ type Pool struct { target string secretKey string - connections []*Connection + connections sync.Map lock sync.RWMutex done chan struct{} @@ -59,7 +61,7 @@ func NewPool(client *Client, target string, return &Pool{ client: client, target: target, - connections: make([]*Connection, 0), + connections: sync.Map{}, secretKey: secretKey, done: make(chan struct{}), api: api, @@ -71,16 +73,16 @@ func NewPool(client *Client, target string, // Start connect to the remote Server func (p *Pool) Start(ctx context.Context) { + log.Info("Start") p.connector(ctx) go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - L: for { select { case <-p.done: - break L + return case <-ticker.C: p.connector(ctx) } @@ -88,6 +90,18 @@ func (p *Pool) Start(ctx context.Context) { }() } +// Shutdown close all connection in the pool +func (p *Pool) Shutdown() { + log.Info("Shutdown") + close(p.done) + p.connections.Range(func(key, value interface{}) bool { + connection := value.(*Connection) + connection.Close() + p.connections.Delete(key) + return true + }) +} + // The garbage collector func (p *Pool) connector(ctx context.Context) { p.lock.Lock() @@ -104,66 +118,50 @@ func (p *Pool) connector(ctx context.Context) { // Create enough connection to fill the pool toCreate := p.client.cfg.PoolIdleSize - poolSize.idle + if toCreate < 0 { + toCreate = 0 + } + // Create only one connection if the pool is empty if poolSize.total == 0 { toCreate = 1 } // Ensure to open at most PoolMaxSize connections - if poolSize.total+toCreate > p.client.cfg.PoolMaxSize { - toCreate = p.client.cfg.PoolMaxSize - poolSize.total + if poolSize.total+toCreate >= p.client.cfg.PoolMaxSize { + toCreate = 0 + } + + if toCreate < 0 { + toCreate = 0 + } + + if toCreate == 0 { + return } // Try to reach ideal p size for i := 0; i < toCreate; i++ { - conn := NewConnection(p, p.api, p.stream) - p.connections = append(p.connections, conn) + connection := NewConnection(p, p.api, p.stream) + id := uuid.NewString() + p.connections.Store(id, connection) go func() { - err := conn.Connect(ctx) + err := connection.Connect(ctx) if err != nil { - //log.Errorf("Unable to connect to %s : %s", p.target, err) - - p.lock.Lock() - defer p.lock.Unlock() - p.remove(conn) + log.Errorf("Unable to connect to %s : %s", p.target, err) } + p.connections.Delete(id) }() } } -// Add a connection to the pool -func (p *Pool) add(conn *Connection) { - p.connections = append(p.connections, conn) -} - -// Remove a connection from the pool -func (p *Pool) remove(conn *Connection) { - // This trick uses the fact that a slice shares the same backing array and capacity as the original, - // so the storage is reused for the filtered slice. Of course, the original contents are modified. - - var filtered []*Connection // == nil - for _, c := range p.connections { - if conn != c { - filtered = append(filtered, c) - } - } - p.connections = filtered -} - -// Shutdown close all connection in the pool -func (p *Pool) Shutdown() { - close(p.done) - for _, conn := range p.connections { - conn.Close() - } -} - // Size return the current state of the pool func (p *Pool) Size() (poolSize *PoolSize) { poolSize = &PoolSize{} - poolSize.total = len(p.connections) - for _, connection := range p.connections { + p.connections.Range(func(key, value interface{}) bool { + poolSize.total++ + connection := value.(*Connection) switch connection.status { case CONNECTING: poolSize.connecting++ @@ -172,8 +170,8 @@ func (p *Pool) Size() (poolSize *PoolSize) { case RUNNING: poolSize.running++ } - } - + return true + }) return } diff --git a/system/gate/server/wsp/server.go b/system/gate/server/wsp/server.go index a4feb98b0..f49a9bfe1 100644 --- a/system/gate/server/wsp/server.go +++ b/system/gate/server/wsp/server.go @@ -101,33 +101,17 @@ func NewServer(config *Config) (server *Server) { // Start Server HTTP server func (s *Server) Start() { go func() { - L: for { select { case <-s.done: - break L + return case <-time.After(5 * time.Second): s.clean() } } }() - //r := http.NewServeMux() - // TODO: I want to detach the handler function from the Server struct, - // but it is tightly coupled to the internal state of the Server. - //r.HandleFunc("/gate/register", s.Register) - //r.HandleFunc("/gate/request", s.Request) - //r.HandleFunc("/gate/status", s.status) - - // Dispatch connection from available pools to clients requests - // in a separate thread from the server thread. go s.dispatchConnections() - - //s.server = &http.Server{ - // Addr: s.Config.GetAddr(), - // Handler: r, - //} - //go s.server.ListenAndServe() } // clean removes empty Pools which has no connection. @@ -155,7 +139,7 @@ func (s *Server) clean() { busy += ps.Busy } - log.Infof("%d pools, %d idle, %d busy", len(s.pools), idle, busy) + //log.Infof("%d pools, %d idle, %d busy", len(s.pools), idle, busy) } // Dispatch connection from available pools to clients requests @@ -198,7 +182,7 @@ func (s *Server) dispatchConnections() { break } s.lock.RUnlock() - connection, ok := <- pool.idle + connection, ok := <-pool.idle if !ok { continue // a pool has been removed, try again }