diff --git a/go.mod b/go.mod index 8ff24fe..9e05b4b 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/roadrunner-server/api/v4 v4.7.0 github.com/roadrunner-server/endure/v2 v2.4.2 github.com/roadrunner-server/errors v1.3.0 - github.com/roadrunner-server/sdk/v4 v4.3.2 github.com/twmb/franz-go v1.14.4 go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 go.opentelemetry.io/otel v1.16.0 @@ -25,7 +24,6 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect - github.com/roadrunner-server/tcplisten v1.4.0 // indirect github.com/twmb/franz-go/pkg/kmsg v1.6.1 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index e52c96a..9dd409f 100644 --- a/go.sum +++ b/go.sum @@ -31,10 +31,6 @@ github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwf github.com/roadrunner-server/endure/v2 v2.4.2/go.mod h1:vWTvn6NiYxUBDgwAyjv92i/qFemSUs+cTItMZvc5Zsk= github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo= github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg= -github.com/roadrunner-server/sdk/v4 v4.3.2 h1:/7ADKRiFQNXDWmkiSKVLs7ngvmitwOGFWiqxOWAfFpM= -github.com/roadrunner-server/sdk/v4 v4.3.2/go.mod h1:UkiAk5IdmUzkXncfy671OoH6i/zWpWc+JY3IU/AnQuc= -github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M= -github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index 0318c6b..d83a963 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -7,11 +7,11 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/goccy/go-json" "github.com/roadrunner-server/api/v4/plugins/v3/jobs" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/sdk/v4/utils" "github.com/twmb/franz-go/pkg/kgo" jprop "go.opentelemetry.io/contrib/propagators/jaeger" "go.opentelemetry.io/otel" @@ -106,7 +106,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg recordsCh: make(chan *kgo.Record, 100), requeueCh: make(chan *Item, 10), commandsCh: cmder, - delayed: utils.Int64(0), + delayed: toPtr(int64(0)), cfg: &conf, } @@ -212,7 +212,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log * recordsCh: make(chan *kgo.Record, 100), requeueCh: make(chan *Item, 10), commandsCh: cmder, - delayed: utils.Int64(0), + delayed: toPtr(int64(0)), cfg: &conf, } @@ -414,7 +414,7 @@ func (d *Driver) handleItem(ctx context.Context, msg *Item) error { if len(v) > 0 { kh = append(kh, kgo.RecordHeader{ Key: k, - Value: utils.AsBytes(v[0]), + Value: strToBytes(v[0]), }) } } @@ -431,12 +431,12 @@ func (d *Driver) handleItem(ctx context.Context, msg *Item) error { // RRJob kh = append(kh, kgo.RecordHeader{ Key: jobs.RRJob, - Value: utils.AsBytes(msg.Job), + Value: strToBytes(msg.Job), }) // RRPipeline kh = append(kh, kgo.RecordHeader{ Key: jobs.RRPipeline, - Value: utils.AsBytes(msg.Options.Pipeline), + Value: strToBytes(msg.Options.Pipeline), }) // RRPriority rrpri := make([]byte, 8) @@ -507,3 +507,15 @@ func (d *Driver) ping(client *kgo.Client, pipe jobs.Pipeline) error { return nil } + +func strToBytes(data string) []byte { + if data == "" { + return nil + } + + return unsafe.Slice(unsafe.StringData(data), len(data)) +} + +func toPtr[T any](v T) *T { + return &v +}