From ee4fd87a91e8d9e6e5561daf274df0d4f5f5008f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 26 Aug 2023 03:10:19 +0200 Subject: [PATCH] feature: deprecate consume_all Signed-off-by: Valery Piashchynski --- beanstalkjobs/config.go | 14 ++++---- beanstalkjobs/connection.go | 45 +++++++++++++++----------- beanstalkjobs/driver.go | 16 ++++------ beanstalkjobs/item.go | 64 +++++++++++++------------------------ beanstalkjobs/listen.go | 27 +++------------- go.mod | 6 ++-- go.sum | 12 +++---- plugin.go | 2 +- 8 files changed, 74 insertions(+), 112 deletions(-) diff --git a/beanstalkjobs/config.go b/beanstalkjobs/config.go index b14d174..fdab5cb 100644 --- a/beanstalkjobs/config.go +++ b/beanstalkjobs/config.go @@ -2,8 +2,6 @@ package beanstalkjobs import ( "time" - - "github.com/roadrunner-server/sdk/v4/utils" ) const ( @@ -15,10 +13,8 @@ const ( type config struct { // global - Addr string `mapstructure:"addr"` - Timeout time.Duration `mapstructure:"timeout"` - ConsumeAll bool `mapstructure:"consume_all"` - + Addr string `mapstructure:"addr"` + Timeout time.Duration `mapstructure:"timeout"` // local PipePriority int64 `mapstructure:"priority"` TubePriority *uint32 `mapstructure:"tube_priority"` @@ -36,7 +32,7 @@ func (c *config) InitDefault() { } if c.TubePriority == nil { - c.TubePriority = utils.Uint32(0) + c.TubePriority = toPtr(uint32(0)) } if c.PipePriority == 0 { @@ -51,3 +47,7 @@ func (c *config) InitDefault() { c.Timeout = time.Second * 30 } } + +func toPtr[T any](v T) *T { + return &v +} diff --git a/beanstalkjobs/connection.go b/beanstalkjobs/connection.go index 17dc1ea..e93a89f 100644 --- a/beanstalkjobs/connection.go +++ b/beanstalkjobs/connection.go @@ -2,6 +2,7 @@ package beanstalkjobs import ( "context" + stderr "errors" "net" "sync" "sync/atomic" @@ -201,38 +202,44 @@ func (cp *ConnPool) redial() error { func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") const EOF string = "EOF" - switch et := err.(type) { //nolint:gocritic,errorlint - // check if the error - case beanstalk.ConnError: - // error is not wrapped - switch bErr := et.Err.(type) { //nolint:errorlint - case *net.OpError: + var et beanstalk.ConnError + var bErr *net.OpError + + if stderr.As(err, &et) { + if stderr.As(et.Err, &bErr) { + cp.log.Debug("beanstalk connection error, redialing", zap.Error(et)) cp.RUnlock() errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { + cp.log.Error("beanstalk redial failed", zap.Error(errR)) return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) } + cp.log.Debug("beanstalk redial was successful") // if redial was successful -> continue listening return nil - default: - if et.Err.Error() == EOF { - // if error is related to the broken connection - redial - cp.RUnlock() - errR := cp.redial() - cp.RLock() - // if redial failed - return - if errR != nil { - return errors.E(op, errors.Errorf("%v:%v", err, errR)) - } - // if redial was successful -> continue listening - return nil + } + + if et.Err.Error() == EOF { + cp.log.Debug("beanstalk connection error, redialing", zap.Error(et.Err)) + // if error is related to the broken connection - redial + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + cp.log.Error("beanstalk redial failed", zap.Error(errR)) + return errors.E(op, errors.Errorf("%v:%v", err, errR)) } + + cp.log.Debug("beanstalk redial was successful") + // if redial was successful -> continue listening + return nil } } - // return initial error + cp.log.Error("beanstalk connection error, unknown type of error", zap.Error(err)) return err } diff --git a/beanstalkjobs/driver.go b/beanstalkjobs/driver.go index 1f0cbf3..b95a0ca 100644 --- a/beanstalkjobs/driver.go +++ b/beanstalkjobs/driver.go @@ -9,9 +9,8 @@ import ( "sync/atomic" "time" - "github.com/roadrunner-server/api/v4/plugins/v2/jobs" + "github.com/roadrunner-server/api/v4/plugins/v3/jobs" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/sdk/v4/utils" jprop "go.opentelemetry.io/contrib/propagators/jaeger" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" @@ -35,11 +34,10 @@ type Configurer interface { } type Driver struct { - log *zap.Logger - pq jobs.Queue - consumeAll bool - tracer *sdktrace.TracerProvider - prop propagation.TextMapPropagator + log *zap.Logger + pq jobs.Queue + tracer *sdktrace.TracerProvider + prop propagation.TextMapPropagator pipeline atomic.Pointer[jobs.Pipeline] listeners uint32 @@ -113,7 +111,6 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg pool: cPool, network: dsn[0], addr: dsn[1], - consumeAll: conf.ConsumeAll, tout: conf.Timeout, tName: conf.Tube, reserveTimeout: conf.ReserveTimeout, @@ -175,10 +172,9 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap. network: dsn[0], addr: dsn[1], tout: conf.Timeout, - consumeAll: pipe.Bool(consumeAll, false), tName: pipe.String(tube, "default"), reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), + tubePriority: toPtr(uint32(pipe.Int(tubePriority, 1))), priority: pipe.Priority(), // buffered with two because jobs root plugin can call Stop at the same time as Pause diff --git a/beanstalkjobs/item.go b/beanstalkjobs/item.go index 616e0d1..b826ffc 100644 --- a/beanstalkjobs/item.go +++ b/beanstalkjobs/item.go @@ -10,8 +10,7 @@ import ( "github.com/beanstalkd/go-beanstalk" "github.com/goccy/go-json" "github.com/google/uuid" - "github.com/roadrunner-server/api/v4/plugins/v2/jobs" - "github.com/roadrunner-server/sdk/v4/utils" + "github.com/roadrunner-server/api/v4/plugins/v3/jobs" "go.uber.org/zap" ) @@ -25,7 +24,7 @@ type Item struct { // Ident is unique identifier of the job, should be provided from outside Ident string `json:"id"` // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` + Payload []byte `json:"payload"` // Headers with key-values pairs headers map[string][]string // Options contains set of PipelineOptions specific to job execution. Can be empty. @@ -71,7 +70,7 @@ func (i *Item) GroupID() string { // Body packs job payload into binary payload. func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) + return i.Payload } func (i *Item) Headers() map[string][]string { @@ -158,39 +157,29 @@ func fromJob(job jobs.Message) *Item { } } -func (d *Driver) unpack(id uint64, data []byte, out *Item) error { +func (d *Driver) unpack(id uint64, data []byte, out *Item) { + // try to decode the item err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) + // if not - fill the item with default values (or values we already have) if err != nil { - if d.consumeAll { - uid := uuid.NewString() - d.log.Debug("get raw payload", zap.String("assigned ID", uid)) - - if isJSONEncoded(data) != nil { - data, err = json.Marshal(data) - if err != nil { - return err - } - } - - *out = Item{ - Job: auto, - Ident: uid, - Payload: utils.AsString(data), - headers: make(map[string][]string, 2), - Options: &Options{ - Priority: 10, - Pipeline: (*d.pipeline.Load()).Name(), - Queue: d.tName, - id: id, - requeueFn: d.handleItem, - }, - } - - out.Options.conn.Store(d.pool.connTS.Load()) - - return nil + d.log.Debug("failed to unpack the item", zap.Error(err)) + + *out = Item{ + Job: auto, + Ident: uuid.NewString(), + Payload: data, + headers: make(map[string][]string, 2), + Options: &Options{ + Priority: (*d.pipeline.Load()).Priority(), + Pipeline: (*d.pipeline.Load()).Name(), + Queue: d.tName, + id: id, + requeueFn: d.handleItem, + }, } - return err + + out.Options.conn.Store(d.pool.connTS.Load()) + return } if out.Options.Priority == 0 { @@ -201,11 +190,4 @@ func (d *Driver) unpack(id uint64, data []byte, out *Item) error { out.Options.conn.Store(d.pool.connTS.Load()) out.Options.id = id out.Options.requeueFn = d.handleItem - - return nil -} - -func isJSONEncoded(data []byte) error { - var a any - return json.Unmarshal(data, &a) } diff --git a/beanstalkjobs/listen.go b/beanstalkjobs/listen.go index 9ea5ab0..d0810ba 100644 --- a/beanstalkjobs/listen.go +++ b/beanstalkjobs/listen.go @@ -6,7 +6,6 @@ import ( "github.com/beanstalkd/go-beanstalk" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" "go.uber.org/zap" ) @@ -23,7 +22,8 @@ func (d *Driver) listen() { id, body, err := d.pool.Reserve(d.reserveTimeout) if err != nil { // error isn't wrapped - if errB, ok := err.(beanstalk.ConnError); ok { //nolint:errorlint + var errB beanstalk.ConnError + if stderr.As(err, &errB) { if stderr.Is(errB.Err, beanstalk.ErrTimeout) { d.log.Info("beanstalk reserve timeout", zap.Error(errB)) continue @@ -36,33 +36,16 @@ func (d *Driver) listen() { } item := &Item{} - err = d.unpack(id, body, item) + d.unpack(id, body, item) + ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)) ctx, span := d.tracer.Tracer(tracerName).Start(ctx, "beanstalk_listener") - if err != nil { - span.SetAttributes(attribute.KeyValue{ - Key: "error", - Value: attribute.StringValue(err.Error()), - }) - d.log.Error("beanstalk unpack item", zap.Error(err)) - errDel := d.pool.Delete(context.Background(), id) - if errDel != nil { - d.log.Error("delete item", zap.Error(errDel), zap.Uint64("id", id)) - } - - span.End() - continue - } - if item.Options.AutoAck { d.log.Debug("auto_ack option enabled", zap.Uint64("id", id)) errDel := d.pool.Delete(context.Background(), id) if errDel != nil { - span.SetAttributes(attribute.KeyValue{ - Key: "error", - Value: attribute.StringValue(err.Error()), - }) + span.RecordError(errDel) d.log.Error("delete item", zap.Error(errDel), zap.Uint64("id", id)) } } diff --git a/go.mod b/go.mod index 5f68f4f..9e35ebe 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,10 @@ require ( github.com/beanstalkd/go-beanstalk v0.2.0 github.com/cenkalti/backoff/v4 v4.2.1 github.com/goccy/go-json v0.10.2 - github.com/google/uuid v1.3.0 - github.com/roadrunner-server/api/v4 v4.6.2 + github.com/google/uuid v1.3.1 + github.com/roadrunner-server/api/v4 v4.7.0 github.com/roadrunner-server/endure/v2 v2.4.2 github.com/roadrunner-server/errors v1.3.0 - github.com/roadrunner-server/sdk/v4 v4.3.2 go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 @@ -23,7 +22,6 @@ require ( require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/roadrunner-server/tcplisten v1.4.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.11.0 // indirect diff --git a/go.sum b/go.sum index 38e7159..fac67b5 100644 --- a/go.sum +++ b/go.sum @@ -15,20 +15,16 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/roadrunner-server/api/v4 v4.6.2 h1:BsbVb7Y0Dsz1/6GMja/yVcOAbozHbYQ3i24DTLh7geI= -github.com/roadrunner-server/api/v4 v4.6.2/go.mod h1:7RPouCwEAZSfWKU5eAf5Bc6snFw97RvfWk1Mz+kaKLQ= +github.com/roadrunner-server/api/v4 v4.7.0 h1:vZ8gYWjEpJOa8slUqPJJKENS4vrFKhvI9+nQKDtSqHU= +github.com/roadrunner-server/api/v4 v4.7.0/go.mod h1:Ut9T2j3E22cnRJtipbU8N3WVhyV040iiDfddzojlKwY= github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwfU3RSqdS1LyUQ= github.com/roadrunner-server/endure/v2 v2.4.2/go.mod h1:vWTvn6NiYxUBDgwAyjv92i/qFemSUs+cTItMZvc5Zsk= github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo= github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg= -github.com/roadrunner-server/sdk/v4 v4.3.2 h1:/7ADKRiFQNXDWmkiSKVLs7ngvmitwOGFWiqxOWAfFpM= -github.com/roadrunner-server/sdk/v4 v4.3.2/go.mod h1:UkiAk5IdmUzkXncfy671OoH6i/zWpWc+JY3IU/AnQuc= -github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M= -github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y= diff --git a/plugin.go b/plugin.go index 7998c00..b5ad7c5 100644 --- a/plugin.go +++ b/plugin.go @@ -1,7 +1,7 @@ package beanstalk import ( - "github.com/roadrunner-server/api/v4/plugins/v2/jobs" + "github.com/roadrunner-server/api/v4/plugins/v3/jobs" "github.com/roadrunner-server/beanstalk/v4/beanstalkjobs" "github.com/roadrunner-server/endure/v2/dep" "github.com/roadrunner-server/errors"