-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathapplication.go
156 lines (137 loc) · 5.25 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"os"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/xitonix/trubka/commands"
"github.com/xitonix/trubka/commands/consume"
"github.com/xitonix/trubka/commands/create"
"github.com/xitonix/trubka/commands/deletion"
"github.com/xitonix/trubka/commands/describe"
"github.com/xitonix/trubka/commands/list"
"github.com/xitonix/trubka/commands/produce"
"github.com/xitonix/trubka/internal"
"github.com/xitonix/trubka/kafka"
)
func newApplication() error {
app := kingpin.New("trubka", "A CLI tool for Kafka.").DefaultEnvars()
global := &commands.GlobalParameters{}
bindAppFlags(app, global)
commands.AddVersionCommand(app, version, commit, built, runtimeVer)
kafkaParams := bindKafkaFlags(app)
list.AddCommands(app, global, kafkaParams)
describe.AddCommands(app, global, kafkaParams)
deletion.AddCommands(app, global, kafkaParams)
consume.AddCommands(app, global, kafkaParams)
create.AddCommands(app, global, kafkaParams)
produce.AddCommands(app, global, kafkaParams)
_, err := app.Parse(os.Args[1:])
return err
}
func bindAppFlags(app *kingpin.Application, global *commands.GlobalParameters) {
app.Flag("colour", "Enables colours in the standard output. To disable, use --no-colour.").
Default("true").
BoolVar(&global.EnableColor)
app.Flag("color", "Enables colours in the standard output. To disable, use --no-color.").
Default("true").
Hidden().
BoolVar(&global.EnableColor)
app.PreAction(func(context *kingpin.ParseContext) error {
enabledColor = global.EnableColor
return nil
})
var verbosity int
app.Flag("verbose", "The verbosity level of Trubka.").
Short('v').
NoEnvar().
PreAction(func(context *kingpin.ParseContext) error {
global.Verbosity = internal.ToVerbosityLevel(verbosity)
return nil
}).
CounterVar(&verbosity)
}
func bindKafkaFlags(app *kingpin.Application) *commands.KafkaParameters {
params := &commands.KafkaParameters{}
app.Flag("brokers", "The comma separated list of Kafka brokers in server:port format.").
Short('b').
StringVar(¶ms.Brokers)
app.Flag("kafka-version", "Kafka cluster version.").
Default(kafka.DefaultClusterVersion).
StringVar(¶ms.Version)
bindSASLFlags(app, params)
tlsParams := bindTLSFlags(app)
app.PreAction(func(ctx *kingpin.ParseContext) error {
if !tlsParams.Enabled {
return nil
}
tlsConfig, err := configureTLS(tlsParams)
if err != nil {
return err
}
params.TLS = tlsConfig
return nil
})
return params
}
func bindTLSFlags(app *kingpin.Application) *commands.TLSParameters {
t := &commands.TLSParameters{}
app.Flag("tls", "Enables TLS (Unverified by default). Mutual authentication can also be enabled by providing client key and certificate.").
BoolVar(&t.Enabled)
app.Flag("ca-cert", `Trusted root certificates for verifying the server. If not set, Trubka will skip server certificate and domain verification.`).
ExistingFileVar(&t.CACert)
app.Flag("client-cert", `Client certification file to enable mutual TLS authentication. Client key must also be provided.`).
ExistingFileVar(&t.ClientCert)
app.Flag("client-key", `Client private key file to enable mutual TLS authentication. Client certificate must also be provided.`).
ExistingFileVar(&t.ClientKey)
return t
}
func bindSASLFlags(app *kingpin.Application, params *commands.KafkaParameters) {
app.Flag("sasl-mechanism", "SASL authentication mechanism.").
Default(kafka.SASLMechanismNone).
EnumVar(¶ms.SASLMechanism,
kafka.SASLMechanismNone,
kafka.SASLMechanismPlain,
kafka.SASLMechanismSCRAM256,
kafka.SASLMechanismSCRAM512)
app.Flag("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none.").
StringVar(¶ms.SASLUsername)
app.Flag("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none.").
StringVar(¶ms.SASLPassword)
app.Flag("sasl-version", "SASL handshake version. Will be ignored if --sasl-mechanism is set to none.").
Default(string(kafka.SASLHandshakeV1)).
EnumVar(¶ms.SASLHandshakeVersion, string(kafka.SASLHandshakeV0), string(kafka.SASLHandshakeV1))
}
func configureTLS(params *commands.TLSParameters) (*tls.Config, error) {
tlsConf := tls.Config{}
// Mutual authentication is enabled. Both client key and certificate are needed.
if !internal.IsEmpty(params.ClientCert) {
if internal.IsEmpty(params.ClientKey) {
return nil, errors.New("TLS client key is missing. Mutual authentication cannot be used")
}
certificate, err := tls.LoadX509KeyPair(params.ClientCert, params.ClientKey)
if err != nil {
return nil, fmt.Errorf("failed to load the client TLS key pair: %w", err)
}
tlsConf.Certificates = []tls.Certificate{certificate}
}
if internal.IsEmpty(params.CACert) {
// Server cert verification will be disabled.
// Only standard trusted certificates are used to verify the server certs.
tlsConf.InsecureSkipVerify = true
return &tlsConf, nil
}
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(params.CACert)
if err != nil {
return nil, fmt.Errorf("failed to read the CA certificate: %w", err)
}
if ok := certPool.AppendCertsFromPEM(ca); !ok {
return nil, errors.New("failed to append the CA certificate to the pool")
}
tlsConf.RootCAs = certPool
return &tlsConf, nil
}