-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtopic_metadata.go
222 lines (188 loc) · 6.29 KB
/
topic_metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package client
import "fmt"
// TopicMetadataRequest asks the cluster about topics: their partitions, the
// leader broker of each partition and where those brokers are located.
type TopicMetadataRequest struct {
	// Topics lists the topic names that Write puts on the wire.
	Topics []string
}
// NewMetadataRequest builds a TopicMetadataRequest for the given topics.
// The slice is stored as-is (not copied). Passing an empty slice requests
// metadata for all topics.
func NewMetadataRequest(topics []string) *TopicMetadataRequest {
	request := new(TopicMetadataRequest)
	request.Topics = topics
	return request
}
// Write serializes the request through encoder: first the number of topics as
// an int32, then every topic name as a string, in slice order.
func (mr *TopicMetadataRequest) Write(encoder Encoder) {
	encoder.WriteInt32(int32(len(mr.Topics)))

	topics := mr.Topics
	for i := 0; i < len(topics); i++ {
		encoder.WriteString(topics[i])
	}
}
// Key reports the Kafka API key that identifies a TopicMetadataRequest.
func (mr *TopicMetadataRequest) Key() int16 {
	const metadataAPIKey int16 = 3
	return metadataAPIKey
}
// Version reports the Kafka request version, used for backwards compatibility.
func (mr *TopicMetadataRequest) Version() int16 {
	const requestVersion int16 = 0
	return requestVersion
}
// MetadataResponse describes the brokers in the cluster and the topics that
// exist.
type MetadataResponse struct {
	// Brokers holds one entry per broker reported in the response.
	Brokers []*Broker
	// TopicsMetadata holds one entry per topic reported in the response.
	TopicsMetadata []*TopicMetadata
}
// Read decodes a metadata response from decoder: an int32 broker count followed
// by that many brokers, then an int32 topic count followed by that many topic
// metadata entries.
//
// A negative count is treated as an empty array (the Kafka wire format encodes
// a null array as length -1) rather than being handed to make, which would
// panic. Read returns nil on success, or the DecodingError for the first field
// that could not be decoded.
func (tmr *MetadataResponse) Read(decoder Decoder) *DecodingError {
	brokersLength, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidBrokersLength)
	}
	if brokersLength < 0 {
		// Null or corrupt length: make would panic on a negative size.
		brokersLength = 0
	}

	tmr.Brokers = make([]*Broker, brokersLength)
	for i := range tmr.Brokers {
		broker := new(Broker)
		if err := broker.Read(decoder); err != nil {
			return err
		}
		tmr.Brokers[i] = broker
	}

	metadataLength, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidMetadataLength)
	}
	if metadataLength < 0 {
		// Same guard as above for the topics array.
		metadataLength = 0
	}

	tmr.TopicsMetadata = make([]*TopicMetadata, metadataLength)
	for i := range tmr.TopicsMetadata {
		topicMetadata := new(TopicMetadata)
		if err := topicMetadata.Read(decoder); err != nil {
			return err
		}
		tmr.TopicsMetadata[i] = topicMetadata
	}

	return nil
}
// Broker identifies a Kafka broker in the cluster by its ID, host name and
// port.
type Broker struct {
	// ID is the broker's node ID.
	ID int32
	// Host is the broker's host name.
	Host string
	// Port is the port the broker is reached on.
	Port int32
}
// String renders the broker address as "host:port".
func (n *Broker) String() string {
	host, port := n.Host, n.Port
	return fmt.Sprintf("%s:%d", host, port)
}
// Read populates the broker from decoder, consuming the node ID (int32), the
// host (string) and the port (int32) in that order. Each field is stored as
// soon as it decodes. The first failure is returned as a DecodingError carrying
// the matching reason; nil is returned once all three fields are set.
func (n *Broker) Read(decoder Decoder) *DecodingError {
	id, idErr := decoder.GetInt32()
	if idErr != nil {
		return NewDecodingError(idErr, reasonInvalidBrokerNodeID)
	}
	n.ID = id

	hostname, hostErr := decoder.GetString()
	if hostErr != nil {
		return NewDecodingError(hostErr, reasonInvalidBrokerHost)
	}
	n.Host = hostname

	portNumber, portErr := decoder.GetInt32()
	if portErr != nil {
		return NewDecodingError(portErr, reasonInvalidBrokerPort)
	}
	n.Port = portNumber

	return nil
}
// TopicMetadata describes one topic: its name, the metadata of its partitions
// (leaders, ISRs) and the error reported for the topic, if any.
type TopicMetadata struct {
	// Error is the BrokerErrors entry for the error code decoded with the topic.
	Error error
	// Topic is the topic name.
	Topic string
	// PartitionsMetadata holds one entry per partition decoded for the topic.
	PartitionsMetadata []*PartitionMetadata
}
// Read decodes one topic's metadata from decoder: an int16 error code (mapped
// through BrokerErrors), the topic name, and an int32 partition count followed
// by that many partition metadata entries.
//
// A negative partition count is treated as an empty array (the Kafka wire
// format encodes a null array as length -1) rather than being handed to make,
// which would panic. Read returns nil on success, or the DecodingError for the
// first field that could not be decoded.
func (tm *TopicMetadata) Read(decoder Decoder) *DecodingError {
	errCode, err := decoder.GetInt16()
	if err != nil {
		return NewDecodingError(err, reasonInvalidTopicMetadataErrorCode)
	}
	tm.Error = BrokerErrors[errCode]

	topicName, err := decoder.GetString()
	if err != nil {
		return NewDecodingError(err, reasonInvalidTopicMetadataTopicName)
	}
	tm.Topic = topicName

	metadataLength, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidPartitionMetadataLength)
	}
	if metadataLength < 0 {
		// Null or corrupt length: make would panic on a negative size.
		metadataLength = 0
	}

	tm.PartitionsMetadata = make([]*PartitionMetadata, metadataLength)
	for i := range tm.PartitionsMetadata {
		metadata := new(PartitionMetadata)
		if err := metadata.Read(decoder); err != nil {
			return err
		}
		tm.PartitionsMetadata[i] = metadata
	}

	return nil
}
// PartitionMetadata describes one partition of a topic: its ID, leader,
// replicas, ISRs and the error reported for it, if any.
type PartitionMetadata struct {
	// Error is the BrokerErrors entry for the error code decoded with the partition.
	Error error
	// PartitionID is the partition's ID.
	PartitionID int32
	// Leader is the leader value decoded for the partition (presumably a broker
	// ID; confirm against the protocol).
	Leader int32
	// Replicas holds the replica entries decoded for the partition.
	Replicas []int32
	// ISR holds the ISR entries decoded for the partition.
	ISR []int32
}
// Read decodes one partition's metadata from decoder: an int16 error code
// (mapped through BrokerErrors), the int32 partition ID, the int32 leader, and
// then two int32 arrays holding the replicas and the ISR.
//
// A negative array length is treated as an empty array (the Kafka wire format
// encodes a null array as length -1) rather than being handed to make, which
// would panic. Read returns nil on success, or the DecodingError for the first
// field that could not be decoded.
func (pm *PartitionMetadata) Read(decoder Decoder) *DecodingError {
	errCode, err := decoder.GetInt16()
	if err != nil {
		return NewDecodingError(err, reasonInvalidPartitionMetadataErrorCode)
	}
	pm.Error = BrokerErrors[errCode]

	partition, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidPartitionMetadataPartition)
	}
	pm.PartitionID = partition

	leader, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidPartitionMetadataLeader)
	}
	pm.Leader = leader

	replicas, decodingErr := readPartitionNodeIDs(decoder,
		reasonInvalidPartitionMetadataReplicasLength, reasonInvalidPartitionMetadataReplica)
	if decodingErr != nil {
		return decodingErr
	}
	pm.Replicas = replicas

	isr, decodingErr := readPartitionNodeIDs(decoder,
		reasonInvalidPartitionMetadataIsrLength, reasonInvalidPartitionMetadataIsr)
	if decodingErr != nil {
		return decodingErr
	}
	pm.ISR = isr

	return nil
}

// readPartitionNodeIDs decodes an int32 length followed by that many int32
// values. lengthReason and itemReason label the DecodingError returned when the
// length or an element cannot be decoded. A negative length yields an empty,
// non-nil slice.
func readPartitionNodeIDs(decoder Decoder, lengthReason, itemReason string) ([]int32, *DecodingError) {
	length, err := decoder.GetInt32()
	if err != nil {
		return nil, NewDecodingError(err, lengthReason)
	}
	if length < 0 {
		// Null or corrupt length: make would panic on a negative size.
		length = 0
	}

	ids := make([]int32, length)
	for i := range ids {
		id, err := decoder.GetInt32()
		if err != nil {
			return nil, NewDecodingError(err, itemReason)
		}
		ids[i] = id
	}
	return ids, nil
}
// Reason strings attached to the DecodingError values returned by the Read
// methods in this file, grouped by the type whose Read uses them.
var (
	// MetadataResponse.Read
	reasonInvalidBrokersLength  = "Invalid length for Brokers field"
	reasonInvalidMetadataLength = "Invalid length for TopicMetadata field"

	// Broker.Read
	reasonInvalidBrokerNodeID = "Invalid broker node id"
	reasonInvalidBrokerHost   = "Invalid broker host"
	reasonInvalidBrokerPort   = "Invalid broker port"

	// TopicMetadata.Read
	reasonInvalidTopicMetadataErrorCode  = "Invalid topic metadata error code"
	reasonInvalidTopicMetadataTopicName  = "Invalid topic metadata topic name"
	reasonInvalidPartitionMetadataLength = "Invalid length for Partition Metadata field"

	// PartitionMetadata.Read
	reasonInvalidPartitionMetadataErrorCode      = "Invalid partition metadata error code"
	reasonInvalidPartitionMetadataPartition      = "Invalid partition in partition metadata"
	reasonInvalidPartitionMetadataLeader         = "Invalid leader in partition metadata"
	reasonInvalidPartitionMetadataReplicasLength = "Invalid length for Replicas field"
	reasonInvalidPartitionMetadataReplica        = "Invalid replica in partition metadata"
	reasonInvalidPartitionMetadataIsrLength      = "Invalid length for Isr field"
	reasonInvalidPartitionMetadataIsr            = "Invalid isr in partition metadata"
)