diff --git a/backends/mqtt/mqtt.go b/backends/mqtt/mqtt.go index d3c5ddb6..13f22fbf 100644 --- a/backends/mqtt/mqtt.go +++ b/backends/mqtt/mqtt.go @@ -148,7 +148,7 @@ func generateTLSConfig(args *args.MQTTConn) (*tls.Config, error) { if err != nil { return nil, errors.Wrap(err, "unable to load ssl keypair") } - } else { + } else if len(args.TlsOptions.TlsClientCert) > 0 { // Server input certpool.AppendCertsFromPEM(args.TlsOptions.TlsCaCert) diff --git a/backends/nats-jetstream/nats-jetstream.go b/backends/nats-jetstream/nats-jetstream.go index 3acc56f0..744dd71c 100644 --- a/backends/nats-jetstream/nats-jetstream.go +++ b/backends/nats-jetstream/nats-jetstream.go @@ -2,9 +2,6 @@ package nats_jetstream import ( "context" - "crypto/tls" - "crypto/x509" - "io/ioutil" "net/url" "strings" @@ -16,7 +13,6 @@ import ( "github.com/batchcorp/plumber/util" "github.com/batchcorp/plumber/validate" - "github.com/batchcorp/plumber-schemas/build/go/protos/args" "github.com/batchcorp/plumber-schemas/build/go/protos/opts" ) @@ -64,11 +60,16 @@ func New(connOpts *opts.ConnectionOptions) (*NatsJetstream, error) { } var client *nats.Conn - if uri.Scheme == "tls" { + if uri.Scheme == "tls" || args.TlsOptions.UseTls { // TLS Secured connection - tlsConfig, err := generateTLSConfig(args) + tlsConfig, err := util.GenerateTLSConfig( + args.TlsOptions.TlsCaCert, + args.TlsOptions.TlsClientCert, + args.TlsOptions.TlsClientKey, + args.TlsOptions.TlsSkipVerify, + ) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Unable to generate TLS Config") } client, err = nats.Connect(args.Dsn, nats.Secure(tlsConfig), creds) @@ -103,50 +104,6 @@ func (n *NatsJetstream) Test(_ context.Context) error { return types.NotImplementedErr } -func generateTLSConfig(args *args.NatsJetstreamConn) (*tls.Config, error) { - certpool := x509.NewCertPool() - - var cert tls.Certificate - var err error - - if util.FileExists(args.TlsOptions.TlsClientCert) { - // CLI input, read from file - pemCerts, err := ioutil.ReadFile(string(args.TlsOptions.TlsCaCert)) - if err == nil { - certpool.AppendCertsFromPEM(pemCerts) - } - - cert, err = tls.LoadX509KeyPair(string(args.TlsOptions.TlsClientCert), string(args.TlsOptions.TlsClientKey)) - if err != nil { - return nil, errors.Wrap(err, "unable to load ssl keypair") - } - - } else { - certpool.AppendCertsFromPEM(args.TlsOptions.TlsCaCert) - - cert, err = tls.X509KeyPair(args.TlsOptions.TlsClientCert, args.TlsOptions.TlsClientKey) - if err != nil { - return nil, errors.Wrap(err, "unable to load ssl keypair") - } - } - - // Just to print out the client certificate.. - cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) - if err != nil { - return nil, errors.Wrap(err, "unable to parse certificate") - } - - // Create tls.Config with desired tls properties - return &tls.Config{ - RootCAs: certpool, - ClientAuth: tls.NoClientCert, - ClientCAs: nil, - InsecureSkipVerify: args.TlsOptions.TlsSkipVerify, - Certificates: []tls.Certificate{cert}, - MinVersion: tls.VersionTLS12, - }, nil -} - func validateBaseConnOpts(connOpts *opts.ConnectionOptions) error { if connOpts == nil { return validate.ErrMissingConnOpts diff --git a/backends/nats-streaming/nats-streaming.go b/backends/nats-streaming/nats-streaming.go index 2ce739b9..2bb347e3 100644 --- a/backends/nats-streaming/nats-streaming.go +++ b/backends/nats-streaming/nats-streaming.go @@ -2,9 +2,6 @@ package nats_streaming import ( "context" - "crypto/tls" - "crypto/x509" - "io/ioutil" "net/url" "strings" @@ -67,11 +64,16 @@ func New(connOpts *opts.ConnectionOptions) (*NatsStreaming, error) { } var natsClient *nats.Conn - if uri.Scheme == "tls" { + if uri.Scheme == "tls" || args.TlsOptions.UseTls { // TLS Secured connection - tlsConfig, err := generateTLSConfig(args) + tlsConfig, err := util.GenerateTLSConfig( + args.TlsOptions.TlsCaCert, + args.TlsOptions.TlsClientCert, + args.TlsOptions.TlsClientKey, + args.TlsOptions.TlsSkipVerify, + ) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Unable to generate TLS Config") } natsClient, err = nats.Connect(args.Dsn, nats.Secure(tlsConfig), creds) @@ -88,7 +90,7 @@ func New(connOpts *opts.ConnectionOptions) (*NatsStreaming, error) { stanClient, err := stan.Connect(args.ClusterId, args.ClientId, stan.NatsOptions()) if err != nil { - return nil, errors.Wrap(err, "could not create NATS subscription") + return nil, errors.Wrap(err, "could not create STAN subscription") } return &NatsStreaming{ @@ -118,50 +120,6 @@ func (n *NatsStreaming) Test(_ context.Context) error { return types.NotImplementedErr } -func generateTLSConfig(args *args.NatsStreamingConn) (*tls.Config, error) { - certpool := x509.NewCertPool() - - var cert tls.Certificate - var err error - - if util.FileExists(args.TlsOptions.TlsClientCert) { - // CLI input, read from file - pemCerts, err := ioutil.ReadFile(string(args.TlsOptions.TlsCaCert)) - if err == nil { - certpool.AppendCertsFromPEM(pemCerts) - } - - cert, err = tls.LoadX509KeyPair(string(args.TlsOptions.TlsClientCert), string(args.TlsOptions.TlsClientKey)) - if err != nil { - return nil, errors.Wrap(err, "unable to load ssl keypair") - } - - } else { - certpool.AppendCertsFromPEM(args.TlsOptions.TlsCaCert) - - cert, err = tls.X509KeyPair(args.TlsOptions.TlsClientCert, args.TlsOptions.TlsClientKey) - if err != nil { - return nil, errors.Wrap(err, "unable to load ssl keypair") - } - } - - // Just to print out the client certificate.. - cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) - if err != nil { - return nil, errors.Wrap(err, "unable to parse certificate") - } - - // Create tls.Config with desired tls properties - return &tls.Config{ - RootCAs: certpool, - ClientAuth: tls.NoClientCert, - ClientCAs: nil, - InsecureSkipVerify: args.TlsOptions.TlsSkipVerify, - Certificates: []tls.Certificate{cert}, - MinVersion: tls.VersionTLS12, - }, nil -} - func validateBaseConnOpts(connOpts *opts.ConnectionOptions) error { if connOpts == nil { return validate.ErrMissingConnOpts diff --git a/backends/nats-streaming/nats-streaming_test.go b/backends/nats-streaming/nats-streaming_test.go index 9ba7a64e..69a9594b 100644 --- a/backends/nats-streaming/nats-streaming_test.go +++ b/backends/nats-streaming/nats-streaming_test.go @@ -1,8 +1,6 @@ package nats_streaming import ( - "io/ioutil" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -62,64 +60,6 @@ var _ = Describe("Nats Streaming Backend", func() { }) }) - Context("generateTLSConfig", func() { - It("works with files", func() { - tlsConfig, err := generateTLSConfig(connOpts.GetNatsStreaming()) - Expect(err).ToNot(HaveOccurred()) - Expect(len(tlsConfig.Certificates)).To(Equal(1)) - }) - It("returns error on incorrect cert file", func() { - args := connOpts.GetNatsStreaming() - args.TlsOptions.TlsClientCert = args.TlsOptions.TlsClientKey - _, err := generateTLSConfig(args) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("unable to load ssl keypair")) - }) - It("returns error on incorrect cert string", func() { - caBytes, err := ioutil.ReadFile("../../test-assets/ssl/ca.crt") - Expect(err).ToNot(HaveOccurred()) - certBytes, err := ioutil.ReadFile("../../test-assets/ssl/client.crt") - Expect(err).ToNot(HaveOccurred()) - keyBytes, err := ioutil.ReadFile("../../test-assets/ssl/client.key") - Expect(err).ToNot(HaveOccurred()) - - args := &args.NatsStreamingConn{ - TlsOptions: &args.NatsStreamingTLSOptions{ - TlsCaCert: caBytes, - TlsClientCert: keyBytes, - TlsClientKey: certBytes, - TlsSkipVerify: true, - }, - } - - _, err = generateTLSConfig(args) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("unable to load ssl keypair")) - }) - - It("works with strings", func() { - caBytes, err := ioutil.ReadFile("../../test-assets/ssl/ca.crt") - Expect(err).ToNot(HaveOccurred()) - certBytes, err := ioutil.ReadFile("../../test-assets/ssl/client.crt") - Expect(err).ToNot(HaveOccurred()) - keyBytes, err := ioutil.ReadFile("../../test-assets/ssl/client.key") - Expect(err).ToNot(HaveOccurred()) - - args := &args.NatsStreamingConn{ - TlsOptions: &args.NatsStreamingTLSOptions{ - TlsCaCert: caBytes, - TlsClientCert: certBytes, - TlsClientKey: keyBytes, - TlsSkipVerify: true, - }, - } - - tlsConfig, err := generateTLSConfig(args) - Expect(err).ToNot(HaveOccurred()) - Expect(len(tlsConfig.Certificates)).To(Equal(1)) - }) - }) - Context("validateBaseConnOpts", func() { It("validates conn presence", func() { err := validateBaseConnOpts(nil) diff --git a/backends/nats/nats.go b/backends/nats/nats.go index 30a712fc..99fe8afa 100644 --- a/backends/nats/nats.go +++ b/backends/nats/nats.go @@ -2,9 +2,6 @@ package nats import ( "context" - "crypto/tls" - "crypto/x509" - "io/ioutil" "net/url" "github.com/nats-io/nats.go" @@ -15,6 +12,7 @@ import ( "github.com/batchcorp/plumber-schemas/build/go/protos/opts" "github.com/batchcorp/plumber/types" + "github.com/batchcorp/plumber/util" ) const BackendName = "nats" @@ -73,7 +71,7 @@ func newClient(opts *args.NatsConn) (*nats.Conn, error) { creds = nats.UserCredentials(string(opts.UserCredentials)) } - if uri.Scheme != "tls" { + if uri.Scheme != "tls" && !opts.TlsOptions.UseTls { // Insecure connection c, err := nats.Connect(opts.Dsn, creds) if err != nil { @@ -83,9 +81,14 @@ func newClient(opts *args.NatsConn) (*nats.Conn, error) { } // TLS Secured connection - tlsConfig, err := generateTLSConfig(opts) + tlsConfig, err := util.GenerateTLSConfig( + opts.TlsOptions.TlsCaCert, + opts.TlsOptions.TlsClientCert, + opts.TlsOptions.TlsClientKey, + opts.TlsOptions.TlsSkipVerify, + ) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Unable to generate TLS Config") } c, err := nats.Connect(opts.Dsn, nats.Secure(tlsConfig), creds) @@ -95,34 +98,3 @@ func newClient(opts *args.NatsConn) (*nats.Conn, error) { return c, nil } - -func generateTLSConfig(opts *args.NatsConn) (*tls.Config, error) { - certpool := x509.NewCertPool() - - pemCerts, err := ioutil.ReadFile(string(opts.TlsOptions.TlsCaCert)) - if err == nil { - certpool.AppendCertsFromPEM(pemCerts) - } - - // Import client certificate/key pair - cert, err := tls.LoadX509KeyPair(string(opts.TlsOptions.TlsClientCert), string(opts.TlsOptions.TlsClientKey)) - if err != nil { - return nil, errors.Wrap(err, "unable to load ssl keypair") - } - - // Just to print out the client certificate.. - cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) - if err != nil { - return nil, errors.Wrap(err, "unable to parse certificate") - } - - // Create tls.Config with desired tls properties - return &tls.Config{ - RootCAs: certpool, - ClientAuth: tls.NoClientCert, - ClientCAs: nil, - InsecureSkipVerify: opts.TlsOptions.TlsSkipVerify, - Certificates: []tls.Certificate{cert}, - MinVersion: tls.VersionTLS12, - }, nil -} diff --git a/docs/env.md b/docs/env.md index 0f309768..47283ff2 100644 --- a/docs/env.md +++ b/docs/env.md @@ -30,7 +30,7 @@ ## Backends -### Aws_sns +### AQS SNS | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -38,7 +38,7 @@ | AWS_ACCESS_KEY_ID | | | **true** | | AWS_SECRET_ACCESS_KEY | | | **true** | -### Aws_sqs +### AWS SQS | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -52,13 +52,13 @@ | PLUMBER_RELAY_SQS_AUTO_DELETE | Auto-delete read/received message(s) | | false | | PLUMBER_RELAY_SQS_WAIT_TIME_SECONDS | Number of seconds to wait for messages (not used when using --continuous) | 5 | false | -### Azure_event_hub +### Azure Event Hub | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | | EVENTHUB_CONNECTION_STRING | Connection string | | **true** | -### Azure_service_bus +### Azure Service Bus | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -67,7 +67,7 @@ | PLUMBER_RELAY_AZURE_TOPIC_NAME | Topic name | | **true** | | PLUMBER_RELAY_AZURE_SUBSCRIPTION | Subscription name | | **true** | -### Gcp_pubsub +### Google Cloud PubSub | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -99,7 +99,7 @@ | PLUMBER_RELAY_KAFKA_REBALANCE_TIMEOUT | How long a coordinator will wait for member joins as part of a rebalance | 5 | false | | PLUMBER_RELAY_KAFKA_QUEUE_CAPACITY | Internal library queue capacity (throughput optimization) | 1000 | false | -### Kubemq_queue +### KubeMQ Queue | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -133,14 +133,48 @@ | PLUMBER_RELAY_MQTT_TOPIC | Topic to read message(s) from | | **true** | | PLUMBER_RELAY_MQTT_READ_TIMEOUT_SECONDS | How long to attempt to read message(s) | 0 | false | -### Nats_streaming +### NATS | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | -| PLUMBER_RELAY_NATS_STREAMING_TLS_CA_CERT | CA file (only needed if addr is tls:// | | false | -| PLUMBER_RELAY_NATS_STREAMING_TLS_CLIENT_CERT | Client cert file (only needed if addr is tls:// | | false | -| PLUMBER_RELAY_NATS_STREAMING_TLS_CLIENT_KEY | Client key file (only needed if addr is tls:// | | false | +| PLUMBER_RELAY_NATS_DSN | Dial string for NATS server (Ex: nats://localhost:4222) | nats://localhost:4222 | **true** | +| PLUMBER_RELAY_NATS_CREDENTIALS | Contents of NATS .creds file to authenticate with | | false | +| PLUMBER_RELAY_NATS_SUBJECT | Subject to read from. Ex: foo.bar.* | | false | +| PLUMBER_RELAY_NATS_USE_TLS | Force TLS connection. (Ignored if DSN begins with "tls://") | false | false | +| PLUMBER_RELAY_NATS_TLS_CA_CERT | CA file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_TLS_CLIENT_CERT | Client cert file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_TLS_CLIENT_KEY | Client key file (only used for TLS connections) | | false | + +### NATS Streaming + +| **Environment Variable** | **Description** | **Default** | **Required** | +| ------------------------ | --------------- | ----------- | ------------ | +| PLUMBER_RELAY_NATS_STREAMING_DSN | Dial string for NATS server (Ex: nats://localhost:4222) | nats://localhost:4222 | **true** | +| PLUMBER_RELAY_NATS_STREAMING_CLUSTER_ID | Cluster ID | Cluster ID of the Nats server | **true** | +| PLUMBER_RELAY_NATS_STREAMING_CHANNEL | Channel name | Channel name to subscribe to | **true** | +| PLUMBER_RELAY_NATS_STREAMING_DURABLE_SUBSCRIPTION_NAME | Create a durable subscription with this name for the given channel | | false | +| PLUMBER_RELAY_NATS_STREAMING_USE_TLS | Force TLS connection. (Ignored if DSN begins with "tls://") | false | false | +| PLUMBER_RELAY_NATS_STREAMING_TLS_CA_CERT | CA file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_STREAMING_TLS_CLIENT_CERT | Client cert file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_STREAMING_TLS_CLIENT_KEY | Client key file (only used for TLS connections) | | false | | PLUMBER_RELAY_NATS_STREAMING_SKIP_VERIFY_TLS | Whether to verify server certificate | | false | +| PLUMBER_RELAY_NATS_STREAMING_READ_LAST | Deliver starting with last published message | false | false | +| PLUMBER_RELAY_NATS_STREAMING_READ_SEQUENCE | Deliver messages starting at this sequence number | 0 | false | +| PLUMBER_RELAY_NATS_STREAMING_READ_SINCE | Deliver messages in last interval (e.g. 1s, 1h) | | false | +| PLUMBER_RELAY_NATS_STREAMING_READ_ALL | Deliver all available messages | false | false | + +### NATS JetStream + +| **Environment Variable** | **Description** | **Default** | **Required** | +| ------------------------ | --------------- | ----------- | ------------ | +| PLUMBER_RELAY_NATS_JETSTREAM_DSN | Dial string for NATS server (Ex: nats://localhost:4222) | nats://localhost:4222 | **true** | +| PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID | User specified client ID to connect with | plumber | false | +| PLUMBER_RELAY_NATS_JETSTREAM_STREAM | Stream name to read from | | **true** | +| PLUMBER_RELAY_NATS_JETSTREAM_USE_TLS | Force TLS connection. (Ignored if DSN begins with "tls://") | false | false | +| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CA_CERT | CA file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_CERT | Client cert file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_KEY | Client key file (only used for TLS connections) | | false | +| PLUMBER_RELAY_NATS_JETSTREAM_SKIP_VERIFY_TLS | Whether to verify server certificate (only needed using TLS) | | false | ### Nsq @@ -189,7 +223,7 @@ | PLUMBER_RELAY_CONSUMER_TAG | How to identify the consumer to RabbitMQ | plumber | false | | PLUMBER_RELAY_RABBIT_QUEUE_AUTO_DELETE | Whether to auto-delete the queue after plumber has disconnected | true | false | -### Redis_pubsub +### Redis PubSub | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | @@ -199,7 +233,7 @@ | PLUMBER_RELAY_REDIS_PUBSUB_DATABASE | Database (0-16) | | false | | PLUMBER_RELAY_REDIS_PUBSUB_CHANNELS | Comma separated list of channels to read from | | **true** | -### Redis_streams +### Redis Streams | **Environment Variable** | **Description** | **Default** | **Required** | | ------------------------ | --------------- | ----------- | ------------ | diff --git a/docs/examples.md b/docs/examples.md index c2643077..324f85ed 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -9,6 +9,7 @@ * [Azure Event Hub](#azure-event-hub) * [NATS](#nats) * [NATS Streaming](#nats-streaming) + * [NATS JetStream](#nats-jetstream) * [Redis PubSub](#redis-pubsub) * [Redis Streams](#redis-streams) * [GCP Pub/Sub](#gcp-pubsub) @@ -28,6 +29,7 @@ * [Azure Event Hub](#azure-event-hub-1) * [NATS](#nats-1) * [NATS Streaming](#nats-streaming-1) + * [NATS JetStream](#nats-jetstream-1) * [Redis PubSub](#redis-pubsub-1) * [Redis Streams](#redis-streams-1) * [GCP Pub/Sub](#gcp-pubsub-1) @@ -44,6 +46,7 @@ * [Continuously relay messages for multiple Redis streams to a Batch.sh collection](#continuously-relay-messages-from-multiple-redis-streams-to-a-batchsh-collection) * [Continuously relay messages from a Kafka topic (on Confluent) to a Batch.sh collection (via CLI)](#continuously-relay-messages-from-a-kafka-topic-on-confluent-to-a-batchsh-collection-via-cli) * [Continuously relay messages from a MQTT topic to a Batch.sh collection](#continuously-relay-messages-from-a-mqtt-topic-to-a-batchsh-collection) + * [Continuously relay messages from a NATS JetStream stream to a Batch.sh collection](#continuously-relay-messages-from-a-nats-jetstream-stream-to-a-batchsh-collection) * [Change Data Capture](#change-data-capture) * [Continuously relay Postgres change events to a Batch.sh collection](#continuously-relay-postgres-change-events-to-a-batchsh-collection) * [Continuously relay MongoDB change stream events to a Batch.sh collection](#continuously-relay-mongodb-change-stream-events-to-a-batchsh-collection) @@ -166,6 +169,12 @@ plumber read nats --address="nats://user:pass@nats.test.io:4222" --subject "test plumber read nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber" ``` +##### NATS JetStream + +```bash +plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --client-id "plumber" +``` + ##### Redis PubSub ```bash @@ -325,6 +334,12 @@ plumber write nats --address="nats://user:pass@nats.test.io:4222" --subject "tes plumber write nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber-producer" --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" ``` +##### NATS JetStream + +```bash +plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" +``` + ##### Redis PubSub ```bash @@ -458,6 +473,17 @@ docker run -d --name plumber-mqtt -p 8080:8080 \ batchcorp/plumber:local mqtt ``` +##### Continuously relay messages from a NATS JetStream stream to a Batch.sh collection + +```bash +docker run -d --name plumber-natsjs -p 8080:8080 \ + -e PLUMBER_RELAY_NATS_JETSTREAM_DSN=nats://localhost:4222 \ + -e PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID=plumber \ + -e PLUMBER_RELAY_NATS_JETSTREAM_STREAM=orders \ + -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \ + batchcorp/plumber:local mqtt +``` + ## Change Data Capture diff --git a/util/util.go b/util/util.go index 506d971c..f8d001cf 100644 --- a/util/util.go +++ b/util/util.go @@ -3,8 +3,11 @@ package util import ( "bytes" "compress/gzip" + "crypto/tls" + "crypto/x509" "fmt" "io" + "io/ioutil" "os" "strings" "time" @@ -141,3 +144,54 @@ func FileExists(path []byte) bool { _, err := os.Stat(string(path)) return err == nil } + +func GenerateTLSConfig(caCert, clientCert, clientKey []byte, skipVerify bool) (*tls.Config, error) { + certpool := x509.NewCertPool() + + if len(caCert) > 0 { + if FileExists(caCert) { + // CLI input, read from file + pemCerts, err := ioutil.ReadFile(string(caCert)) + if err == nil { + certpool.AppendCertsFromPEM(pemCerts) + } + } else { + // Server input, contents of the certificate + certpool.AppendCertsFromPEM(caCert) + } + } + + // Import client certificate/key pair + var cert tls.Certificate + var err error + if len(clientCert) > 0 && len(clientKey) > 0 { + if FileExists(clientCert) { + // CLI input, read from file + cert, err = tls.LoadX509KeyPair(string(clientCert), string(clientKey)) + if err != nil { + return nil, errors.Wrap(err, "unable to load client certificate") + } + } else { + // Server input, contents of the certificate + cert, err = tls.X509KeyPair(clientCert, clientKey) + if err != nil { + return nil, errors.Wrap(err, "unable to load client certificate") + } + } + + cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return nil, errors.Wrap(err, "unable to parse certificate") + } + } + + // Create tls.Config with desired tls properties + return &tls.Config{ + RootCAs: certpool, + ClientAuth: tls.NoClientCert, + ClientCAs: nil, + InsecureSkipVerify: skipVerify, + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + }, nil +} diff --git a/util/util_suite_test.go b/util/util_suite_test.go new file mode 100644 index 00000000..9a34451a --- /dev/null +++ b/util/util_suite_test.go @@ -0,0 +1,13 @@ +package util_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestUtil(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Util Suite") +} diff --git a/util/util_test.go b/util/util_test.go new file mode 100644 index 00000000..1bb67ced --- /dev/null +++ b/util/util_test.go @@ -0,0 +1,66 @@ +package util + +import ( + "io/ioutil" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Utility Package", func() { + Context("GenerateTLSConfig", func() { + + var tlsSkipVerify bool + var tlsCaCert, tlsClientCert, tlsClientKey []byte + + BeforeEach(func() { + tlsSkipVerify = false + tlsCaCert = []byte(`../test-assets/ssl/ca.crt`) + tlsClientCert = []byte(`../test-assets/ssl/client.crt`) + tlsClientKey = []byte(`../test-assets/ssl/client.key`) + }) + + It("works with files", func() { + tlsConfig, err := GenerateTLSConfig(tlsCaCert, tlsClientCert, tlsClientKey, tlsSkipVerify) + Expect(err).ToNot(HaveOccurred()) + Expect(len(tlsConfig.Certificates)).To(Equal(1)) + }) + It("returns error on incorrect cert file", func() { + // Cert and key arguments are swapped + _, err := GenerateTLSConfig(tlsCaCert, tlsClientKey, tlsClientCert, tlsSkipVerify) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to find certificate PEM data in certificate input")) + }) + It("returns error on incorrect cert string", func() { + caBytes, err := ioutil.ReadFile("../test-assets/ssl/ca.crt") + Expect(err).ToNot(HaveOccurred()) + certBytes, err := ioutil.ReadFile("../test-assets/ssl/client.crt") + Expect(err).ToNot(HaveOccurred()) + keyBytes, err := ioutil.ReadFile("../test-assets/ssl/client.key") + Expect(err).ToNot(HaveOccurred()) + + // Cert and key arguments are swapped + _, err = GenerateTLSConfig(caBytes, keyBytes, certBytes, tlsSkipVerify) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to find certificate PEM data in certificate input")) + }) + + It("works with strings", func() { + caBytes, err := ioutil.ReadFile("../test-assets/ssl/ca.crt") + Expect(err).ToNot(HaveOccurred()) + certBytes, err := ioutil.ReadFile("../test-assets/ssl/client.crt") + Expect(err).ToNot(HaveOccurred()) + keyBytes, err := ioutil.ReadFile("../test-assets/ssl/client.key") + Expect(err).ToNot(HaveOccurred()) + + tlsCaCert = caBytes + tlsClientCert = certBytes + tlsClientKey = keyBytes + tlsSkipVerify = true + + tlsConfig, err := GenerateTLSConfig(tlsCaCert, tlsClientCert, tlsClientKey, tlsSkipVerify) + Expect(err).ToNot(HaveOccurred()) + Expect(len(tlsConfig.Certificates)).To(Equal(1)) + }) + }) +})