Skip to content

Commit

Permalink
chore: remove SDK usage
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Aug 26, 2023
1 parent 149cdbf commit 5415c17
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/roadrunner-server/api/v4 v4.7.0
github.com/roadrunner-server/endure/v2 v2.4.2
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/sdk/v4 v4.3.2
github.com/twmb/franz-go v1.14.4
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
Expand All @@ -25,7 +24,6 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/roadrunner-server/tcplisten v1.4.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.6.1 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwf
github.com/roadrunner-server/endure/v2 v2.4.2/go.mod h1:vWTvn6NiYxUBDgwAyjv92i/qFemSUs+cTItMZvc5Zsk=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/sdk/v4 v4.3.2 h1:/7ADKRiFQNXDWmkiSKVLs7ngvmitwOGFWiqxOWAfFpM=
github.com/roadrunner-server/sdk/v4 v4.3.2/go.mod h1:UkiAk5IdmUzkXncfy671OoH6i/zWpWc+JY3IU/AnQuc=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
Expand Down
24 changes: 18 additions & 6 deletions kafkajobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/goccy/go-json"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/twmb/franz-go/pkg/kgo"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -106,7 +106,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
recordsCh: make(chan *kgo.Record, 100),
requeueCh: make(chan *Item, 10),
commandsCh: cmder,
delayed: utils.Int64(0),
delayed: toPtr(int64(0)),
cfg: &conf,
}

Expand Down Expand Up @@ -212,7 +212,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
recordsCh: make(chan *kgo.Record, 100),
requeueCh: make(chan *Item, 10),
commandsCh: cmder,
delayed: utils.Int64(0),
delayed: toPtr(int64(0)),
cfg: &conf,
}

Expand Down Expand Up @@ -414,7 +414,7 @@ func (d *Driver) handleItem(ctx context.Context, msg *Item) error {
if len(v) > 0 {
kh = append(kh, kgo.RecordHeader{
Key: k,
Value: utils.AsBytes(v[0]),
Value: strToBytes(v[0]),
})
}
}
Expand All @@ -431,12 +431,12 @@ func (d *Driver) handleItem(ctx context.Context, msg *Item) error {
// RRJob
kh = append(kh, kgo.RecordHeader{
Key: jobs.RRJob,
Value: utils.AsBytes(msg.Job),
Value: strToBytes(msg.Job),
})
// RRPipeline
kh = append(kh, kgo.RecordHeader{
Key: jobs.RRPipeline,
Value: utils.AsBytes(msg.Options.Pipeline),
Value: strToBytes(msg.Options.Pipeline),
})
// RRPriority
rrpri := make([]byte, 8)
Expand Down Expand Up @@ -507,3 +507,15 @@ func (d *Driver) ping(client *kgo.Client, pipe jobs.Pipeline) error {

return nil
}

func strToBytes(data string) []byte {
if data == "" {
return nil
}

return unsafe.Slice(unsafe.StringData(data), len(data))
}

func toPtr[T any](v T) *T {
return &v
}

0 comments on commit 5415c17

Please sign in to comment.