Skip to content

Commit

Permalink
Merge pull request #291 from batchcorp/dselans/improve-jetstream
Browse files Browse the repository at this point in the history
Dselans/improve jetstream
  • Loading branch information
dselans authored Jun 22, 2022
2 parents 2f09b69 + 18483ec commit bd330d0
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 139 deletions.
6 changes: 6 additions & 0 deletions backends/nats-jetstream/display.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nats_jetstream

import (
"fmt"
"time"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,11 @@ func (n *NatsJetstream) DisplayMessage(cliOpts *opts.CLIOptions, msg *records.Re
{"Subject", record.Stream},
}

if cliOpts.Read.NatsJetstream.Args.CreateDurableConsumer || cliOpts.Read.NatsJetstream.Args.ExistingDurableConsumer {
properties = append(properties, []string{"Consumer Name", record.ConsumerName})
properties = append(properties, []string{"Stream Sequence", fmt.Sprint(record.Sequence)})
}

receivedAt := time.Unix(msg.ReceivedAtUnixTsUtc, 0)

printer.PrintTable(cliOpts, msg.Num, receivedAt, msg.Payload, properties)
Expand Down
102 changes: 102 additions & 0 deletions backends/nats-jetstream/nats-jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/tls"
"net/url"
"time"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -102,6 +104,106 @@ func (n *NatsJetstream) Test(_ context.Context) error {
return types.NotImplementedErr
}

func (n *NatsJetstream) validateExistingConsumerConfig(readArgs *args.NatsJetstreamReadArgs, config *nats.ConsumerConfig) error {
if readArgs == nil || config == nil {
return errors.New("readArgs and config cannot be nil")
}

if !readArgs.ExistingDurableConsumer {
return errors.New("expected existing durable consumer to be enabled - bug?")
}

// Consumer should be configured to use deliverByStartSeq
if readArgs.ConsumerStartSequence != 0 {
if config.DeliverPolicy != nats.DeliverByStartSequencePolicy {
return errors.New("existing consumer's deliver policy is not set to DeliverByStartSequence (tip: do not use existing consumer)")
}

return nil
}

if readArgs.ConsumerStartTime != "" {
if config.DeliverPolicy != nats.DeliverByStartTimePolicy {
return errors.New("existing consumer's deliver policy is not set to DeliverByStartTime (tip: do not use existing consumer)")
}
}

return nil
}

func (n *NatsJetstream) createConsumer(ctx nats.JetStreamContext, args *args.NatsJetstreamReadArgs) (*nats.ConsumerInfo, error) {
if args == nil || ctx == nil {
return nil, errors.New("both ctx and args cannot be nil")
}

if !args.CreateDurableConsumer && !args.ExistingDurableConsumer {
return nil, errors.New("durable consumer usage not enabled - nothing to do")
}

if args.ExistingDurableConsumer {
if args.ConsumerName == "" {
return nil, errors.New("consumer name must be specified when existing consumer is enabled")
}

consumerInfo, err := ctx.ConsumerInfo(args.Stream, args.ConsumerName)
if err != nil {
return nil, errors.Wrap(err, "unable to fetch existing consumer")
}

if err := n.validateExistingConsumerConfig(args, &consumerInfo.Config); err != nil {
return nil, errors.Wrap(err, "unable to validate existing consumer config")
}

return consumerInfo, nil
}

filterSubject := args.ConsumerFilterSubject

if filterSubject == "" {
filterSubject = args.Stream
}

consumerCfg := &nats.ConsumerConfig{
Durable: getConsumerName(args.ConsumerName),
Description: "plumber consumer",
OptStartSeq: uint64(args.ConsumerStartSequence),
FilterSubject: filterSubject,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverLastPolicy,
}

// Which delivery policy should we use?
if args.ConsumerStartSequence != 0 {
consumerCfg.DeliverPolicy = nats.DeliverByStartSequencePolicy
} else if args.ConsumerStartTime != "" {
t, err := time.Parse(time.RFC3339, args.ConsumerStartTime)
if err != nil {
return nil, errors.Wrap(err, "unable to parse start time")
}

consumerCfg.DeliverPolicy = nats.DeliverByStartTimePolicy
consumerCfg.OptStartTime = &t
}

consumerInfo, err := ctx.AddConsumer(args.Stream, consumerCfg)
if err != nil {
return nil, errors.Wrap(err, "unable to add consumer")
}

n.log.Debugf("Created durable consumer '%s'", consumerInfo.Name)

return consumerInfo, nil
}

// If name is empty, generate a random'ish name; otherwise use provided
func getConsumerName(name string) string {
if name == "" {
return "plumber-" + util.RandomString(8)
}

return name
}

func validateBaseConnOpts(connOpts *opts.ConnectionOptions) error {
if connOpts == nil {
return validate.ErrMissingConnOpts
Expand Down
106 changes: 98 additions & 8 deletions backends/nats-jetstream/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package nats_jetstream
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
Expand All @@ -27,12 +29,21 @@ func (n *NatsJetstream) Read(ctx context.Context, readOpts *opts.ReadOptions, re

n.log.Info("Listening for message(s) ...")

var count int64

// nats.Subscribe is async, use channel to wait to exit
doneCh := make(chan struct{})

jsCtx.Subscribe(readOpts.NatsJetstream.Args.Stream, func(msg *nats.Msg) {
var count int64

var consumerInfo *nats.ConsumerInfo
var sub *nats.Subscription

handler := func(msg *nats.Msg) {
n.log.Debugf("Received new message on subject '%s'", msg.Subject)

if err := msg.Ack(); err != nil {
n.log.Warningf("unable to ack message")
}

count++

serializedMsg, err := json.Marshal(msg)
Expand All @@ -44,30 +55,109 @@ func (n *NatsJetstream) Read(ctx context.Context, readOpts *opts.ReadOptions, re
return
}

jsRecord := &records.NatsJetstream{
Stream: readOpts.NatsJetstream.Args.Stream,
Value: msg.Data,
}

if consumerInfo != nil {
ci, err := msg.Sub.ConsumerInfo()
if err != nil {
n.log.Warningf("unable to fetch consumer info for msg: %s", err)
} else {
jsRecord.ConsumerName = ci.Name
jsRecord.Sequence = int64(ci.Delivered.Stream)
}
}

resultsChan <- &records.ReadRecord{
MessageId: uuid.NewV4().String(),
Num: count,
ReceivedAtUnixTsUtc: time.Now().UTC().Unix(),
Payload: msg.Data,
XRaw: serializedMsg,
Record: &records.ReadRecord_NatsJetstream{
NatsJetstream: &records.NatsJetstream{
Stream: readOpts.NatsJetstream.Args.Stream,
Value: msg.Data,
},
NatsJetstream: jsRecord,
},
}

if !readOpts.Continuous {
doneCh <- struct{}{}
}
})
}

if readOpts.NatsJetstream.Args.CreateDurableConsumer || readOpts.NatsJetstream.Args.ExistingDurableConsumer {
consumerInfo, err = n.createConsumer(jsCtx, readOpts.NatsJetstream.Args)
if err != nil {
return errors.Wrap(err, "unable to create consumer")
}

sub, err = jsCtx.PullSubscribe(consumerInfo.Stream, consumerInfo.Name)
} else {
sub, err = jsCtx.Subscribe(readOpts.NatsJetstream.Args.Stream, handler)
}

if err != nil {
n.log.Errorf("unable to subscribe: %s", err)
}

defer sub.Unsubscribe()
defer n.cleanupConsumer(jsCtx, consumerInfo, readOpts.NatsJetstream.Args)

if sub.Type() == nats.PullSubscription {
TOP:
for {
msgs, err := sub.Fetch(1, nats.MaxWait(60*time.Second))
if err != nil {
if strings.Contains(err.Error(), "timeout") {
continue
}
}

for _, m := range msgs {
handler(m)

if !readOpts.Continuous {
break TOP
}
}
}
}

<-doneCh

return nil
}

func (n *NatsJetstream) cleanupConsumer(jsCtx nats.JetStreamContext, ci *nats.ConsumerInfo, readArgs *args.NatsJetstreamReadArgs) {
// Nothing to do if no consumer info or read args
if ci == nil || readArgs == nil {
return
}

// Nothing to do
if !readArgs.CreateDurableConsumer && !readArgs.ExistingDurableConsumer {
return
}

// Nothing to do if no jctx
if jsCtx == nil {
return
}

// Nothing to do if consumer should be kept
if readArgs.KeepConsumer {
return
}

// Consumer should be deleted
if err := jsCtx.DeleteConsumer(ci.Stream, ci.Name); err != nil {
n.log.Errorf("unable to delete consumer during cleanup: %s", err)
}

n.log.Debugf("successfully deleted consumer '%s' during cleanup", ci.Name)
}

func validateReadOptions(readOpts *opts.ReadOptions) error {
if readOpts == nil {
return validate.ErrMissingReadOptions
Expand Down
1 change: 1 addition & 0 deletions backends/nats-jetstream/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func (n *NatsJetstream) Relay(ctx context.Context, relayOpts *opts.RelayOptions,
return errors.Wrap(err, "failed to get jetstream context")
}

// TODO: This should be a pull subscriber
jsCtx.Subscribe(relayOpts.NatsJetstream.Args.Stream, func(msg *nats.Msg) {
relayCh <- &types.RelayMessage{
Value: msg,
Expand Down
26 changes: 16 additions & 10 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,22 @@

### NATS JetStream

| **Environment Variable** | **Description** | **Default** | **Required** |
| ------------------------ | --------------- | ----------- | ------------ |
| PLUMBER_RELAY_NATS_JETSTREAM_DSN | Dial string for NATS server (Ex: nats://localhost:4222) | nats://localhost:4222 | **true** |
| PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID | User specified client ID to connect with | plumber | false |
| PLUMBER_RELAY_NATS_JETSTREAM_STREAM | Stream name to read from | | **true** |
| PLUMBER_RELAY_NATS_JETSTREAM_USE_TLS | Force TLS connection. (Ignored if DSN begins with "tls://") | false | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CA_CERT | CA file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_CERT | Client cert file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_KEY | Client key file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_SKIP_VERIFY_TLS | Whether to verify server certificate (only needed using TLS) | | false |
| **Environment Variable** | **Description** | **Default** | **Required** |
|----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------| ----------- |------------|
| PLUMBER_RELAY_NATS_JETSTREAM_DSN | Dial string for NATS server (Ex: nats://localhost:4222) | nats://localhost:4222 | **true** |
| PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID | User specified client ID to connect with | plumber | false |
| PLUMBER_RELAY_NATS_JETSTREAM_STREAM | Stream name to read from | | **true** |
| PLUMBER_RELAY_NATS_JETSTREAM_USE_TLS | Force TLS connection. (Ignored if DSN begins with "tls://") | false | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CA_CERT | CA file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_CERT | Client cert file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_TLS_CLIENT_KEY | Client key file (only used for TLS connections) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_SKIP_VERIFY_TLS | Whether to verify server certificate (only needed using TLS) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_CONSUMER_NAME | Consumer name (has no effect if create_durable_consumer or existing_durable_consumer is not set) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_CREATE_DURABLE_CONSUMER | Whether to create a durable consumer (setting this will cause plumber to create a pull consumer) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_KEEP_CONSUMER | Whether to delete the consumer after use | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_CONSUMER_START_SEQUENCE | Where in the stream the consumer should start reading (NOTE: If set, consumer deliver policy will be set to DeliverByStartSequence) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_CONSUMER_START_TIME | At what time in the stream should the consumer begin reading (NOTE: If set, consumer deliver policy will be set to DeliverByStartTime) | | false |
| PLUMBER_RELAY_NATS_JETSTREAM_CONSUMER_FILTER_SUBJECT | Only receive a subset of messages from the Stream based on the subject | | false |

### Nsq

Expand Down
19 changes: 19 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ plumber read nats-streaming --address="nats://user:[email protected]:4222" --cha
plumber read nats-jetstream --dsn="nats://user:[email protected]:4222" --stream "orders.>" --client-id "plumber"
```
Create and use a durable consumer:
```bash
plumber read nats-jetstream --dsn="nats://user:[email protected]:4222" --stream foo --create-durable-consumer
```
Use an existing durable consumer:
```bash
plumber read nats-jetstream --dsn="nats://user:[email protected]:4222" --stream foo --existing-durable-consumer --consumer-name existing_consumer
```
Create a new durable consumer at a specific stream start sequence:
```bash
plumber read nats-jetstream --dsn="nats://user:[email protected]:4222" --stream foo --create-durable-consumer --consumer-start-sequence 42
```
NOTE: By default, `plumber` will remove any consumers it creates. To leave consumers untouched, set `--keep-consumer`.
##### Redis PubSub
```bash
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.160
github.com/batchcorp/plumber-schemas v0.0.163
github.com/batchcorp/rabbit v0.1.17
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ github.com/batchcorp/plumber-schemas v0.0.159 h1:eUTrrFCzNeg41ATliRnMyBT8IeJ6PCH
github.com/batchcorp/plumber-schemas v0.0.159/go.mod h1:3ujEtKZPcA9xzQ1ZTq2tsdRxwgXNj/6QTESGMGkN8qE=
github.com/batchcorp/plumber-schemas v0.0.160 h1:qLnDC7JKcL3/86EfuzZeUjWQkor8KCne7gz2nSpCxKM=
github.com/batchcorp/plumber-schemas v0.0.160/go.mod h1:3ujEtKZPcA9xzQ1ZTq2tsdRxwgXNj/6QTESGMGkN8qE=
github.com/batchcorp/plumber-schemas v0.0.161 h1:WnDB5bx/P9bAnyA9RPrPPrTHUcXrIyRMMLW70eeVyiA=
github.com/batchcorp/plumber-schemas v0.0.161/go.mod h1:3ujEtKZPcA9xzQ1ZTq2tsdRxwgXNj/6QTESGMGkN8qE=
github.com/batchcorp/plumber-schemas v0.0.162 h1:nppXpqKmy755Ci3nApwA/zTA4CW9rkYZFqqGkWznDAc=
github.com/batchcorp/plumber-schemas v0.0.162/go.mod h1:3ujEtKZPcA9xzQ1ZTq2tsdRxwgXNj/6QTESGMGkN8qE=
github.com/batchcorp/plumber-schemas v0.0.163 h1:gAqfKIwcs1iHUTVphx4bLI5AW9eMS3e1tVHdTpUbqEM=
github.com/batchcorp/plumber-schemas v0.0.163/go.mod h1:3ujEtKZPcA9xzQ1ZTq2tsdRxwgXNj/6QTESGMGkN8qE=
github.com/batchcorp/plz v0.9.2 h1:bPqb+sn7OUrpHjeTEI9YO4BJS9IQ7AstDDz2gn+tcn8=
github.com/batchcorp/plz v0.9.2/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
Expand Down
Loading

0 comments on commit bd330d0

Please sign in to comment.