Skip to content

Commit

Permalink
feature: deprecate consume_all
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Aug 26, 2023
1 parent 5c5f04b commit ee4fd87
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 112 deletions.
14 changes: 7 additions & 7 deletions beanstalkjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package beanstalkjobs

import (
"time"

"github.com/roadrunner-server/sdk/v4/utils"
)

const (
Expand All @@ -15,10 +13,8 @@ const (

type config struct {
// global
Addr string `mapstructure:"addr"`
Timeout time.Duration `mapstructure:"timeout"`
ConsumeAll bool `mapstructure:"consume_all"`

Addr string `mapstructure:"addr"`
Timeout time.Duration `mapstructure:"timeout"`
// local
PipePriority int64 `mapstructure:"priority"`
TubePriority *uint32 `mapstructure:"tube_priority"`
Expand All @@ -36,7 +32,7 @@ func (c *config) InitDefault() {
}

if c.TubePriority == nil {
c.TubePriority = utils.Uint32(0)
c.TubePriority = toPtr(uint32(0))
}

if c.PipePriority == 0 {
Expand All @@ -51,3 +47,7 @@ func (c *config) InitDefault() {
c.Timeout = time.Second * 30
}
}

func toPtr[T any](v T) *T {
return &v
}
45 changes: 26 additions & 19 deletions beanstalkjobs/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package beanstalkjobs

import (
"context"
stderr "errors"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -201,38 +202,44 @@ func (cp *ConnPool) redial() error {
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
const EOF string = "EOF"
switch et := err.(type) { //nolint:gocritic,errorlint
// check if the error
case beanstalk.ConnError:
// error is not wrapped
switch bErr := et.Err.(type) { //nolint:errorlint
case *net.OpError:
var et beanstalk.ConnError
var bErr *net.OpError

if stderr.As(err, &et) {
if stderr.As(et.Err, &bErr) {
cp.log.Debug("beanstalk connection error, redialing", zap.Error(et))
cp.RUnlock()
errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
cp.log.Error("beanstalk redial failed", zap.Error(errR))
return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
}

cp.log.Debug("beanstalk redial was successful")
// if redial was successful -> continue listening
return nil
default:
if et.Err.Error() == EOF {
// if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
return errors.E(op, errors.Errorf("%v:%v", err, errR))
}
// if redial was successful -> continue listening
return nil
}

if et.Err.Error() == EOF {
cp.log.Debug("beanstalk connection error, redialing", zap.Error(et.Err))
// if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
cp.log.Error("beanstalk redial failed", zap.Error(errR))
return errors.E(op, errors.Errorf("%v:%v", err, errR))
}

cp.log.Debug("beanstalk redial was successful")
// if redial was successful -> continue listening
return nil
}
}

// return initial error
cp.log.Error("beanstalk connection error, unknown type of error", zap.Error(err))
return err
}
16 changes: 6 additions & 10 deletions beanstalkjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/utils"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -35,11 +34,10 @@ type Configurer interface {
}

type Driver struct {
log *zap.Logger
pq jobs.Queue
consumeAll bool
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
log *zap.Logger
pq jobs.Queue
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator

pipeline atomic.Pointer[jobs.Pipeline]
listeners uint32
Expand Down Expand Up @@ -113,7 +111,6 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
pool: cPool,
network: dsn[0],
addr: dsn[1],
consumeAll: conf.ConsumeAll,
tout: conf.Timeout,
tName: conf.Tube,
reserveTimeout: conf.ReserveTimeout,
Expand Down Expand Up @@ -175,10 +172,9 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
network: dsn[0],
addr: dsn[1],
tout: conf.Timeout,
consumeAll: pipe.Bool(consumeAll, false),
tName: pipe.String(tube, "default"),
reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
tubePriority: toPtr(uint32(pipe.Int(tubePriority, 1))),
priority: pipe.Priority(),

// buffered with two because jobs root plugin can call Stop at the same time as Pause
Expand Down
64 changes: 23 additions & 41 deletions beanstalkjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"go.uber.org/zap"
)

Expand All @@ -25,7 +24,7 @@ type Item struct {
// Ident is unique identifier of the job, should be provided from outside
Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
Payload []byte `json:"payload"`
// Headers with key-values pairs
headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Expand Down Expand Up @@ -71,7 +70,7 @@ func (i *Item) GroupID() string {

// Body packs job payload into binary payload.
func (i *Item) Body() []byte {
return utils.AsBytes(i.Payload)
return i.Payload
}

func (i *Item) Headers() map[string][]string {
Expand Down Expand Up @@ -158,39 +157,29 @@ func fromJob(job jobs.Message) *Item {
}
}

func (d *Driver) unpack(id uint64, data []byte, out *Item) error {
func (d *Driver) unpack(id uint64, data []byte, out *Item) {
// try to decode the item
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
// if not - fill the item with default values (or values we already have)
if err != nil {
if d.consumeAll {
uid := uuid.NewString()
d.log.Debug("get raw payload", zap.String("assigned ID", uid))

if isJSONEncoded(data) != nil {
data, err = json.Marshal(data)
if err != nil {
return err
}
}

*out = Item{
Job: auto,
Ident: uid,
Payload: utils.AsString(data),
headers: make(map[string][]string, 2),
Options: &Options{
Priority: 10,
Pipeline: (*d.pipeline.Load()).Name(),
Queue: d.tName,
id: id,
requeueFn: d.handleItem,
},
}

out.Options.conn.Store(d.pool.connTS.Load())

return nil
d.log.Debug("failed to unpack the item", zap.Error(err))

*out = Item{
Job: auto,
Ident: uuid.NewString(),
Payload: data,
headers: make(map[string][]string, 2),
Options: &Options{
Priority: (*d.pipeline.Load()).Priority(),
Pipeline: (*d.pipeline.Load()).Name(),
Queue: d.tName,
id: id,
requeueFn: d.handleItem,
},
}
return err

out.Options.conn.Store(d.pool.connTS.Load())
return
}

if out.Options.Priority == 0 {
Expand All @@ -201,11 +190,4 @@ func (d *Driver) unpack(id uint64, data []byte, out *Item) error {
out.Options.conn.Store(d.pool.connTS.Load())
out.Options.id = id
out.Options.requeueFn = d.handleItem

return nil
}

func isJSONEncoded(data []byte) error {
var a any
return json.Unmarshal(data, &a)
}
27 changes: 5 additions & 22 deletions beanstalkjobs/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/beanstalkd/go-beanstalk"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)
Expand All @@ -23,7 +22,8 @@ func (d *Driver) listen() {
id, body, err := d.pool.Reserve(d.reserveTimeout)
if err != nil {
// error isn't wrapped
if errB, ok := err.(beanstalk.ConnError); ok { //nolint:errorlint
var errB beanstalk.ConnError
if stderr.As(err, &errB) {
if stderr.Is(errB.Err, beanstalk.ErrTimeout) {
d.log.Info("beanstalk reserve timeout", zap.Error(errB))
continue
Expand All @@ -36,33 +36,16 @@ func (d *Driver) listen() {
}

item := &Item{}
err = d.unpack(id, body, item)
d.unpack(id, body, item)

ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers))
ctx, span := d.tracer.Tracer(tracerName).Start(ctx, "beanstalk_listener")

if err != nil {
span.SetAttributes(attribute.KeyValue{
Key: "error",
Value: attribute.StringValue(err.Error()),
})
d.log.Error("beanstalk unpack item", zap.Error(err))
errDel := d.pool.Delete(context.Background(), id)
if errDel != nil {
d.log.Error("delete item", zap.Error(errDel), zap.Uint64("id", id))
}

span.End()
continue
}

if item.Options.AutoAck {
d.log.Debug("auto_ack option enabled", zap.Uint64("id", id))
errDel := d.pool.Delete(context.Background(), id)
if errDel != nil {
span.SetAttributes(attribute.KeyValue{
Key: "error",
Value: attribute.StringValue(err.Error()),
})
span.RecordError(errDel)
d.log.Error("delete item", zap.Error(errDel), zap.Uint64("id", id))
}
}
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ require (
github.com/beanstalkd/go-beanstalk v0.2.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v4 v4.6.2
github.com/google/uuid v1.3.1
github.com/roadrunner-server/api/v4 v4.7.0
github.com/roadrunner-server/endure/v2 v2.4.2
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/sdk/v4 v4.3.2
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
Expand All @@ -23,7 +22,6 @@ require (
require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/roadrunner-server/tcplisten v1.4.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
Expand Down
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.6.2 h1:BsbVb7Y0Dsz1/6GMja/yVcOAbozHbYQ3i24DTLh7geI=
github.com/roadrunner-server/api/v4 v4.6.2/go.mod h1:7RPouCwEAZSfWKU5eAf5Bc6snFw97RvfWk1Mz+kaKLQ=
github.com/roadrunner-server/api/v4 v4.7.0 h1:vZ8gYWjEpJOa8slUqPJJKENS4vrFKhvI9+nQKDtSqHU=
github.com/roadrunner-server/api/v4 v4.7.0/go.mod h1:Ut9T2j3E22cnRJtipbU8N3WVhyV040iiDfddzojlKwY=
github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwfU3RSqdS1LyUQ=
github.com/roadrunner-server/endure/v2 v2.4.2/go.mod h1:vWTvn6NiYxUBDgwAyjv92i/qFemSUs+cTItMZvc5Zsk=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/sdk/v4 v4.3.2 h1:/7ADKRiFQNXDWmkiSKVLs7ngvmitwOGFWiqxOWAfFpM=
github.com/roadrunner-server/sdk/v4 v4.3.2/go.mod h1:UkiAk5IdmUzkXncfy671OoH6i/zWpWc+JY3IU/AnQuc=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
Expand Down
2 changes: 1 addition & 1 deletion plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package beanstalk

import (
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/beanstalk/v4/beanstalkjobs"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
Expand Down

0 comments on commit ee4fd87

Please sign in to comment.