-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathid_generator.go
258 lines (231 loc) · 5.64 KB
/
id_generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package id_generator
import (
"errors"
"fmt"
"log"
)
const (
maxTryCount = 100
DefaultIDSetSize = 10
defaultTotalSize uint64 = 18446744073709551615
)
type IDGenerator struct {
idProvider IDProvider
idSets map[string]*IDSet
idReqChan chan idReq
}
type IDProvider interface {
GetData(category string) (string, int32, error)
SetData(data, category string, version int32) error
Initialize(iniSet string, category string) error
Delete(category string, version int32) error
Lock(category string) (interface{}, error)
Unlock(lck interface{}) error
}
func NewIDGenerator(provider IDProvider) *IDGenerator {
gen := &IDGenerator{
idProvider: provider,
idSets: make(map[string]*IDSet),
idReqChan: make(chan idReq),
}
go gen.takeIDHandler()
return gen
}
func (g *IDGenerator) Initialize(category string, startID uint64) error {
set, err := g.PeekIDs(category)
if err != nil {
log.Println(err.Error())
}
if set != nil && set.GetSize() != 0 {
log.Println("set for category already exists")
return nil
}
currIDs := NewIDSet([]IDRange{
NewIDRange(startID, defaultTotalSize, false),
}, category, false)
return g.idProvider.Initialize(currIDs.String(), category)
}
func (g *IDGenerator) TakeIDsWithRetry(category string) (s *IDSet, rErr error) {
currTryCount := 0
success := false
var takenIDs *IDSet
var errFin error
lock, err := g.idProvider.Lock(category)
if err != nil {
return nil, err
}
defer g.idProvider.Unlock(lock)
for !success && currTryCount <= maxTryCount {
if currTryCount > 1 {
log.Println(fmt.Sprintf("attempt %d of %d", currTryCount, maxTryCount))
}
currTryCount++
// get data
currData, version, err := g.idProvider.GetData(category)
if err != nil {
return nil, err
}
// TODO - probably delete
if len(currData) == 0 {
if err = g.Initialize(category, 1); err != nil {
return nil, err
}
continue
}
// deserialize data
currIDs, err := IDSetFromString(currData)
if err != nil {
return nil, err
}
// take IDs
takenIDs, err = currIDs.TakeIDs(DefaultIDSetSize)
if err != nil {
return nil, err
}
// try to set data
setStr := takenIDs.String()
size := takenIDs.GetSize()
if errFin = g.idProvider.SetData(currIDs.String(), category, version); errFin != nil {
log.Println("error saving data", errFin)
} else {
success = true
log.Println("fetched a new ID batch from provider = ", "SET: ", setStr, "SIZE: ", size)
}
}
if !success {
errMsg := "failed to take IDs"
log.Println(errMsg, errFin)
return nil, errors.New(fmt.Sprintf(errMsg+": %s", errFin))
}
g.idSets[category] = takenIDs
return takenIDs, nil
}
func (g *IDGenerator) PushIDsWithRetry(category string) (v int32, rErr error) {
idSet, ok := g.idSets[category]
if !ok {
return -1, errors.New(fmt.Sprintf("no set for category '%s'\n", category))
}
if idSet.IsReadOnly() {
return -1, errors.New("cannot push IDs, ID set is read only")
}
currTryCount := 0
success := false
var errFin error
var version int32
lock, err := g.idProvider.Lock(category)
if err != nil {
return -1, err
}
defer g.idProvider.Unlock(lock)
for !success && currTryCount <= maxTryCount {
if currTryCount > 1 {
log.Println(fmt.Sprintf("attempt %d of %d", currTryCount, maxTryCount))
}
currTryCount++
// get data
currData, ver, err := g.idProvider.GetData(category)
if err != nil {
return -1, err
}
if idSet.GetSize() == 0 {
return ver, errors.New("cannot push IDs, ID set is empty")
}
version = ver
var currIDs *IDSet
if len(currData) == 0 {
return -1, errors.New(fmt.Sprintf("no data for category '%s'", category))
} else {
// deserialize data
currIDs, err = IDSetFromString(string(currData))
if err != nil {
return -1, err
}
}
// push IDs
if err = currIDs.PushIDsFromString(idSet.String()); err != nil {
return -1, err
}
// try to save data
setStr := idSet.String()
size := idSet.GetSize()
stateStr := currIDs.String()
if errFin = g.idProvider.SetData(stateStr, category, version); errFin != nil {
log.Println("error saving data", errFin)
} else {
success = true
log.Println("pushed ID set back to provider = ", "SET: ", setStr, "SIZE: ", size, "STATE: ", stateStr)
}
}
if !success {
errMsg := "failed to push IDs"
log.Println(errMsg, errFin)
return -1, errors.New(fmt.Sprintf(errMsg+": %s", errFin))
}
return version, nil
}
func (g *IDGenerator) Stop() int32 {
log.Println("pushing back unused IDs ...")
var version int32
var err error
for c := range g.idSets {
version, err = g.PushIDsWithRetry(c)
if err != nil {
log.Println("error pushing sets", err)
}
}
return version
}
func (g *IDGenerator) PeekIDs(category string) (*IDSet, error) {
data, _, err := g.idProvider.GetData(category)
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, nil
}
currIDs, err := IDSetFromString(string(data))
if err != nil {
return nil, err
}
currIDs.SetReadOnly(true)
return currIDs, err
}
type idReq struct {
category string
resp chan idResp
}
type idResp struct {
id uint64
err error
}
func (g *IDGenerator) takeIDHandler() {
for {
req := <-g.idReqChan
var set *IDSet
var err error
var rsp idResp
set, ok := g.idSets[req.category]
if !ok || set.GetSize() == 0 {
_, err = g.TakeIDsWithRetry(req.category)
if err != nil {
rsp.err = err
}
set = g.idSets[req.category]
}
if set != nil {
id, err := set.TakeID()
rsp.id = id
rsp.err = err
}
req.resp <- rsp
}
}
func (g *IDGenerator) TakeID(category string) (uint64, error) {
req := idReq{
category: category,
resp: make(chan idResp),
}
g.idReqChan <- req
resp := <-req.resp
return resp.id, resp.err
}