forked from couchbase/cbgt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpindex_impl.go
358 lines (295 loc) · 11.9 KB
/
pindex_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
// Copyright 2014-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbgt
import (
"container/list"
"fmt"
"io"
"sync"
"github.com/gorilla/mux"
"github.com/rcrowley/go-metrics"
)
// PIndexImpl represents a runtime pindex implementation instance,
// whose runtime type depends on the pindex's type. It is declared as
// the empty interface, so any value satisfies it; consumers must
// type-assert to the concrete type of the registered implementation.
type PIndexImpl interface{}
// PIndexImplType defines the functions that every pindex
// implementation type must register on startup. Callbacks marked
// optional may be left nil.
type PIndexImplType struct {
// Invoked by the manager to customize the index definition
// while creating or updating indexes (Optional).
Prepare func(mgr *Manager, indexDef *IndexDef) (*IndexDef, error)
// Invoked by the manager to validate the index definition
// before going ahead with the actual creation (Optional).
Validate func(indexType, indexName, indexParams string) error
// Invoked by the manager on index deletion to clean up
// any stats/resources pertaining to the index before removing
// the index (Optional).
OnDelete func(indexDef *IndexDef)
// Invoked by the manager when it wants to create an index
// partition. The pindex implementation should persist enough
// info into the path subdirectory so that it can reconstitute the
// pindex during restart and Open().
New func(indexType, indexParams, path string, restart func()) (
PIndexImpl, Dest, error)
// NewEx is an optional method that is invoked by the manager when
// it wants to create an index partition. The pindex implementation
// should persist enough info into the path subdirectory so that
// it can reconstitute the pindex during restart and Open().
// When NewEx is nil, NewPIndexImplEx falls back to New.
NewEx func(indexType, indexParams, sourceParams, path string, mgr *Manager,
restart func()) (PIndexImpl, Dest, error)
// Invoked by the manager when it wants a pindex implementation to
// reconstitute and reload a pindex instance back into the
// process, such as when the process has re-started.
Open func(indexType, path string, restart func()) (
PIndexImpl, Dest, error)
// Optional, invoked by the manager when it wants a pindex
// implementation to reconstitute and reload a pindex instance
// back into the process, with the updated index parameter values.
OpenUsing func(indexType, path, indexParams string,
restart func()) (PIndexImpl, Dest, error)
// Invoked by the manager when it wants a count of documents from
// an index. The registered Count() function can be nil.
Count func(mgr *Manager, indexName, indexUUID string) (
uint64, error)
// Optional, invoked through RollbackPIndexImpl and given the same
// arguments as NewEx. When Rollback is nil, RollbackPIndexImpl
// re-creates the partition instead.
// NOTE(review): the rollback semantics expected of an implementation
// are not visible in this file — confirm against the callers.
Rollback func(indexType, indexParams, sourceParams, path string, mgr *Manager,
restart func()) (PIndexImpl, Dest, error)
// Invoked by the manager when it wants to query an index. The
// registered Query() function can be nil.
Query func(mgr *Manager, indexName, indexUUID string,
req []byte, res io.Writer) error
// Description is used to populate docs, UI, etc, such as index
// type drop-down control in the web admin UI. Format of the
// description string:
//
// $categoryName/$indexType - short descriptive string
//
// The $categoryName is something like "advanced", or "general".
Description string
// A prototype instance of indexParams JSON that is usable for
// Validate() and New().
StartSample interface{}
// Example instances of JSON that are usable for Query() requests.
// These are used to help generate API documentation.
QuerySamples func() []Documentation
// Displayed in docs, web admin UI, etc, and often might be a link
// to even further help.
QueryHelp string
// Invoked during startup to allow pindex implementation to affect
// the REST API with its own endpoint.
InitRouter func(r *mux.Router, phase string, mgr *Manager)
// Optional, additional handlers a pindex implementation may have
// for /api/diag output.
DiagHandlers []DiagHandler
// Optional, allows pindex implementation to add more information
// to the REST /api/managerMeta output.
MetaExtra func(map[string]interface{})
// Optional, allows pindex implementation to specify advanced UI
// implementations and information.
UI map[string]string
// Optional, invoked for checking whether the pindex implementations
// can effect the config changes through a restart of pindexes.
AnalyzeIndexDefUpdates func(configUpdates *ConfigAnalyzeRequest) ResultCode
// Invoked by the manager when it wants to trigger generic operations
// on the index.
SubmitTaskRequest func(mgr *Manager, indexName,
indexUUID string, req []byte) (*TaskRequestStatus, error)
}
// Feedable is an interface for pindex instances that can report
// whether they are ready to ingest data from a Feed implementation.
type Feedable interface {
// IsFeedable implementation checks whether the current pindex
// instance is ready for ingesting data from a Feed implementation.
IsFeedable() (bool, error)
}
// ConfigAnalyzeRequest wraps up the various configuration
// parameters that the PIndexImplType implementations deal with.
// It is the argument passed to PIndexImplType.AnalyzeIndexDefUpdates
// and carries a current and a previous value for both the index
// definition and the source partitions.
type ConfigAnalyzeRequest struct {
IndexDefnCur *IndexDef
IndexDefnPrev *IndexDef
SourcePartitionsCur map[string]bool
SourcePartitionsPrev map[string]bool
}
// ResultCode represents the return code indicative of the various operations
// recommended by the pindex implementations upon detecting a config change.
// It is the return type of PIndexImplType.AnalyzeIndexDefUpdates.
type ResultCode string
const (
// PINDEXES_RESTART suggests a restart of the pindexes.
PINDEXES_RESTART ResultCode = "request_restart_pindexes"
)
// PIndexImplTypes is a global registry of pindex type backends or
// implementations. It is keyed by indexType and should be treated as
// immutable/read-only after process init/startup. It is a plain map
// with no locking of its own; entries are added through
// RegisterPIndexImplType.
var PIndexImplTypes = make(map[string]*PIndexImplType)
// RegisterPIndexImplType registers an index type into the system by
// storing t under indexType in the global PIndexImplTypes registry.
// An existing entry for the same indexType is overwritten. No locking
// is performed here.
func RegisterPIndexImplType(indexType string, t *PIndexImplType) {
	registry := PIndexImplTypes
	registry[indexType] = t
}
// NewPIndexImpl creates an index partition of the given, registered
// index type by delegating to that type's New callback.
//
// An error is returned when indexType is not registered, is registered
// as nil, or has no New callback.
func NewPIndexImpl(indexType, indexParams, path string, restart func()) (
	PIndexImpl, Dest, error) {
	// A missing key yields a nil *PIndexImplType, so one nil check
	// covers both "not registered" and "registered as nil".
	implType := PIndexImplTypes[indexType]
	if implType != nil && implType.New != nil {
		return implType.New(indexType, indexParams, path, restart)
	}

	return nil, nil,
		fmt.Errorf("pindex_impl: NewPIndexImpl indexType: %s",
			indexType)
}
// NewPIndexImplEx creates an index partition of the given, registered
// index type, preferring the type's optional NewEx callback, which
// additionally receives sourceParams and mgr.
//
// When the type is not registered or has no NewEx callback, creation
// is delegated to NewPIndexImpl (sourceParams and mgr are not used on
// that path).
func NewPIndexImplEx(indexType, indexParams, sourceParams, path string,
	mgr *Manager, restart func()) (
	PIndexImpl, Dest, error) {
	if implType := PIndexImplTypes[indexType]; implType != nil &&
		implType.NewEx != nil {
		return implType.NewEx(indexType, indexParams, sourceParams, path,
			mgr, restart)
	}

	// No extended constructor available: use the default one.
	return NewPIndexImpl(indexType, indexParams, path, restart)
}
// RollbackPIndexImpl delegates to the optional Rollback callback
// registered for the given index type, passing all arguments through
// unchanged.
//
// When the index type is not registered, is registered as nil, or has
// no Rollback callback, the partition is re-created from scratch via
// NewPIndexImplEx. Going through NewPIndexImplEx (rather than
// NewPIndexImpl directly) keeps sourceParams and mgr available to
// index types that register NewEx, and still ends up in NewPIndexImpl
// for types that only register New.
func RollbackPIndexImpl(indexType, indexParams, sourceParams, path string,
	mgr *Manager, restart func()) (PIndexImpl, Dest, error) {
	t, exists := PIndexImplTypes[indexType]
	if !exists || t == nil || t.Rollback == nil {
		// Re-create the partition from scratch.
		return NewPIndexImplEx(indexType, indexParams, sourceParams, path,
			mgr, restart)
	}

	return t.Rollback(indexType, indexParams, sourceParams, path, mgr, restart)
}
// OpenPIndexImpl loads an index partition of the given, registered
// index type from a given path by delegating to that type's Open
// callback.
//
// An error is returned when indexType is not registered, is registered
// as nil, or has no Open callback.
func OpenPIndexImpl(indexType, path string, restart func()) (
	PIndexImpl, Dest, error) {
	implType := PIndexImplTypes[indexType]
	if implType != nil && implType.Open != nil {
		return implType.Open(indexType, path, restart)
	}

	return nil, nil, fmt.Errorf("pindex_impl: OpenPIndexImpl"+
		" indexType: %s", indexType)
}
// OpenPIndexImplUsing loads an index partition of the given, registered
// index type from a given path with the given indexParams, by
// delegating to that type's OpenUsing callback.
//
// An error is returned when indexType is not registered, is registered
// as nil, or has no OpenUsing callback; there is no fallback to Open.
func OpenPIndexImplUsing(indexType, path, indexParams string,
	restart func()) (PIndexImpl, Dest, error) {
	implType := PIndexImplTypes[indexType]
	if implType != nil && implType.OpenUsing != nil {
		return implType.OpenUsing(indexType, path, indexParams, restart)
	}

	return nil, nil, fmt.Errorf("pindex_impl: OpenPIndexImplUsing"+
		" indexType: %s", indexType)
}
// PIndexImplTypeForIndex retrieves from the Cfg provider the index
// type for a given index. It is a thin wrapper over GetIndexDef that
// discards the returned IndexDef and passes the type and error through.
func PIndexImplTypeForIndex(cfg Cfg, indexName string) (
	*PIndexImplType, error) {
	_, implType, err := GetIndexDef(cfg, indexName)

	return implType, err
}
// GetIndexDef retrieves the IndexDef and PIndexImplType for an index.
//
// The index definitions are read via CfgGetIndexDefs, the definition
// named indexName is looked up, and its Type is resolved against the
// PIndexImplTypes registry. An error (with nil results) is returned
// when the definitions cannot be read, when no definition exists for
// indexName, or when the definition's type is not registered.
func GetIndexDef(cfg Cfg, indexName string) (
	*IndexDef, *PIndexImplType, error) {
	defs, _, err := CfgGetIndexDefs(cfg)
	if err != nil || defs == nil {
		// err may be nil here when only the definitions are missing;
		// it is formatted with %v either way.
		return nil, nil, fmt.Errorf("pindex_impl: could not get indexDefs,"+
			" indexName: %s, err: %v",
			indexName, err)
	}

	def := defs.IndexDefs[indexName]
	if def == nil {
		return nil, nil, fmt.Errorf("pindex_impl: no indexDef,"+
			" indexName: %s", indexName)
	}

	implType := PIndexImplTypes[def.Type]
	if implType == nil {
		return nil, nil, fmt.Errorf("pindex_impl: no pindexImplType,"+
			" indexName: %s, indexDef.Type: %s",
			indexName, def.Type)
	}

	return def, implType, nil
}
// ------------------------------------------------
// QueryCtlParams defines the JSON that includes the "ctl" part of a
// query request. These "ctl" query request parameters are
// independent of any specific pindex type. The single Ctl field is
// (un)marshalled under the "ctl" JSON key.
type QueryCtlParams struct {
Ctl QueryCtl `json:"ctl"`
}
// QueryCtl defines the JSON parameters that control query execution
// and which are independent of any specific pindex type.
//
// A PartitionSelection value can optionally be specified for performing
// advanced scatter gather operations; recognized options:
//   - "" : default behavior - active partitions are selected
//   - local : local partitions are favored, pseudo random selection from remote
//   - random : pseudo random selection from available local and remote
//   - random_balanced : random selection from available local and remote nodes by
//     distributing the query load across all nodes.
type QueryCtl struct {
// NOTE(review): presumably milliseconds, going by the name of
// QUERY_CTL_DEFAULT_TIMEOUT_MS — confirm against the consumers.
Timeout int64 `json:"timeout"`
Consistency *ConsistencyParams `json:"consistency"`
// Omitted from the marshalled JSON when empty.
PartitionSelection string `json:"partition_selection,omitempty"`
}
// QUERY_CTL_DEFAULT_TIMEOUT_MS is the default query timeout, expressed
// in milliseconds as the _MS suffix indicates (10000 ms = 10 seconds).
const QUERY_CTL_DEFAULT_TIMEOUT_MS = int64(10000)
// ------------------------------------------------
// PINDEX_STORE_MAX_ERRORS is the max number of errors that a
// PIndexStoreStats will track.
// Updates to this setting are not thread-safe: it is a plain package
// variable that PIndexStoreStats.AddError reads without any
// synchronization on the variable itself.
var PINDEX_STORE_MAX_ERRORS = 40
// PIndexStoreStats provides some common stats/metrics and error
// tracking that some pindex type backends can reuse. Use
// NewPIndexStoreStats to obtain an instance with TimerBatchStore and
// Errors initialized.
type PIndexStoreStats struct {
TimerBatchStore metrics.Timer // Access protected by an internal lock
m sync.RWMutex // Mutex to protect following fields
Errors *list.List // Capped list of string (json)
TotalErrorCount uint64 // Incremented on every AddError call
}
// NewPIndexStoreStats returns a PIndexStoreStats with a fresh
// TimerBatchStore timer and an empty Errors list; TotalErrorCount
// starts at zero.
func NewPIndexStoreStats() *PIndexStoreStats {
	stats := new(PIndexStoreStats)
	stats.TimerBatchStore = metrics.NewTimer()
	stats.Errors = list.New()

	return stats
}
// AddError records err as the newest entry of the Errors list and
// increments TotalErrorCount. WriteJSON emits the stored strings
// verbatim inside a JSON array, so err is expected to already be a
// JSON value.
//
// At most PINDEX_STORE_MAX_ERRORS entries are retained; the oldest
// entries are evicted first. If PINDEX_STORE_MAX_ERRORS is zero or
// negative, nothing is retained, but the error is still counted.
func (d *PIndexStoreStats) AddError(err string) {
	d.m.Lock()
	defer d.m.Unlock()

	// WriteJSON tolerates a nil Errors list, so tolerate it here too
	// instead of panicking on a PIndexStoreStats that was not built
	// through NewPIndexStoreStats.
	if d.Errors == nil {
		d.Errors = list.New()
	}

	// Evict until there is room for the new entry. This is a loop so
	// that the list shrinks back to the cap if the cap was lowered at
	// runtime. The Len() > 0 guard avoids Remove(nil), which panics,
	// when the cap is <= 0 and the list is already empty.
	for d.Errors.Len() > 0 && d.Errors.Len() >= PINDEX_STORE_MAX_ERRORS {
		d.Errors.Remove(d.Errors.Front())
	}

	if PINDEX_STORE_MAX_ERRORS > 0 {
		d.Errors.PushBack(err)
	}

	d.TotalErrorCount++
}
// WriteJSON writes the stats to w as a single JSON object of the form
//
//	{"TimerBatchStore":<timer json>,"Errors":[<err>,<err>,...]}
//
// The "Errors" member is omitted when the Errors list is nil. Each
// stored error string is written verbatim (it is not quoted or
// escaped here); entries that are not strings or are empty are
// skipped. The read lock is held while the errors are written.
//
// Write errors on w are ignored: the method has no error return and
// output is best-effort.
func (d *PIndexStoreStats) WriteJSON(w io.Writer) {
	w.Write([]byte(`{"TimerBatchStore":`))
	WriteTimerJSON(w, d.TimerBatchStore)

	d.m.RLock()
	if d.Errors != nil {
		w.Write([]byte(`,"Errors":[`))

		// Track whether an element has actually been emitted rather
		// than the list position: a skipped first entry must not
		// cause a leading comma, which would make the array invalid.
		wrote := false
		for e := d.Errors.Front(); e != nil; e = e.Next() {
			j, ok := e.Value.(string)
			if !ok || j == "" {
				continue
			}
			if wrote {
				w.Write(JsonComma)
			}
			io.WriteString(w, j)
			wrote = true
		}

		w.Write([]byte(`]`))
	}
	d.m.RUnlock()

	w.Write(JsonCloseBrace)
}
// FetchTotalErrorCount returns the current value of TotalErrorCount,
// read while holding the read lock.
func (d *PIndexStoreStats) FetchTotalErrorCount() uint64 {
	d.m.RLock()
	total := d.TotalErrorCount
	d.m.RUnlock()

	return total
}
// prefixPIndexStoreStats holds the bytes that open a JSON object whose
// "pindexStoreStats" member value is expected to follow.
// NOTE(review): its users are not visible in this portion of the
// file — confirm where the matching closing brace is written.
var prefixPIndexStoreStats = []byte(`{"pindexStoreStats":`)