Skip to content

Commit

Permalink
Merge pull request #21 from systemli/Revert-Introduce-Worker-Pools
Browse files Browse the repository at this point in the history
Revert "✨ Introduce Worker Pools for TCP Server"
  • Loading branch information
0x46616c6b committed Dec 2, 2024
2 parents 0744b5b + df06a4e commit b725ac0
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 88 deletions.
8 changes: 4 additions & 4 deletions adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *AdapterTestSuite) TestAliasHandler() {

adapter := NewPostfixAdapter(userli)

go StartTCPServer(s.ctx, s.wg, listen, adapter.AliasHandler, 1)
go StartTCPServer(s.ctx, s.wg, listen, adapter.AliasHandler)

// wait until the server is ready
for {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *AdapterTestSuite) TestDomainHandler() {

adapter := NewPostfixAdapter(userli)

go StartTCPServer(s.ctx, s.wg, listen, adapter.DomainHandler, 1)
go StartTCPServer(s.ctx, s.wg, listen, adapter.DomainHandler)

// wait until the server is ready
for {
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *AdapterTestSuite) TestMailboxHandler() {

adapter := NewPostfixAdapter(userli)

go StartTCPServer(s.ctx, s.wg, listen, adapter.MailboxHandler, 1)
go StartTCPServer(s.ctx, s.wg, listen, adapter.MailboxHandler)

// wait until the server is ready
for {
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *AdapterTestSuite) TestSendersHandler() {

adapter := NewPostfixAdapter(userli)

go StartTCPServer(s.ctx, s.wg, listen, adapter.SendersHandler, 1)
go StartTCPServer(s.ctx, s.wg, listen, adapter.SendersHandler)

// wait until the server is ready
for {
Expand Down
37 changes: 0 additions & 37 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"os"
"strconv"

log "github.com/sirupsen/logrus"
)
Expand All @@ -18,27 +17,15 @@ type Config struct {
// AliasListenAddr is the address to listen for alias requests.
AliasListenAddr string

// AliasMaxWorkers is the maximum number of workers for the alias server.
AliasMaxWorkers int

// DomainListenAddr is the address to listen for domain requests.
DomainListenAddr string

// DomainMaxWorkers is the maximum number of workers for the domain server.
DomainMaxWorkers int

// MailboxListenAddr is the address to listen for mailbox requests.
MailboxListenAddr string

// MailboxMaxWorkers is the maximum number of workers for the mailbox server.
MailboxMaxWorkers int

// SendersListenAddr is the address to listen for senders requests.
SendersListenAddr string

// SendersMaxWorkers is the maximum number of workers for the senders server.
SendersMaxWorkers int

// MetricsListenAddr is the address to listen for metrics requests.
MetricsListenAddr string
}
Expand Down Expand Up @@ -82,41 +69,21 @@ func NewConfig() *Config {
aliasListenAddr = ":10001"
}

aliasMaxWorkers, err := strconv.Atoi(os.Getenv("ALIAS_MAX_WORKERS"))
if err != nil || aliasMaxWorkers <= 0 {
aliasMaxWorkers = 10
}

domainListenAddr := os.Getenv("DOMAIN_LISTEN_ADDR")
if domainListenAddr == "" {
domainListenAddr = ":10002"
}

domainMaxWorkers, err := strconv.Atoi(os.Getenv("DOMAIN_MAX_WORKERS"))
if err != nil || domainMaxWorkers <= 0 {
domainMaxWorkers = 10
}

mailboxListenAddr := os.Getenv("MAILBOX_LISTEN_ADDR")
if mailboxListenAddr == "" {
mailboxListenAddr = ":10003"
}

mailboxMaxWorkers, err := strconv.Atoi(os.Getenv("MAILBOX_MAX_WORKERS"))
if err != nil || mailboxMaxWorkers <= 0 {
mailboxMaxWorkers = 10
}

sendersListenAddr := os.Getenv("SENDERS_LISTEN_ADDR")
if sendersListenAddr == "" {
sendersListenAddr = ":10004"
}

sendersMaxWorkers, err := strconv.Atoi(os.Getenv("SENDERS_MAX_WORKERS"))
if err != nil || sendersMaxWorkers <= 0 {
sendersMaxWorkers = 10
}

metricsListenAddr := os.Getenv("METRICS_LISTEN_ADDR")
if metricsListenAddr == "" {
metricsListenAddr = ":10005"
Expand All @@ -126,13 +93,9 @@ func NewConfig() *Config {
UserliBaseURL: userliBaseURL,
UserliToken: userliToken,
AliasListenAddr: aliasListenAddr,
AliasMaxWorkers: aliasMaxWorkers,
DomainListenAddr: domainListenAddr,
DomainMaxWorkers: domainMaxWorkers,
MailboxListenAddr: mailboxListenAddr,
MailboxMaxWorkers: mailboxMaxWorkers,
SendersListenAddr: sendersListenAddr,
SendersMaxWorkers: sendersMaxWorkers,
MetricsListenAddr: metricsListenAddr,
}
}
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func main() {
var wg sync.WaitGroup

wg.Add(4)
go StartTCPServer(ctx, &wg, config.AliasListenAddr, adapter.AliasHandler, config.AliasMaxWorkers)
go StartTCPServer(ctx, &wg, config.DomainListenAddr, adapter.DomainHandler, config.DomainMaxWorkers)
go StartTCPServer(ctx, &wg, config.MailboxListenAddr, adapter.MailboxHandler, config.MailboxMaxWorkers)
go StartTCPServer(ctx, &wg, config.SendersListenAddr, adapter.SendersHandler, config.SendersMaxWorkers)
go StartTCPServer(ctx, &wg, config.AliasListenAddr, adapter.AliasHandler)
go StartTCPServer(ctx, &wg, config.DomainListenAddr, adapter.DomainHandler)
go StartTCPServer(ctx, &wg, config.MailboxListenAddr, adapter.MailboxHandler)
go StartTCPServer(ctx, &wg, config.SendersListenAddr, adapter.SendersHandler)

wg.Wait()
log.Info("All servers stopped")
Expand Down
50 changes: 7 additions & 43 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ import (
log "github.com/sirupsen/logrus"
)

func StartTCPServer(ctx context.Context, wg *sync.WaitGroup, addr string, handler func(net.Conn), maxWorkers int) {
func StartTCPServer(ctx context.Context, wg *sync.WaitGroup, addr string, handler func(net.Conn)) {
defer wg.Done()

// Create a buffered channel to limit concurrent connections
semaphore := make(chan struct{}, maxWorkers)

// Create connection queue channel
connQueue := make(chan net.Conn, maxWorkers)
lc := net.ListenConfig{
KeepAlive: -1,
}

listener, err := net.Listen("tcp", addr)
listener, err := lc.Listen(ctx, "tcp", addr)
if err != nil {
log.WithError(err).Error("Error creating listener")
return
Expand All @@ -29,14 +27,8 @@ func StartTCPServer(ctx context.Context, wg *sync.WaitGroup, addr string, handle
listener.Close()
}()

// Start worker pool
for i := 0; i < maxWorkers; i++ {
go worker(ctx, handler, connQueue, semaphore)
}

log.Info("Server started on ", addr, " with ", maxWorkers, " workers")
log.Info("Server started on ", addr)

// Accept connections
for {
conn, err := listener.Accept()
if err != nil {
Expand All @@ -48,34 +40,6 @@ func StartTCPServer(ctx context.Context, wg *sync.WaitGroup, addr string, handle
continue
}

// Try to acquire semaphore
select {
case semaphore <- struct{}{}:
// Successfully acquired semaphore, queue the connection
select {
case connQueue <- conn:
case <-ctx.Done():
conn.Close()
return
}
default:
// If we can't acquire semaphore, we're at capacity
log.Warn("Server at capacity, dropping connection")
conn.Close()
}
}
}

func worker(ctx context.Context, handler func(net.Conn), connQueue chan net.Conn, semaphore chan struct{}) {
for {
select {
case <-ctx.Done():
return
case conn := <-connQueue:
// Process the connection
handler(conn)
// Release the semaphore
<-semaphore
}
go handler(conn)
}
}

0 comments on commit b725ac0

Please sign in to comment.