diff --git a/backends/kafka/read.go b/backends/kafka/read.go index 124b0e08..94bab858 100644 --- a/backends/kafka/read.go +++ b/backends/kafka/read.go @@ -17,6 +17,7 @@ import ( "github.com/batchcorp/plumber/pb" "github.com/batchcorp/plumber/printer" + "github.com/batchcorp/plumber/util" ) type IReader interface { @@ -155,14 +156,29 @@ func (r *Reader) Read(ctx context.Context) error { msg.Value = decoded } - var str string + var data []byte + var convertErr error - if r.Options.Convert == "base64" { - str = base64.StdEncoding.EncodeToString(msg.Value) - } else { - str = string(msg.Value) + switch r.Options.Convert { + case "base64": + _, convertErr = base64.StdEncoding.Decode(data, msg.Value) + case "gzip": + data, convertErr = util.Gunzip(msg.Value) + default: + data = msg.Value } + if convertErr != nil { + if !r.Options.Follow { + return errors.Wrap(convertErr, "unable to complete conversion") + } + + printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr)) + continue + } + + str := string(data) + if r.Options.LineNumbers { str = fmt.Sprintf("%d: ", lineNumber) + str lineNumber++ diff --git a/backends/rabbitmq/read.go b/backends/rabbitmq/read.go index 5bbb7a61..641ed07a 100644 --- a/backends/rabbitmq/read.go +++ b/backends/rabbitmq/read.go @@ -14,6 +14,7 @@ import ( "github.com/batchcorp/plumber/pb" "github.com/batchcorp/plumber/printer" + "github.com/batchcorp/plumber/util" ) type Reader struct { @@ -111,14 +112,29 @@ func (r *RabbitMQ) Read() error { msg.Body = decoded } - var str string + var data []byte + var convertErr error - if r.Options.Convert == "base64" { - str = base64.StdEncoding.EncodeToString(msg.Body) - } else { - str = string(msg.Body) + switch r.Options.Convert { + case "base64": + _, convertErr = base64.StdEncoding.Decode(data, msg.Body) + case "gzip": + data, convertErr = util.Gunzip(msg.Body) + default: + data = msg.Body } + if convertErr != nil { + if !r.Options.Follow { + return errors.Wrap(convertErr, "unable to complete conversion") + } + + printer.Error(fmt.Sprintf("unable to complete conversion for message: %s", convertErr)) + continue + } + + str := string(data) + if r.Options.LineNumbers { str = fmt.Sprintf("%d: ", lineNumber) + str lineNumber++ diff --git a/main.go b/main.go index a10c1507..257f1499 100644 --- a/main.go +++ b/main.go @@ -152,7 +152,7 @@ func setupCLI() *cli.App { Name: "convert", Usage: "Convert messages received on the bus", Value: &EnumValue{ - Enum: []string{"base64"}, + Enum: []string{"base64", "gzip"}, Default: "", }, }, @@ -210,7 +210,7 @@ func setupCLI() *cli.App { Name: "convert", Usage: "Convert messages received on the bus", Value: &EnumValue{ - Enum: []string{"base64"}, + Enum: []string{"base64", "gzip"}, Default: "", }, }, diff --git a/util/util.go b/util/util.go new file mode 100644 index 00000000..e674be0b --- /dev/null +++ b/util/util.go @@ -0,0 +1,28 @@ +package util + +import ( + "bytes" + "compress/gzip" + "io" + + "github.com/pkg/errors" +) + +func Gunzip(data []byte) ([]byte, error) { + b := bytes.NewBuffer(data) + + var r io.Reader + + r, err := gzip.NewReader(b) + if err != nil { + return nil, errors.Wrap(err, "unable to create new reader") + } + + var resB bytes.Buffer + + if _, err := resB.ReadFrom(r); err != nil { + return nil, errors.Wrap(err, "unable to read data from reader") + } + + return resB.Bytes(), nil +}