forked from couchbase/cbgt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfeed_dcp.go
129 lines (99 loc) · 4.7 KB
/
feed_dcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2019-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbgt
import (
"fmt"
)
// DCPFeedPrefix is a package-level string that is empty unless assigned.
// It should be treated as immutable once process initialization (init())
// has completed.
// NOTE(review): how this prefix is consumed is not visible in this file;
// confirm against its users before relying on a particular format.
var DCPFeedPrefix string
// DCPFeedBufferSizeBytes mirrors the DCP connection_buffer_size setting
// used to enable flow control. The default is 20MB, i.e.
// 20 * 1024 * 1024 bytes.
var DCPFeedBufferSizeBytes uint32 = 20 << 20
// DCPFeedBufferAckThreshold is the fraction of connection_buffer_size
// at which the consumer acks back to the producer. The default is 0.8.
var DCPFeedBufferAckThreshold float32 = 0.8
// DCPNoopTimeIntervalSecs mirrors the DCP set_noop_interval setting used
// to enable no-op messages. The default is 120 seconds (2 minutes).
var DCPNoopTimeIntervalSecs uint32 = 120
// DCPFeedParams are DCP data-source/feed specific connection
// parameters that may be part of a sourceParams JSON and is a
// superset of CBAuthParams. Every field is tagged omitempty, so
// zero-valued fields are left out when the struct is encoded as JSON.
//
// NOTE(review): earlier documentation stated that DCPFeedParams is used to
// populate a cbdatasource.BucketDataSourceOptions and that it implements
// the couchbase.AuthHandler interface. Neither the cbdatasource usage nor
// any AuthHandler methods are visible in this file — confirm whether those
// statements still hold.
type DCPFeedParams struct {
// NOTE(review): presumably the user name / password used to authenticate
// against the cluster — confirm against callers.
AuthUser string `json:"authUser,omitempty"` // May be "" for no auth.
AuthPassword string `json:"authPassword,omitempty"`
// NOTE(review): presumably SASL credentials — confirm against callers.
AuthSaslUser string `json:"authSaslUser,omitempty"` // May be "" for no auth.
AuthSaslPassword string `json:"authSaslPassword,omitempty"`
// NOTE(review): presumably filesystem paths to a client certificate and
// its private key — confirm against callers.
ClientCertPath string `json:"clientCertPath,omitempty"`
ClientKeyPath string `json:"clientKeyPath,omitempty"`
// Factor (like 1.5) to increase sleep time between retries
// in connecting to a cluster manager node.
ClusterManagerBackoffFactor float32 `json:"clusterManagerBackoffFactor,omitempty"`
// Initial sleep time (millisecs) before first retry to cluster manager.
ClusterManagerSleepInitMS int `json:"clusterManagerSleepInitMS,omitempty"`
// Maximum sleep time (millisecs) between retries to cluster manager.
ClusterManagerSleepMaxMS int `json:"clusterManagerSleepMaxMS,omitempty"`
// Factor (like 1.5) to increase sleep time between retries
// in connecting to a data manager node.
DataManagerBackoffFactor float32 `json:"dataManagerBackoffFactor,omitempty"`
// Initial sleep time (millisecs) before first retry to data manager.
DataManagerSleepInitMS int `json:"dataManagerSleepInitMS,omitempty"`
// Maximum sleep time (millisecs) between retries to data manager.
DataManagerSleepMaxMS int `json:"dataManagerSleepMaxMS,omitempty"`
// Buffer size in bytes provided for UPR flow control.
FeedBufferSizeBytes uint32 `json:"feedBufferSizeBytes,omitempty"`
// Used for UPR flow control and buffer-ack messages when this
// percentage of FeedBufferSizeBytes is reached.
FeedBufferAckThreshold float32 `json:"feedBufferAckThreshold,omitempty"`
// Time interval in seconds of NO-OP messages for UPR flow control,
// needs to be set to a non-zero value to enable no-ops.
NoopTimeIntervalSecs uint32 `json:"noopTimeIntervalSecs,omitempty"`
// Used to specify whether the applications are interested
// in receiving the xattrs information in a dcp stream.
IncludeXAttrs bool `json:"includeXAttrs,omitempty"`
// Used to specify whether the applications are not interested
// in receiving the value for mutations in a dcp stream.
NoValue bool `json:"noValue,omitempty"`
// Scope within the bucket to stream data from.
Scope string `json:"scope,omitempty"`
// Collections within the scope that the feed would cover.
Collections []string `json:"collections,omitempty"`
}
// NewDCPFeedParams allocates a DCPFeedParams and returns a pointer to it.
// Every field is left at its Go zero value; none of the package-level
// DCP defaults are copied into the result here.
func NewDCPFeedParams() *DCPFeedParams {
	return new(DCPFeedParams)
}
// -------------------------------------------------------
// FeedEx represents extended functionality for a DCP feed. Its methods
// are invoked by the application's error handlers to decide on the
// course of the feed.
type FeedEx interface {
// VerifySourceNotExists returns a bool, a string and an error.
// NOTE(review): presumably the bool reports that the feed's source no
// longer exists and the string identifies that source — confirm against
// the implementations.
VerifySourceNotExists() (bool, string, error)
// GetBucketDetails returns two strings.
// NOTE(review): presumably the bucket's name and UUID — confirm against
// the implementations.
GetBucketDetails() (string, string)
// NotifyMgrOnClose takes no arguments and returns nothing.
// NOTE(review): presumably informs the manager that the feed has
// closed — confirm against the implementations.
NotifyMgrOnClose()
}
// -------------------------------------------------------
// VBucketMetaData is the JSON shape that ParseOpaqueToUUID decodes an
// opaque byte slice into. Its failover log is read from the
// "failOverLog" key as a list of uint64 lists.
// NOTE(review): each inner list presumably starts with a vbucket UUID,
// since ParseOpaqueToUUID formats an entry's first value as the UUID —
// confirm the full entry layout against the producer of this metadata.
type VBucketMetaData struct {
FailOverLog [][]uint64 `json:"failOverLog"`
}
// ParseOpaqueToUUID decodes b as a JSON-encoded VBucketMetaData and
// returns the first value of the first failover-log entry, formatted as
// a base-10 string.
//
// It returns "" when b cannot be decoded, when the decoded metadata is
// nil, when the failover log is empty, or when either the first or the
// last failover-log entry holds no values.
//
// NOTE(review): the emptiness check on the last entry is kept from the
// previous implementation so that every input that used to yield a
// result still yields the same one. The value itself has always been
// read from the first entry; confirm which entry is meant to carry the
// current vbucket UUID and drop the check that is not needed.
func ParseOpaqueToUUID(b []byte) string {
	vmd := &VBucketMetaData{}
	// The decoder is handed a pointer-to-pointer, so it may reset vmd to
	// nil (encoding/json does this for a JSON null); guard against that
	// before touching any field.
	if err := UnmarshalJSON(b, &vmd); err != nil || vmd == nil {
		return ""
	}

	flog := vmd.FailOverLog
	if len(flog) == 0 {
		return ""
	}

	// Bounds-check the entry that is actually read (the first one). The
	// former code only checked the last entry, so a log such as
	// [[], [5, 6]] passed the check and then panicked on flog[0][0].
	first, last := flog[0], flog[len(flog)-1]
	if len(first) == 0 || len(last) == 0 {
		return ""
	}

	return fmt.Sprintf("%d", first[0])
}