Skip to content

Commit

Permalink
gzip decompression support for kafka and rabbit
Browse files Browse the repository at this point in the history
  • Loading branch information
dselans committed Jul 31, 2020
1 parent bf40997 commit 0d29f8a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 12 deletions.
26 changes: 21 additions & 5 deletions backends/kafka/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/batchcorp/plumber/pb"
"github.com/batchcorp/plumber/printer"
"github.com/batchcorp/plumber/util"
)

type IReader interface {
Expand Down Expand Up @@ -155,14 +156,29 @@ func (r *Reader) Read(ctx context.Context) error {
msg.Value = decoded
}

var str string
var data []byte
var convertErr error

if r.Options.Convert == "base64" {
str = base64.StdEncoding.EncodeToString(msg.Value)
} else {
str = string(msg.Value)
switch r.Options.Convert {
case "base64":
_, convertErr = base64.StdEncoding.Decode(data, msg.Value)
case "gzip":
data, convertErr = util.Gunzip(msg.Value)
default:
data = msg.Value
}

if convertErr != nil {
if !r.Options.Follow {
return errors.Wrap(convertErr, "unable to complete conversion")
}

printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr))
continue
}

str := string(data)

if r.Options.LineNumbers {
str = fmt.Sprintf("%d: ", lineNumber) + str
lineNumber++
Expand Down
26 changes: 21 additions & 5 deletions backends/rabbitmq/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/batchcorp/plumber/pb"
"github.com/batchcorp/plumber/printer"
"github.com/batchcorp/plumber/util"
)

type Reader struct {
Expand Down Expand Up @@ -111,14 +112,29 @@ func (r *RabbitMQ) Read() error {
msg.Body = decoded
}

var str string
var data []byte
var convertErr error

if r.Options.Convert == "base64" {
str = base64.StdEncoding.EncodeToString(msg.Body)
} else {
str = string(msg.Body)
switch r.Options.Convert {
case "base64":
_, convertErr = base64.StdEncoding.Decode(data, msg.Body)
case "gzip":
data, convertErr = util.Gunzip(msg.Body)
default:
data = msg.Body
}

if convertErr != nil {
if !r.Options.Follow {
return errors.Wrap(convertErr, "unable to complete conversion")
}

printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr))
continue
}

str := string(data)

if r.Options.LineNumbers {
str = fmt.Sprintf("%d: ", lineNumber) + str
lineNumber++
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func setupCLI() *cli.App {
Name: "convert",
Usage: "Convert messages received on the bus",
Value: &EnumValue{
Enum: []string{"base64"},
Enum: []string{"base64", "gzip"},
Default: "",
},
},
Expand Down Expand Up @@ -210,7 +210,7 @@ func setupCLI() *cli.App {
Name: "convert",
Usage: "Convert messages received on the bus",
Value: &EnumValue{
Enum: []string{"base64"},
Enum: []string{"base64", "gzip"},
Default: "",
},
},
Expand Down
28 changes: 28 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package util

import (
"bytes"
"compress/gzip"
"io"

"github.com/pkg/errors"
)

func Gunzip(data []byte) ([]byte, error) {
b := bytes.NewBuffer(data)

var r io.Reader

r, err := gzip.NewReader(b)
if err != nil {
return nil, errors.Wrap(err, "unable to create new reader")
}

var resB bytes.Buffer

if _, err := resB.ReadFrom(r); err != nil {
return nil, errors.Wrap(err, "unable to read data from reader")
}

return resB.Bytes(), nil
}

0 comments on commit 0d29f8a

Please sign in to comment.