Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jan 15, 2022
1 parent 1f1c9b1 commit 8e253ac
Show file tree
Hide file tree
Showing 10 changed files with 1,485 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

# Dependency directories (remove the comment below to include it)
# vendor/
.idea
66 changes: 64 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,64 @@
# beanstalk
Beanstalk plugin for the RRv2

### Beanstalk Driver

Beanstalk is a simple and fast general purpose work queue. To install Beanstalk,
you can use the [local queue server](https://github.com/beanstalkd/beanstalkd)
or run the server inside [AWS Elastic](https://aws.amazon.com/elasticbeanstalk/).
You can choose any option that is convenient for you.

Setting up the server is similar to setting up AMQP and requires specifying the
connection in the `"beanstalk"` section of your RoadRunner configuration file.

```yaml
beanstalk:
addr: tcp://127.0.0.1:11300
```
After setting up the connection, you can start using it. Let's take a look at
the complete config with all the options for this driver:
```yaml
beanstalk:
# Optional section.
# Default: tcp://127.0.0.1:11300
addr: tcp://127.0.0.1:11300

# Optional section.
# Default: 30s
timeout: 10s

jobs:
pipelines:
# User defined name of the queue.
example:
# Required section.
# Should be "beanstalk" for the Beanstalk driver.
driver: beanstalk

# Optional section.
# Default: 10
priority: 10

# Optional section.
# Default: 1
tube_priority: 1

# Optional section.
# Default: default
tube: default

# Optional section.
# Default: 5s
reserve_timeout: 5s
```
These are all settings that are available to you for configuring this type of
driver. Let's take a look at what they are responsible for:
- `priority` - Similar to the same option in other drivers. This is queue
default priority for for each task pushed into this queue if the priority
value for these tasks was not explicitly set.

- `tube_priority` - The value for specifying the priority within Beanstalk is
the internal priority of the server. The value should not exceed `int32` size.

- `tube` - The name of the inner "tube" specific to the Beanstalk driver.
51 changes: 51 additions & 0 deletions beanstalkjobs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package beanstalkjobs

import (
"time"

"github.com/roadrunner-server/sdk/v2/utils"
)

const (
tubePriority string = "tube_priority"
tube string = "tube"
reserveTimeout string = "reserve_timeout"
)

type config struct {
// global
Addr string `mapstructure:"addr"`
Timeout time.Duration `mapstructure:"timeout"`

// local
PipePriority int64 `mapstructure:"priority"`
TubePriority *uint32 `mapstructure:"tube_priority"`
Tube string `mapstructure:"tube"`
ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
}

func (c *config) InitDefault() {
if c.Tube == "" {
c.Tube = "default"
}

if c.ReserveTimeout == 0 {
c.ReserveTimeout = time.Second * 1
}

if c.TubePriority == nil {
c.TubePriority = utils.Uint32(0)
}

if c.PipePriority == 0 {
c.PipePriority = 10
}

if c.Addr == "" {
c.Addr = "tcp://127.0.0.1:11300"
}

if c.Timeout == 0 {
c.Timeout = time.Second * 30
}
}
230 changes: 230 additions & 0 deletions beanstalkjobs/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package beanstalkjobs

import (
"context"
"net"
"sync"
"time"

"github.com/beanstalkd/go-beanstalk"
"github.com/cenkalti/backoff/v4"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

type ConnPool struct {
sync.RWMutex

log *zap.Logger

conn *beanstalk.Conn
connT *beanstalk.Conn
ts *beanstalk.TubeSet
t *beanstalk.Tube

network string
address string
tName string
tout time.Duration
}

func NewConnPool(network, address, tName string, tout time.Duration, log *zap.Logger) (*ConnPool, error) {
connT, err := beanstalk.DialTimeout(network, address, tout)
if err != nil {
return nil, err
}

connTS, err := beanstalk.DialTimeout(network, address, tout)
if err != nil {
return nil, err
}

tb := beanstalk.NewTube(connT, tName)
ts := beanstalk.NewTubeSet(connTS, tName)

return &ConnPool{
log: log,
network: network,
address: address,
tName: tName,
tout: tout,
conn: connTS,
connT: connT,
ts: ts,
t: tb,
}, nil
}

// Put the payload
// TODO use the context ??
func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()

// TODO(rustatian): redial based on the token
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry put only when we redialed
return cp.t.Put(body, pri, delay, ttr)
}
}

return id, nil
}

// Reserve reserves and returns a job from one of the tubes in t. If no
// job is available before time timeout has passed, Reserve returns a
// ConnError recording ErrTimeout.
//
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()

id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Reserve only when we redialed
return cp.ts.Reserve(reserveTimeout)
}
}

return id, body, nil
}

func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()

err := cp.conn.Delete(id)
if err != nil {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Delete only when we redialed
return cp.conn.Delete(id)
}
}
return nil
}

func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
cp.RLock()
defer cp.RUnlock()

stat, err := cp.conn.Stats()
if err != nil {
errR := cp.checkAndRedial(err)
if errR != nil {
return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
} else {
return cp.conn.Stats()
}
}

return stat, nil
}

// Stop and close the connections
func (cp *ConnPool) Stop() {
cp.Lock()
defer cp.Unlock()
_ = cp.conn.Close()
_ = cp.connT.Close()
}

func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")

cp.Lock()
// backoff here
expb := backoff.NewExponentialBackOff()
// TODO(rustatian) set via config
expb.MaxElapsedTime = time.Minute

operation := func() error {
connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
if err != nil {
return err
}
if connT == nil {
return errors.E(op, errors.Str("connectionT is nil"))
}

connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
if err != nil {
return err
}

if connTS == nil {
return errors.E(op, errors.Str("connectionTS is nil"))
}

cp.t = beanstalk.NewTube(connT, cp.tName)
cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
cp.conn = connTS
cp.connT = connT

cp.log.Debug("beanstalk redial was successful")
return nil
}

retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
cp.Unlock()
return retryErr
}
cp.Unlock()

return nil
}

func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
const EOF string = "EOF"
switch et := err.(type) { //nolint:gocritic
// check if the error
case beanstalk.ConnError:
switch bErr := et.Err.(type) {
case *net.OpError:
cp.RUnlock()
errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
}

// if redial was successful -> continue listening
return nil
default:
if et.Err.Error() == EOF {
// if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
// if redial failed - return
if errR != nil {
return errors.E(op, errors.Errorf("%v:%v", err, errR))
}
// if redial was successful -> continue listening
return nil
}
}
}

// return initial error
return err
}
Loading

0 comments on commit 8e253ac

Please sign in to comment.