-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
148 lines (132 loc) · 3.72 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Package pool implements a simple connection pool for the MyMySQL driver
package pool
import (
"fmt"
"github.com/ziutek/mymysql/mysql"
_ "github.com/ziutek/mymysql/native" // Use the native driver
"sync"
"time"
)
// A Pool is a set of one or more persistent database connections.
type Pool struct {
openConnections map[*Conn]struct{}
idleConnections chan *Conn
numPending uint
mutex *sync.Mutex
config Config
connectionExpiry time.Duration
connectTimeout time.Duration
requestTimeout time.Duration
}
// Config packs all the configuration options for a pool in a simple, easy-to-use container.
type Config struct {
Address string
Protocol string
Username string
Password string
Database string
MaxConnections uint
MaxConnectionAge uint
ConnectTimeout uint
RequestTimeout uint
KeepConnectionsAlive bool
Charset string
Collation string
}
// New initializes a connection pool.
func New(config Config) (*Pool, error) {
pool := &Pool{
openConnections: make(map[*Conn]struct{}),
idleConnections: make(chan *Conn, config.MaxConnections),
mutex: new(sync.Mutex),
config: config,
connectionExpiry: time.Duration(config.MaxConnectionAge) * time.Second,
connectTimeout: time.Duration(config.ConnectTimeout) * time.Second,
requestTimeout: time.Duration(config.RequestTimeout) * time.Second,
}
return pool, nil
}
// Size returns the total number of connections managed by the pool and the
// number of those that are currently available.
func (pool *Pool) Size() (total, available int) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
return len(pool.openConnections), len(pool.idleConnections)
}
// Ping sends a simple query to the database to determine its status.
func (pool *Pool) Ping() (time.Duration, error) {
conn, err := pool.Get()
if err != nil {
return 0, err
}
defer conn.Release()
start := time.Now()
_, _, err = conn.Query("SELECT 1")
return time.Since(start), err
}
// Conn initializes and returns a new connection.
func (pool *Pool) Conn() (*Conn, error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
return pool.createConn()
}
// Assumes that the pool is already locked
func (pool *Pool) createConn() (*Conn, error) {
conn := &Conn{
mysql.New(
pool.config.Protocol,
"",
pool.config.Address,
pool.config.Username,
pool.config.Password,
pool.config.Database,
),
pool,
map[string]*Stmt{},
time.Now().Add(pool.connectionExpiry),
}
conn.Conn.SetTimeout(pool.connectTimeout)
err := conn.Connect()
if err == nil {
pool.openConnections[conn] = struct{}{}
return conn, nil
}
return nil, err
}
// Get retrieves a database connection from the pool.
func (pool *Pool) Get() (*Conn, error) {
for {
select {
// If a connection is available immediately, use that
case conn := <-pool.idleConnections:
if conn.verify() {
return conn, nil
}
default:
// Create a new connection if we're still below the maximum
pool.mutex.Lock()
if len(pool.openConnections) < int(pool.config.MaxConnections) {
conn, err := pool.createConn()
pool.mutex.Unlock()
return conn, err
}
pool.numPending++
pool.mutex.Unlock()
defer func() {
pool.mutex.Lock()
pool.numPending--
pool.mutex.Unlock()
}()
// Otherwise wait for a connection to become available
select {
case conn := <-pool.idleConnections:
if conn.verify() {
return conn, nil
}
case <-time.After(pool.connectTimeout):
total, avail := pool.Size()
return nil, fmt.Errorf("Timeout reached while waiting for SQL connection (total: %d, avail: %d, max: %d)", total, avail, pool.config.MaxConnections)
}
}
}
}