diff --git a/clients/cmd/fluentd/Dockerfile b/clients/cmd/fluentd/Dockerfile index 77470966c10d7..b9bc8f4740fcc 100644 --- a/clients/cmd/fluentd/Dockerfile +++ b/clients/cmd/fluentd/Dockerfile @@ -9,7 +9,7 @@ COPY . /src/loki WORKDIR /src/loki RUN make BUILD_IN_CONTAINER=false fluentd-plugin -FROM fluent/fluentd:v1.17-debian-1 +FROM fluent/fluentd:v1.18-debian-1 ENV LOKI_URL="https://logs-prod-us-central1.grafana.net" COPY --from=build /src/loki/clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb /fluentd/plugins/out_loki.rb diff --git a/clients/cmd/fluentd/docker/Gemfile b/clients/cmd/fluentd/docker/Gemfile index d3033ffc3350d..428c18b840894 100644 --- a/clients/cmd/fluentd/docker/Gemfile +++ b/clients/cmd/fluentd/docker/Gemfile @@ -2,5 +2,5 @@ source 'https://rubygems.org' -gem 'fluentd', '1.17.1' +gem 'fluentd', '1.18.0' gem 'fluent-plugin-multi-format-parser', '~>1.1.0' diff --git a/docs/sources/send-data/_index.md b/docs/sources/send-data/_index.md index 411f0f4367042..58c8a0f2ce0d5 100644 --- a/docs/sources/send-data/_index.md +++ b/docs/sources/send-data/_index.md @@ -56,6 +56,7 @@ By adding our output plugin you can quickly try Loki without doing big configura These third-party clients also enable sending logs to Loki: - [Cribl Loki Destination](https://docs.cribl.io/stream/destinations-loki) +- [GrafanaLokiLogger](https://github.com/antoniojmsjr/GrafanaLokiLogger) (Delphi/Lazarus) - [ilogtail](https://github.com/alibaba/ilogtail) (Go) - [Log4j2 appender for Loki](https://github.com/tkowalcz/tjahzi) (Java) - [loki-logback-appender](https://github.com/loki4j/loki-logback-appender) (Java) diff --git a/docs/sources/send-data/otel/_index.md b/docs/sources/send-data/otel/_index.md index ce0b1de64483e..88d099a4c3165 100644 --- a/docs/sources/send-data/otel/_index.md +++ b/docs/sources/send-data/otel/_index.md @@ -18,7 +18,7 @@ For ingesting logs to Loki using the OpenTelemetry Collector, you must use the [ When logs are ingested by Loki using an OpenTelemetry protocol (OTLP) ingestion endpoint, some of the data is stored as [Structured Metadata]({{< relref "../../get-started/labels/structured-metadata" >}}). -You must set `allow_structured_metadata` to `true` within your Loki config file. Otherwise, Loki will reject the log payload as malformed. +You must set `allow_structured_metadata` to `true` within your Loki config file. Otherwise, Loki will reject the log payload as malformed. Note that Structured Metadata is enabled by default in Loki 3.0 and later. ```yaml limits_config: diff --git a/docs/sources/send-data/promtail/stages/metrics.md b/docs/sources/send-data/promtail/stages/metrics.md index b034bd6d6d6a1..ea1c7b78150c5 100644 --- a/docs/sources/send-data/promtail/stages/metrics.md +++ b/docs/sources/send-data/promtail/stages/metrics.md @@ -51,8 +51,8 @@ type: Counter [max_idle_duration: ] config: - # If present and true all log lines will be counted without - # attempting to match the source to the extract map. + # If present and true all log lines will be counted without attempting + # to match the `value` to the field specified by `source` in the extracted map. # It is an error to specify `match_all: true` and also specify a `value` [match_all: ] @@ -231,7 +231,7 @@ This pipeline first tries to find text in the format `order_status=` in the log line, pulling out the `` into the extracted map with the key `order_status`. -The metric stages creates `successful_orders_total` and `failed_orders_total` +The metrics stage creates `successful_orders_total` and `failed_orders_total` metrics that only increment when the value of `order_status` in the extracted map is `success` or `fail` respectively. @@ -265,7 +265,7 @@ number in the `retries` field from the extracted map. - metrics: http_response_time_seconds: type: Histogram - description: "length of each log line" + description: "distribution of log response time" source: response_time config: buckets: [0.001,0.0025,0.005,0.010,0.025,0.050] diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 6c8ed01c5c0c7..18c4cdceb649e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -188,6 +188,26 @@ block_builder: # CLI flag: -blockbuilder.backoff..backoff-retries [max_retries: | default = 10] +block_scheduler: + # Consumer group used by block scheduler to track the last consumed offset. + # CLI flag: -block-scheduler.consumer-group + [consumer_group: | default = "block-scheduler"] + + # How often the scheduler should plan jobs. + # CLI flag: -block-scheduler.interval + [interval: | default = 5m] + + # Period used by the planner to calculate the start and end offset such that + # each job consumes records spanning the target period. + # CLI flag: -block-scheduler.target-records-spanning-period + [target_records_spanning_period: | default = 1h] + + # Lookback period in milliseconds used by the scheduler to plan jobs when the + # consumer group has no commits. -1 consumes from the latest offset. -2 + # consumes from the start of the partition. + # CLI flag: -block-scheduler.lookback-period + [lookback_period: | default = -2] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/go.mod b/go.mod index 5174a10e73330..0db5700d14b03 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/alicebob/miniredis/v2 v2.33.0 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aws/aws-sdk-go v1.55.5 - github.com/baidubce/bce-sdk-go v0.9.202 + github.com/baidubce/bce-sdk-go v0.9.203 github.com/bmatcuk/doublestar/v4 v4.7.1 github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 github.com/cespare/xxhash/v2 v2.3.0 @@ -76,7 +76,7 @@ require ( github.com/oklog/run v1.1.0 github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing-contrib/go-grpc v0.1.0 - github.com/opentracing-contrib/go-stdlib v1.0.0 + github.com/opentracing-contrib/go-stdlib v1.1.0 github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b github.com/oschwald/geoip2-golang v1.11.0 // github.com/pierrec/lz4 v2.0.5+incompatible diff --git a/go.sum b/go.sum index 9d7fe1bb3c307..dbf5091d71167 100644 --- a/go.sum +++ b/go.sum @@ -1008,8 +1008,8 @@ github.com/aws/smithy-go v1.11.1 h1:IQ+lPZVkSM3FRtyaDox41R8YS6iwPMYIreejOgPW49g= github.com/aws/smithy-go v1.11.1/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= -github.com/baidubce/bce-sdk-go v0.9.202 h1:TGRdO4g4CtiI2IZ6MxeUmkbKe6l8kq+mYH6SbxczO3g= -github.com/baidubce/bce-sdk-go v0.9.202/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= +github.com/baidubce/bce-sdk-go v0.9.203 h1:D4YBk4prtlIjrnwrh5nvsSSjLjataApDmeL0fxvI/KU= +github.com/baidubce/bce-sdk-go v0.9.203/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= @@ -2309,8 +2309,8 @@ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2sz github.com/opentracing-contrib/go-grpc v0.1.0 h1:9JHDtQXv6UL0tFF8KJB/4ApJgeOcaHp1h07d0PJjESc= github.com/opentracing-contrib/go-grpc v0.1.0/go.mod h1:i3/jx/TvJZ/HKidtT4XGIi/NosUEpzS9xjVJctbKZzI= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= -github.com/opentracing-contrib/go-stdlib v1.0.0 h1:TBS7YuVotp8myLon4Pv7BtCBzOTo1DeZCld0Z63mW2w= -github.com/opentracing-contrib/go-stdlib v1.0.0/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= +github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= diff --git a/pkg/blockbuilder/scheduler/kafkautil.go b/pkg/blockbuilder/scheduler/kafkautil.go new file mode 100644 index 0000000000000..f746f2a9fd4e0 --- /dev/null +++ b/pkg/blockbuilder/scheduler/kafkautil.go @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package scheduler + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" +) + +// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. +// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits. +// +// The lag is the difference between the last produced offset (high watermark) and an offset in the "past". +// If the block builder committed an offset for a given partition to the consumer group at least once, then +// the lag is the difference between the last produced offset and the offset committed in the consumer group. +// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is +// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis. +func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) { + offsets, err := admClient.FetchOffsets(ctx, group) + if err != nil { + if !errors.Is(err, kerr.GroupIDNotFound) { + return nil, fmt.Errorf("fetch offsets: %w", err) + } + } + if err := offsets.Error(); err != nil { + return nil, fmt.Errorf("fetch offsets got error in response: %w", err) + } + + startOffsets, err := admClient.ListStartOffsets(ctx, topic) + if err != nil { + return nil, err + } + endOffsets, err := admClient.ListEndOffsets(ctx, topic) + if err != nil { + return nil, err + } + + resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) { + return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic) + }) + // If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at. + for topic, pt := range startOffsets.Offsets() { + for partition, startOffset := range pt { + if _, ok := offsets.Lookup(topic, partition); ok { + continue + } + fallbackOffsets, err := resolveFallbackOffsets() + if err != nil { + return nil, fmt.Errorf("resolve fallback offsets: %w", err) + } + o, ok := fallbackOffsets.Lookup(topic, partition) + if !ok { + return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic) + } + if o.Offset < startOffset.At { + // Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition). + // This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream. + continue + } + offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{ + Topic: o.Topic, + Partition: o.Partition, + At: o.Offset, + LeaderEpoch: o.LeaderEpoch, + }}) + } + } + + descrGroup := kadm.DescribedGroup{ + // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, + // because we don't use group consumption. + State: "Empty", + } + return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil +} diff --git a/pkg/blockbuilder/scheduler/kafkautil_test.go b/pkg/blockbuilder/scheduler/kafkautil_test.go new file mode 100644 index 0000000000000..d2a865702a808 --- /dev/null +++ b/pkg/blockbuilder/scheduler/kafkautil_test.go @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package scheduler + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +const ( + testTopic = "test" + testGroup = "testgroup" +) + +func TestKafkaGetGroupLag(t *testing.T) { + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(errors.New("test done")) }) + + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic) + kafkaClient := mustKafkaClient(t, addr) + admClient := kadm.NewClient(kafkaClient) + + const numRecords = 5 + + var producedRecords []kgo.Record + kafkaTime := time.Now().Add(-12 * time.Hour) + for i := int64(0); i < numRecords; i++ { + kafkaTime = kafkaTime.Add(time.Minute) + + // Produce and keep records to partition 0. + res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`)) + rec, err := res.First() + require.NoError(t, err) + require.NotNil(t, rec) + + producedRecords = append(producedRecords, *rec) + + // Produce same records to partition 1 (this partition won't have any commits). + produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`)) + } + require.Len(t, producedRecords, numRecords) + + // Commit last produced record from partition 0. + rec := producedRecords[len(producedRecords)-1] + offsets := make(kadm.Offsets) + offsets.Add(kadm.Offset{ + Topic: rec.Topic, + Partition: rec.Partition, + At: rec.Offset + 1, + LeaderEpoch: rec.LeaderEpoch, + }) + err := admClient.CommitAllOffsets(ctx, testGroup, offsets) + require.NoError(t, err) + + // Truncate partition 1 after second to last record to emulate the retention + // Note Kafka sets partition's start offset to the requested offset. Any records within the segment before the requested offset can no longer be read. + // Note the difference between DeleteRecords and DeleteOffsets in kadm docs. + deleteRecOffsets := make(kadm.Offsets) + deleteRecOffsets.Add(kadm.Offset{ + Topic: testTopic, + Partition: 1, + At: numRecords - 2, + }) + _, err = admClient.DeleteRecords(ctx, deleteRecOffsets) + require.NoError(t, err) + + getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 { + l, ok := lag.Lookup(topic, part) + require.True(t, ok) + return l.Lag + } + + t.Run("fallbackOffset=milliseconds", func(t *testing.T) { + // get the timestamp of the last produced record + rec := producedRecords[len(producedRecords)-1] + fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli() + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("fallbackOffset=before-earliest", func(t *testing.T) { + // get the timestamp of third to last produced record (record before earliest in partition 1) + rec := producedRecords[len(producedRecords)-3] + fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli() + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("fallbackOffset=0", func(t *testing.T) { + groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0) + require.NoError(t, err) + + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag") + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) + + t.Run("group=unknown", func(t *testing.T) { + groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0) + require.NoError(t, err) + + // This group doesn't have any commits, so it must calc its lag from the fallback. + require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0)) + require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there") + require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag") + }) +} + +func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client { + writeClient, err := kgo.NewClient( + kgo.SeedBrokers(addrs...), + kgo.AllowAutoTopicCreation(), + // We will choose the partition of each record. + kgo.RecordPartitioner(kgo.ManualPartitioner()), + ) + require.NoError(t, err) + t.Cleanup(writeClient.Close) + return writeClient +} + +func produceRecords( + ctx context.Context, + t *testing.T, + kafkaClient *kgo.Client, + ts time.Time, + userID string, + topic string, + part int32, + val []byte, +) kgo.ProduceResults { + rec := &kgo.Record{ + Timestamp: ts, + Key: []byte(userID), + Value: val, + Topic: topic, + Partition: part, // samples in this batch are split between N partitions + } + produceResult := kafkaClient.ProduceSync(ctx, rec) + require.NoError(t, produceResult.FirstErr()) + return produceResult +} + +func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) { + offsets := make(kadm.Offsets) + offsets.Add(offset) + err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets) + require.NoError(t, err) +} diff --git a/pkg/blockbuilder/scheduler/metrics.go b/pkg/blockbuilder/scheduler/metrics.go new file mode 100644 index 0000000000000..4e1dbfa2afa1c --- /dev/null +++ b/pkg/blockbuilder/scheduler/metrics.go @@ -0,0 +1,24 @@ +package scheduler + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type Metrics struct { + lag *prometheus.GaugeVec + committedOffset *prometheus.GaugeVec +} + +func NewMetrics(reg prometheus.Registerer) *Metrics { + return &Metrics{ + lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_group_lag", + Help: "How far behind the block scheduler consumer group is from the latest offset.", + }, []string{"partition"}), + committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_group_committed_offset", + Help: "The current offset the block scheduler consumer group is at.", + }, []string{"partition"}), + } +} diff --git a/pkg/blockbuilder/scheduler/offsets_reader.go b/pkg/blockbuilder/scheduler/offsets_reader.go new file mode 100644 index 0000000000000..742185dba817f --- /dev/null +++ b/pkg/blockbuilder/scheduler/offsets_reader.go @@ -0,0 +1,62 @@ +package scheduler + +import ( + "context" + "errors" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" +) + +type offsetReader struct { + topic string + consumerGroup string + fallbackOffsetMillis int64 + + adminClient *kadm.Client +} + +func NewOffsetReader(topic, consumerGroup string, lookbackPeriodInMs int64, client *kgo.Client) OffsetReader { + var fallbackOffsetMillis int64 + if lookbackPeriodInMs >= 0 { + fallbackOffsetMillis = time.Now().UnixMilli() - lookbackPeriodInMs + } else { + fallbackOffsetMillis = lookbackPeriodInMs + } + + return &offsetReader{ + topic: topic, + consumerGroup: consumerGroup, + adminClient: kadm.NewClient(client), + fallbackOffsetMillis: fallbackOffsetMillis, + } +} + +func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) { + lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, r.fallbackOffsetMillis) + if err != nil { + return nil, err + } + + offsets, ok := lag[r.topic] + if !ok { + return nil, errors.New("no lag found for the topic") + } + + return offsets, nil +} + +func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) { + offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic) + if err != nil { + return nil, err + } + + resp, ok := offsets[r.topic] + if !ok { + return nil, errors.New("no offsets found for the topic") + } + + return resp, nil +} diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 3e9cf087c6792..e2f125ad70a07 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -30,6 +30,25 @@ func NewJobQueue() *JobQueue { } } +func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + if _, ok := q.inProgress[job.ID]; ok { + return types.JobStatusInProgress, true + } + + if _, ok := q.pending[job.ID]; ok { + return types.JobStatusPending, true + } + + if _, ok := q.completed[job.ID]; ok { + return types.JobStatusComplete, true + } + + return -1, false +} + // Enqueue adds a new job to the pending queue // This is a naive implementation, intended to be refactored func (q *JobQueue) Enqueue(job *types.Job) error { diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 274713b5b1c36..dbf732742de39 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -2,43 +2,140 @@ package scheduler import ( "context" + "errors" + "flag" + "strconv" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) var ( _ types.Scheduler = unimplementedScheduler{} - _ types.Scheduler = &QueueScheduler{} + _ types.Scheduler = &BlockScheduler{} ) -// unimplementedScheduler provides default implementations that panic. -type unimplementedScheduler struct{} +type Config struct { + ConsumerGroup string `yaml:"consumer_group"` + Interval time.Duration `yaml:"interval"` + TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"` + LookbackPeriod int64 `yaml:"lookback_period"` +} -func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { - panic("unimplemented") +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.") + f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.") + f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.") + f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. -2 consumes from the start of the partition.") } -func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { - panic("unimplemented") +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("block-scheduler.", f) } -func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { - panic("unimplemented") +func (cfg *Config) Validate() error { + if cfg.Interval <= 0 { + return errors.New("interval must be a non-zero value") + } + + if cfg.LookbackPeriod < -2 { + return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period") + } + + return nil } -// QueueScheduler implements the Scheduler interface -type QueueScheduler struct { - queue *JobQueue +// BlockScheduler implements the Scheduler interface +type BlockScheduler struct { + services.Service + + cfg Config + logger log.Logger + queue *JobQueue + metrics *Metrics + + offsetReader OffsetReader + planner Planner } // NewScheduler creates a new scheduler instance -func NewScheduler(queue *JobQueue) *QueueScheduler { - return &QueueScheduler{ - queue: queue, +func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler { + planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger) + s := &BlockScheduler{ + cfg: cfg, + planner: planner, + offsetReader: offsetReader, + logger: logger, + metrics: NewMetrics(r), + queue: queue, + } + s.Service = services.NewBasicService(nil, s.running, nil) + return s +} + +func (s *BlockScheduler) running(ctx context.Context) error { + if err := s.runOnce(ctx); err != nil { + level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err) + } + + ticker := time.NewTicker(s.cfg.Interval) + for { + select { + case <-ticker.C: + if err := s.runOnce(ctx); err != nil { + // TODO: add metrics + level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err) + } + case <-ctx.Done(): + return nil + } } } -func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { +func (s *BlockScheduler) runOnce(ctx context.Context) error { + lag, err := s.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(s.logger).Log("msg", "failed to get group lag", "err", err) + return err + } + + s.publishLagMetrics(lag) + + jobs, err := s.planner.Plan(ctx) + if err != nil { + level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) + } + + for _, job := range jobs { + // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID + if status, ok := s.queue.Exists(&job); ok { + level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status) + continue + } + + if err := s.queue.Enqueue(&job); err != nil { + level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err) + } + } + + return nil +} + +func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) { + for partition, offsets := range lag { + // useful for scaling builders + s.metrics.lag.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Lag)) + s.metrics.committedOffset.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Commit.At)) + } +} + +func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { select { case <-ctx.Done(): return nil, false, ctx.Err() @@ -47,10 +144,26 @@ func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*t } } -func (s *QueueScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { +func (s *BlockScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { + // TODO: handle commits return s.queue.MarkComplete(job.ID, builderID) } -func (s *QueueScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { +func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { return s.queue.SyncJob(job.ID, builderID, job) } + +// unimplementedScheduler provides default implementations that panic. +type unimplementedScheduler struct{} + +func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index ad6829bc8fe69..35e53ee255993 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -5,22 +5,25 @@ import ( "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/blockbuilder/builder" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) type testEnv struct { queue *JobQueue - scheduler *QueueScheduler - transport *builder.MemoryTransport + scheduler *BlockScheduler + transport *types.MemoryTransport builder *builder.Worker } func newTestEnv(builderID string) *testEnv { queue := NewJobQueue() - scheduler := NewScheduler(queue) - transport := builder.NewMemoryTransport(scheduler) - builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler)) + scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry()) + transport := types.NewMemoryTransport(scheduler) + builder := builder.NewWorker(builderID, transport) return &testEnv{ queue: queue, @@ -86,7 +89,7 @@ func TestMultipleBuilders(t *testing.T) { // Create first environment env1 := newTestEnv("test-builder-1") // Create second builder using same scheduler - builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler)) + builder2 := builder.NewWorker("test-builder-2", env1.transport) ctx := context.Background() diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go new file mode 100644 index 0000000000000..5ea1fb6db2d9c --- /dev/null +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -0,0 +1,142 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/twmb/franz-go/pkg/kadm" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka. +type OffsetReader interface { + ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error) + GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error) +} + +type Planner interface { + Name() string + Plan(ctx context.Context) ([]types.Job, error) +} + +const ( + RecordCountStrategy = "record_count" + TimeRangeStrategy = "time_range" +) + +// tries to consume upto targetRecordCount records per partition +type RecordCountPlanner struct { + targetRecordCount int64 + offsetReader OffsetReader + logger log.Logger +} + +func NewRecordCountPlanner(targetRecordCount int64) *RecordCountPlanner { + return &RecordCountPlanner{ + targetRecordCount: targetRecordCount, + } +} + +func (p *RecordCountPlanner) Name() string { + return RecordCountStrategy +} + +func (p *RecordCountPlanner) Plan(ctx context.Context) ([]types.Job, error) { + offsets, err := p.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) + return nil, err + } + + jobs := make([]types.Job, 0, len(offsets)) + for _, partition := range offsets { + // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. + // no additional validation is needed here + startOffset := partition.Commit.At + 1 + endOffset := min(startOffset+p.targetRecordCount, partition.End.Offset) + + job := types.Job{ + Partition: int(partition.Partition), + Offsets: types.Offsets{ + Min: startOffset, + Max: endOffset, + }, + } + + jobs = append(jobs, job) + } + + return jobs, nil +} + +// Targets consuming records spanning a configured period. +// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress. +type TimeRangePlanner struct { + offsetReader OffsetReader + + buffer time.Duration + targetPeriod time.Duration + now func() time.Time + + logger log.Logger +} + +func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner { + return &TimeRangePlanner{ + targetPeriod: interval, + buffer: interval, + offsetReader: offsetReader, + now: now, + logger: logger, + } +} + +func (p *TimeRangePlanner) Name() string { + return TimeRangeStrategy +} + +func (p *TimeRangePlanner) Plan(ctx context.Context) ([]types.Job, error) { + // truncate to the nearest Interval + consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod) + + // this will return the latest offset in the partition if no records are produced after this ts. + consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli()) + if err != nil { + level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err) + return nil, err + } + + offsets, err := p.offsetReader.GroupLag(ctx) + if err != nil { + level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) + return nil, err + } + + var jobs []types.Job + for _, partitionOffset := range offsets { + startOffset := partitionOffset.Commit.At + 1 + // TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range + // or have the builder consume in chunks and commit the job status back to scheduler. + endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset + + if startOffset >= endOffset { + level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition, + "commitOffset", partitionOffset.Commit.At, + "consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset) + continue + } + + jobs = append(jobs, types.Job{ + Partition: int(partitionOffset.Partition), + Offsets: types.Offsets{ + Min: startOffset, + Max: endOffset, + }, + }) + } + + return jobs, nil +} diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go new file mode 100644 index 0000000000000..eb4704f268c74 --- /dev/null +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -0,0 +1,159 @@ +package scheduler + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +func TestTimeRangePlanner_Plan(t *testing.T) { + interval := 15 * time.Minute + for _, tc := range []struct { + name string + now time.Time + expectedJobs []types.Job + groupLag map[int32]kadm.GroupMemberLag + consumeUpto map[int32]kadm.ListedOffset + }{ + { + // Interval 1 + // now: 00:42:00. consume until 00:15:00 + // last consumed offset 100 with record ts: 00:10:00 + // record offset with ts after 00:15:00 - offset 200 + // resulting jobs: [100, 200] + name: "normal case. schedule first interval", + now: time.Date(0, 0, 0, 0, 42, 0, 0, time.UTC), // 00:42:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 100, + }, + Partition: 0, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 200, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 0, + Offsets: types.Offsets{Min: 101, Max: 200}, + }, + }, + }, + { + // Interval 2 + // now: 00:46:00. consume until 00:30:00 + // last consumed offset 199 with record ts: 00:11:00 + // record offset with ts after 00:30:00 - offset 300 + // resulting jobs: [200, 300] + name: "normal case. schedule second interval", + now: time.Date(0, 0, 0, 0, 46, 0, 0, time.UTC), // 00:46:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 199, + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 11, + }, + Partition: 1, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 300, + }, + 1: { + Offset: 123, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 0, + Offsets: types.Offsets{Min: 200, Max: 300}, + }, + { + Partition: 1, + Offsets: types.Offsets{Min: 12, Max: 123}, + }, + }, + }, + { + // Interval 2 - run scheduling again + // now: 00:48:00. consume until 00:30:00 + // last consumed offset 299 + // record offset with ts after 00:30:00 - offset 300 + // no jobs to schedule for partition 0 + name: "no pending records to consume. schedule second interval once more time", + now: time.Date(0, 0, 0, 0, 48, 0, 0, time.UTC), // 00:48:00 + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 299, + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 11, + }, + Partition: 1, + }, + }, + consumeUpto: map[int32]kadm.ListedOffset{ + 0: { + Offset: 300, + }, + // still pending. assume no builder were assigned + 1: { + Offset: 123, + }, + }, + expectedJobs: []types.Job{ + { + Partition: 1, + Offsets: types.Offsets{Min: 12, Max: 123}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mockOffsetReader := &mockOffsetReader{ + offsetsAfterMilli: tc.consumeUpto, + groupLag: tc.groupLag, + } + planner := NewTimeRangePlanner(interval, mockOffsetReader, func() time.Time { return tc.now }, log.NewNopLogger()) + + jobs, err := planner.Plan(context.Background()) + require.NoError(t, err) + + require.Equal(t, len(tc.expectedJobs), len(jobs)) + require.Equal(t, tc.expectedJobs, jobs) + }) + } +} + +type mockOffsetReader struct { + offsetsAfterMilli map[int32]kadm.ListedOffset + groupLag map[int32]kadm.GroupMemberLag +} + +func (m *mockOffsetReader) ListOffsetsAfterMilli(_ context.Context, _ int64) (map[int32]kadm.ListedOffset, error) { + return m.offsetsAfterMilli, nil +} + +func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) { + return m.groupLag, nil +} diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go new file mode 100644 index 0000000000000..0d132e2d07bd7 --- /dev/null +++ b/pkg/blockbuilder/types/grpc_transport.go @@ -0,0 +1,147 @@ +package types + +import ( + "context" + "flag" + "io" + + "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/instrument" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto" + "github.com/grafana/loki/v3/pkg/util/constants" +) + +var _ Transport = &GRPCTransport{} + +type GRPCTransportConfig struct { + Address string `yaml:"address,omitempty"` + + // GRPCClientConfig configures the gRPC connection between the block-builder and its scheduler. + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` +} + +func (cfg *GRPCTransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Address, prefix+"address", "", "address in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes") +} + +type grpcTransportMetrics struct { + requestLatency *prometheus.HistogramVec +} + +func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMetrics { + return &grpcTransportMetrics{ + requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Subsystem: "block_builder_grpc", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using the block builder grpc transport", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + } +} + +// GRPCTransport implements the Transport interface using gRPC +type GRPCTransport struct { + grpc_health_v1.HealthClient + io.Closer + proto.BlockBuilderServiceClient +} + +// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options +func NewGRPCTransportFromAddress( + metrics *grpcTransportMetrics, + cfg GRPCTransportConfig, +) (*GRPCTransport, error) { + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency)) + if err != nil { + return nil, err + } + + // nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2. + conn, err := grpc.Dial(cfg.Address, dialOpts...) + if err != nil { + return nil, errors.Wrap(err, "new grpc pool dial") + } + + return &GRPCTransport{ + Closer: conn, + HealthClient: grpc_health_v1.NewHealthClient(conn), + BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn), + }, nil +} + +// SendGetJobRequest implements Transport +func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { + protoReq := &proto.GetJobRequest{ + BuilderId: req.BuilderID, + } + + resp, err := t.GetJob(ctx, protoReq) + if err != nil { + return nil, err + } + + return &GetJobResponse{ + Job: protoToJob(resp.GetJob()), + OK: resp.GetOk(), + }, nil +} + +// SendCompleteJob implements Transport +func (t *GRPCTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error { + protoReq := &proto.CompleteJobRequest{ + BuilderId: req.BuilderID, + Job: jobToProto(req.Job), + } + + _, err := t.CompleteJob(ctx, protoReq) + return err +} + +// SendSyncJob implements Transport +func (t *GRPCTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error { + protoReq := &proto.SyncJobRequest{ + BuilderId: req.BuilderID, + Job: jobToProto(req.Job), + } + + _, err := t.SyncJob(ctx, protoReq) + return err +} + +// protoToJob converts a proto Job to a types.Job +func protoToJob(p *proto.Job) *Job { + if p == nil { + return nil + } + return &Job{ + ID: p.GetId(), + Partition: int(p.GetPartition()), + Offsets: Offsets{ + Min: p.GetOffsets().GetMin(), + Max: p.GetOffsets().GetMax(), + }, + } +} + +// jobToProto converts a types.Job to a proto Job +func jobToProto(j *Job) *proto.Job { + if j == nil { + return nil + } + return &proto.Job{ + Id: j.ID, + Partition: int32(j.Partition), + Offsets: &proto.Offsets{ + Min: j.Offsets.Min, + Max: j.Offsets.Max, + }, + } +} diff --git a/pkg/blockbuilder/types/interfaces.go b/pkg/blockbuilder/types/interfaces.go index 74267f912fd7e..dd719757ba6a1 100644 --- a/pkg/blockbuilder/types/interfaces.go +++ b/pkg/blockbuilder/types/interfaces.go @@ -24,6 +24,15 @@ type Scheduler interface { // Transport defines the interface for communication between block builders and scheduler type Transport interface { + BuilderTransport + SchedulerTransport +} + +// SchedulerTransport is for calls originating from the scheduler +type SchedulerTransport interface{} + +// BuilderTransport is for calls originating from the builder +type BuilderTransport interface { // SendGetJobRequest sends a request to get a new job SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) // SendCompleteJob sends a job completion notification diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index d6ed42b598906..2c06fec4d48cd 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -4,8 +4,7 @@ import "fmt" // Job represents a block building task. type Job struct { - ID string - Status JobStatus + ID string // Partition and offset information Partition int Offsets Offsets @@ -30,7 +29,6 @@ type Offsets struct { func NewJob(partition int, offsets Offsets) *Job { return &Job{ ID: GenerateJobID(partition, offsets), - Status: JobStatusPending, Partition: partition, Offsets: offsets, } diff --git a/pkg/blockbuilder/types/proto/blockbuilder.pb.go b/pkg/blockbuilder/types/proto/blockbuilder.pb.go new file mode 100644 index 0000000000000..c5c4b05d38604 --- /dev/null +++ b/pkg/blockbuilder/types/proto/blockbuilder.pb.go @@ -0,0 +1,2317 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/blockbuilder/types/proto/blockbuilder.proto + +package proto + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// GetJobRequest represents a request for a new job +type GetJobRequest struct { + BuilderId string `protobuf:"bytes,1,opt,name=builder_id,json=builderId,proto3" json:"builder_id,omitempty"` +} + +func (m *GetJobRequest) Reset() { *m = GetJobRequest{} } +func (*GetJobRequest) ProtoMessage() {} +func (*GetJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{0} +} +func (m *GetJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetJobRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetJobRequest.Merge(m, src) +} +func (m *GetJobRequest) XXX_Size() int { + return m.Size() +} +func (m *GetJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetJobRequest proto.InternalMessageInfo + +func (m *GetJobRequest) GetBuilderId() string { + if m != nil { + return m.BuilderId + } + return "" +} + +// GetJobResponse contains the response for a job request +type GetJobResponse struct { + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Ok bool `protobuf:"varint,2,opt,name=ok,proto3" json:"ok,omitempty"` +} + +func (m *GetJobResponse) Reset() { *m = GetJobResponse{} } +func (*GetJobResponse) ProtoMessage() {} +func (*GetJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{1} +} +func (m *GetJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetJobResponse.Merge(m, src) +} +func (m *GetJobResponse) XXX_Size() int { + return m.Size() +} +func (m *GetJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetJobResponse proto.InternalMessageInfo + +func (m *GetJobResponse) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +func (m *GetJobResponse) GetOk() bool { + if m != nil { + return m.Ok + } + return false +} + +// CompleteJobRequest represents a job completion notification +type CompleteJobRequest struct { + BuilderId string `protobuf:"bytes,1,opt,name=builder_id,json=builderId,proto3" json:"builder_id,omitempty"` + Job *Job `protobuf:"bytes,2,opt,name=job,proto3" json:"job,omitempty"` +} + +func (m *CompleteJobRequest) Reset() { *m = CompleteJobRequest{} } +func (*CompleteJobRequest) ProtoMessage() {} +func (*CompleteJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{2} +} +func (m *CompleteJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CompleteJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CompleteJobRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CompleteJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CompleteJobRequest.Merge(m, src) +} +func (m *CompleteJobRequest) XXX_Size() int { + return m.Size() +} +func (m *CompleteJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CompleteJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CompleteJobRequest proto.InternalMessageInfo + +func (m *CompleteJobRequest) GetBuilderId() string { + if m != nil { + return m.BuilderId + } + return "" +} + +func (m *CompleteJobRequest) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +// CompleteJobResponse is an empty response for job completion +type CompleteJobResponse struct { +} + +func (m *CompleteJobResponse) Reset() { *m = CompleteJobResponse{} } +func (*CompleteJobResponse) ProtoMessage() {} +func (*CompleteJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{3} +} +func (m *CompleteJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CompleteJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CompleteJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CompleteJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CompleteJobResponse.Merge(m, src) +} +func (m *CompleteJobResponse) XXX_Size() int { + return m.Size() +} +func (m *CompleteJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CompleteJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CompleteJobResponse proto.InternalMessageInfo + +// SyncJobRequest represents a job sync request +type SyncJobRequest struct { + BuilderId string `protobuf:"bytes,1,opt,name=builder_id,json=builderId,proto3" json:"builder_id,omitempty"` + Job *Job `protobuf:"bytes,2,opt,name=job,proto3" json:"job,omitempty"` +} + +func (m *SyncJobRequest) Reset() { *m = SyncJobRequest{} } +func (*SyncJobRequest) ProtoMessage() {} +func (*SyncJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{4} +} +func (m *SyncJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SyncJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncJobRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SyncJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncJobRequest.Merge(m, src) +} +func (m *SyncJobRequest) XXX_Size() int { + return m.Size() +} +func (m *SyncJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SyncJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncJobRequest proto.InternalMessageInfo + +func (m *SyncJobRequest) GetBuilderId() string { + if m != nil { + return m.BuilderId + } + return "" +} + +func (m *SyncJobRequest) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +// SyncJobResponse is an empty response for job sync +type SyncJobResponse struct { +} + +func (m *SyncJobResponse) Reset() { *m = SyncJobResponse{} } +func (*SyncJobResponse) ProtoMessage() {} +func (*SyncJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{5} +} +func (m *SyncJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SyncJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SyncJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SyncJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SyncJobResponse.Merge(m, src) +} +func (m *SyncJobResponse) XXX_Size() int { + return m.Size() +} +func (m *SyncJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SyncJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SyncJobResponse proto.InternalMessageInfo + +// Offsets represents the start and end offsets for a job +type Offsets struct { + Min int64 `protobuf:"varint,1,opt,name=min,proto3" json:"min,omitempty"` + Max int64 `protobuf:"varint,2,opt,name=max,proto3" json:"max,omitempty"` +} + +func (m *Offsets) Reset() { *m = Offsets{} } +func (*Offsets) ProtoMessage() {} +func (*Offsets) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{6} +} +func (m *Offsets) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Offsets) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Offsets.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Offsets) XXX_Merge(src proto.Message) { + xxx_messageInfo_Offsets.Merge(m, src) +} +func (m *Offsets) XXX_Size() int { + return m.Size() +} +func (m *Offsets) XXX_DiscardUnknown() { + xxx_messageInfo_Offsets.DiscardUnknown(m) +} + +var xxx_messageInfo_Offsets proto.InternalMessageInfo + +func (m *Offsets) GetMin() int64 { + if m != nil { + return m.Min + } + return 0 +} + +func (m *Offsets) GetMax() int64 { + if m != nil { + return m.Max + } + return 0 +} + +// Job represents a block building job +type Job struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Partition int32 `protobuf:"varint,2,opt,name=partition,proto3" json:"partition,omitempty"` + Offsets *Offsets `protobuf:"bytes,3,opt,name=offsets,proto3" json:"offsets,omitempty"` +} + +func (m *Job) Reset() { *m = Job{} } +func (*Job) ProtoMessage() {} +func (*Job) Descriptor() ([]byte, []int) { + return fileDescriptor_04968622516f7b79, []int{7} +} +func (m *Job) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Job) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Job.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Job) XXX_Merge(src proto.Message) { + xxx_messageInfo_Job.Merge(m, src) +} +func (m *Job) XXX_Size() int { + return m.Size() +} +func (m *Job) XXX_DiscardUnknown() { + xxx_messageInfo_Job.DiscardUnknown(m) +} + +var xxx_messageInfo_Job proto.InternalMessageInfo + +func (m *Job) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *Job) GetPartition() int32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *Job) GetOffsets() *Offsets { + if m != nil { + return m.Offsets + } + return nil +} + +func init() { + proto.RegisterType((*GetJobRequest)(nil), "blockbuilder.types.GetJobRequest") + proto.RegisterType((*GetJobResponse)(nil), "blockbuilder.types.GetJobResponse") + proto.RegisterType((*CompleteJobRequest)(nil), "blockbuilder.types.CompleteJobRequest") + proto.RegisterType((*CompleteJobResponse)(nil), "blockbuilder.types.CompleteJobResponse") + proto.RegisterType((*SyncJobRequest)(nil), "blockbuilder.types.SyncJobRequest") + proto.RegisterType((*SyncJobResponse)(nil), "blockbuilder.types.SyncJobResponse") + proto.RegisterType((*Offsets)(nil), "blockbuilder.types.Offsets") + proto.RegisterType((*Job)(nil), "blockbuilder.types.Job") +} + +func init() { + proto.RegisterFile("pkg/blockbuilder/types/proto/blockbuilder.proto", fileDescriptor_04968622516f7b79) +} + +var fileDescriptor_04968622516f7b79 = []byte{ + // 438 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0xcf, 0xae, 0xd2, 0x40, + 0x18, 0xc5, 0x3b, 0x6d, 0xbc, 0xc8, 0x77, 0x23, 0xea, 0xdc, 0x18, 0x09, 0xea, 0xe4, 0x5a, 0x13, + 0xbd, 0x2e, 0x6c, 0x13, 0xd4, 0x17, 0xc0, 0x85, 0x11, 0x17, 0xc6, 0xe2, 0x8a, 0x85, 0xda, 0x3f, + 0x03, 0x0e, 0x2d, 0x9d, 0xda, 0x0e, 0x06, 0x76, 0x3e, 0x82, 0x8f, 0xe0, 0xd2, 0x47, 0x71, 0xc9, + 0x92, 0xa5, 0x94, 0x8d, 0x4b, 0x1e, 0xc1, 0x74, 0xda, 0xa2, 0x0d, 0x0d, 0xb8, 0xb9, 0xab, 0x36, + 0xa7, 0xbf, 0x9e, 0x73, 0xf2, 0x7d, 0x33, 0x60, 0x46, 0xfe, 0xd8, 0x74, 0x02, 0xee, 0xfa, 0xce, + 0x8c, 0x05, 0x1e, 0x8d, 0x4d, 0xb1, 0x88, 0x68, 0x62, 0x46, 0x31, 0x17, 0xbc, 0xf2, 0xc1, 0x90, + 0x12, 0xc6, 0x15, 0x4d, 0xc2, 0xba, 0x01, 0xd7, 0x5e, 0x52, 0xd1, 0xe7, 0x8e, 0x45, 0x3f, 0xcf, + 0x68, 0x22, 0xf0, 0x3d, 0x80, 0x82, 0xf8, 0xc0, 0xbc, 0x36, 0x3a, 0x47, 0x17, 0x4d, 0xab, 0x59, + 0x28, 0xaf, 0x3c, 0xfd, 0x35, 0xb4, 0x4a, 0x3e, 0x89, 0x78, 0x98, 0x50, 0xfc, 0x18, 0xb4, 0x09, + 0x77, 0x24, 0x79, 0xda, 0xbd, 0x6d, 0xec, 0x67, 0x18, 0x19, 0x9d, 0x31, 0xb8, 0x05, 0x2a, 0xf7, + 0xdb, 0xea, 0x39, 0xba, 0xb8, 0x6a, 0xa9, 0xdc, 0xd7, 0xdf, 0x03, 0x7e, 0xc1, 0xa7, 0x51, 0x40, + 0x05, 0xfd, 0xef, 0x06, 0x65, 0x9e, 0x7a, 0x3c, 0x4f, 0xbf, 0x05, 0x67, 0x15, 0xff, 0xbc, 0xb1, + 0x3e, 0x84, 0xd6, 0x60, 0x11, 0xba, 0x97, 0x12, 0x79, 0x13, 0xae, 0xef, 0xbc, 0x8b, 0xb8, 0x27, + 0xd0, 0x78, 0x33, 0x1a, 0x25, 0x54, 0x24, 0xf8, 0x06, 0x68, 0x53, 0x16, 0xca, 0x00, 0xcd, 0xca, + 0x5e, 0xa5, 0x62, 0xcf, 0xa5, 0x75, 0xa6, 0xd8, 0x73, 0x7d, 0x02, 0x5a, 0x3f, 0x9f, 0xd5, 0xae, + 0x8a, 0xca, 0x3c, 0x7c, 0x17, 0x9a, 0x91, 0x1d, 0x0b, 0x26, 0x18, 0x0f, 0x25, 0x7e, 0xc5, 0xfa, + 0x2b, 0xe0, 0xe7, 0xd0, 0xe0, 0x79, 0x46, 0x5b, 0x93, 0x2d, 0xef, 0xd4, 0xb5, 0x2c, 0x6a, 0x58, + 0x25, 0xdb, 0xfd, 0xae, 0xc2, 0x59, 0x2f, 0xe3, 0x7a, 0x39, 0x37, 0xa0, 0xf1, 0x17, 0xe6, 0x52, + 0xfc, 0x16, 0x4e, 0xf2, 0x2d, 0xe3, 0xfb, 0x75, 0x3e, 0x95, 0x13, 0xd3, 0xd1, 0x0f, 0x21, 0xc5, + 0x0c, 0x14, 0xfc, 0x11, 0x4e, 0xff, 0xd9, 0x05, 0x7e, 0x58, 0xf7, 0xd3, 0xfe, 0x61, 0xe8, 0x3c, + 0x3a, 0xca, 0xed, 0x12, 0xde, 0x41, 0xa3, 0x18, 0x3d, 0xae, 0xad, 0x54, 0xdd, 0x79, 0xe7, 0xc1, + 0x41, 0xa6, 0x74, 0xed, 0x4d, 0x96, 0x6b, 0xa2, 0xac, 0xd6, 0x44, 0xd9, 0xae, 0x09, 0xfa, 0x9a, + 0x12, 0xf4, 0x23, 0x25, 0xe8, 0x67, 0x4a, 0xd0, 0x32, 0x25, 0xe8, 0x57, 0x4a, 0xd0, 0xef, 0x94, + 0x28, 0xdb, 0x94, 0xa0, 0x6f, 0x1b, 0xa2, 0x2c, 0x37, 0x44, 0x59, 0x6d, 0x88, 0x32, 0x7c, 0x36, + 0x66, 0xe2, 0xd3, 0xcc, 0x31, 0x5c, 0x3e, 0x35, 0xc7, 0xb1, 0x3d, 0xb2, 0x43, 0xdb, 0x0c, 0xb8, + 0xcf, 0x0e, 0xde, 0x59, 0xe7, 0x44, 0x3e, 0x9e, 0xfe, 0x09, 0x00, 0x00, 0xff, 0xff, 0x6b, 0x42, + 0xf6, 0xf1, 0xda, 0x03, 0x00, 0x00, +} + +func (this *GetJobRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetJobRequest) + if !ok { + that2, ok := that.(GetJobRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.BuilderId != that1.BuilderId { + return false + } + return true +} +func (this *GetJobResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetJobResponse) + if !ok { + that2, ok := that.(GetJobResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Job.Equal(that1.Job) { + return false + } + if this.Ok != that1.Ok { + return false + } + return true +} +func (this *CompleteJobRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*CompleteJobRequest) + if !ok { + that2, ok := that.(CompleteJobRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.BuilderId != that1.BuilderId { + return false + } + if !this.Job.Equal(that1.Job) { + return false + } + return true +} +func (this *CompleteJobResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*CompleteJobResponse) + if !ok { + that2, ok := that.(CompleteJobResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *SyncJobRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SyncJobRequest) + if !ok { + that2, ok := that.(SyncJobRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.BuilderId != that1.BuilderId { + return false + } + if !this.Job.Equal(that1.Job) { + return false + } + return true +} +func (this *SyncJobResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SyncJobResponse) + if !ok { + that2, ok := that.(SyncJobResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *Offsets) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Offsets) + if !ok { + that2, ok := that.(Offsets) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Min != that1.Min { + return false + } + if this.Max != that1.Max { + return false + } + return true +} +func (this *Job) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Job) + if !ok { + that2, ok := that.(Job) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Id != that1.Id { + return false + } + if this.Partition != that1.Partition { + return false + } + if !this.Offsets.Equal(that1.Offsets) { + return false + } + return true +} +func (this *GetJobRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&proto.GetJobRequest{") + s = append(s, "BuilderId: "+fmt.Sprintf("%#v", this.BuilderId)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetJobResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&proto.GetJobResponse{") + if this.Job != nil { + s = append(s, "Job: "+fmt.Sprintf("%#v", this.Job)+",\n") + } + s = append(s, "Ok: "+fmt.Sprintf("%#v", this.Ok)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *CompleteJobRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&proto.CompleteJobRequest{") + s = append(s, "BuilderId: "+fmt.Sprintf("%#v", this.BuilderId)+",\n") + if this.Job != nil { + s = append(s, "Job: "+fmt.Sprintf("%#v", this.Job)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *CompleteJobResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&proto.CompleteJobResponse{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *SyncJobRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&proto.SyncJobRequest{") + s = append(s, "BuilderId: "+fmt.Sprintf("%#v", this.BuilderId)+",\n") + if this.Job != nil { + s = append(s, "Job: "+fmt.Sprintf("%#v", this.Job)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *SyncJobResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&proto.SyncJobResponse{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Offsets) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&proto.Offsets{") + s = append(s, "Min: "+fmt.Sprintf("%#v", this.Min)+",\n") + s = append(s, "Max: "+fmt.Sprintf("%#v", this.Max)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Job) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&proto.Job{") + s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") + s = append(s, "Partition: "+fmt.Sprintf("%#v", this.Partition)+",\n") + if this.Offsets != nil { + s = append(s, "Offsets: "+fmt.Sprintf("%#v", this.Offsets)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringBlockbuilder(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BlockBuilderServiceClient is the client API for BlockBuilderService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BlockBuilderServiceClient interface { + // GetJob requests a new job from the scheduler + GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) + // CompleteJob notifies the scheduler that a job has been completed + CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) + // SyncJob syncs job state with the scheduler + SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) +} + +type blockBuilderServiceClient struct { + cc *grpc.ClientConn +} + +func NewBlockBuilderServiceClient(cc *grpc.ClientConn) BlockBuilderServiceClient { + return &blockBuilderServiceClient{cc} +} + +func (c *blockBuilderServiceClient) GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) { + out := new(GetJobResponse) + err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/GetJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *blockBuilderServiceClient) CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) { + out := new(CompleteJobResponse) + err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/CompleteJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *blockBuilderServiceClient) SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) { + out := new(SyncJobResponse) + err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/SyncJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// BlockBuilderServiceServer is the server API for BlockBuilderService service. +type BlockBuilderServiceServer interface { + // GetJob requests a new job from the scheduler + GetJob(context.Context, *GetJobRequest) (*GetJobResponse, error) + // CompleteJob notifies the scheduler that a job has been completed + CompleteJob(context.Context, *CompleteJobRequest) (*CompleteJobResponse, error) + // SyncJob syncs job state with the scheduler + SyncJob(context.Context, *SyncJobRequest) (*SyncJobResponse, error) +} + +// UnimplementedBlockBuilderServiceServer can be embedded to have forward compatible implementations. +type UnimplementedBlockBuilderServiceServer struct { +} + +func (*UnimplementedBlockBuilderServiceServer) GetJob(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetJob not implemented") +} +func (*UnimplementedBlockBuilderServiceServer) CompleteJob(ctx context.Context, req *CompleteJobRequest) (*CompleteJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CompleteJob not implemented") +} +func (*UnimplementedBlockBuilderServiceServer) SyncJob(ctx context.Context, req *SyncJobRequest) (*SyncJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SyncJob not implemented") +} + +func RegisterBlockBuilderServiceServer(s *grpc.Server, srv BlockBuilderServiceServer) { + s.RegisterService(&_BlockBuilderService_serviceDesc, srv) +} + +func _BlockBuilderService_GetJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockBuilderServiceServer).GetJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/blockbuilder.types.BlockBuilderService/GetJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockBuilderServiceServer).GetJob(ctx, req.(*GetJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _BlockBuilderService_CompleteJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CompleteJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockBuilderServiceServer).CompleteJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/blockbuilder.types.BlockBuilderService/CompleteJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockBuilderServiceServer).CompleteJob(ctx, req.(*CompleteJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _BlockBuilderService_SyncJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SyncJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockBuilderServiceServer).SyncJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/blockbuilder.types.BlockBuilderService/SyncJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockBuilderServiceServer).SyncJob(ctx, req.(*SyncJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _BlockBuilderService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "blockbuilder.types.BlockBuilderService", + HandlerType: (*BlockBuilderServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetJob", + Handler: _BlockBuilderService_GetJob_Handler, + }, + { + MethodName: "CompleteJob", + Handler: _BlockBuilderService_CompleteJob_Handler, + }, + { + MethodName: "SyncJob", + Handler: _BlockBuilderService_SyncJob_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/blockbuilder/types/proto/blockbuilder.proto", +} + +func (m *GetJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.BuilderId) > 0 { + i -= len(m.BuilderId) + copy(dAtA[i:], m.BuilderId) + i = encodeVarintBlockbuilder(dAtA, i, uint64(len(m.BuilderId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *GetJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Ok { + i-- + if m.Ok { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if m.Job != nil { + { + size, err := m.Job.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlockbuilder(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *CompleteJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CompleteJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CompleteJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Job != nil { + { + size, err := m.Job.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlockbuilder(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.BuilderId) > 0 { + i -= len(m.BuilderId) + copy(dAtA[i:], m.BuilderId) + i = encodeVarintBlockbuilder(dAtA, i, uint64(len(m.BuilderId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *CompleteJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CompleteJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CompleteJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *SyncJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SyncJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SyncJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Job != nil { + { + size, err := m.Job.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlockbuilder(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.BuilderId) > 0 { + i -= len(m.BuilderId) + copy(dAtA[i:], m.BuilderId) + i = encodeVarintBlockbuilder(dAtA, i, uint64(len(m.BuilderId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SyncJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SyncJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SyncJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *Offsets) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Offsets) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Offsets) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Max != 0 { + i = encodeVarintBlockbuilder(dAtA, i, uint64(m.Max)) + i-- + dAtA[i] = 0x10 + } + if m.Min != 0 { + i = encodeVarintBlockbuilder(dAtA, i, uint64(m.Min)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Job) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Job) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Job) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Offsets != nil { + { + size, err := m.Offsets.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlockbuilder(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Partition != 0 { + i = encodeVarintBlockbuilder(dAtA, i, uint64(m.Partition)) + i-- + dAtA[i] = 0x10 + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintBlockbuilder(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintBlockbuilder(dAtA []byte, offset int, v uint64) int { + offset -= sovBlockbuilder(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *GetJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.BuilderId) + if l > 0 { + n += 1 + l + sovBlockbuilder(uint64(l)) + } + return n +} + +func (m *GetJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Job != nil { + l = m.Job.Size() + n += 1 + l + sovBlockbuilder(uint64(l)) + } + if m.Ok { + n += 2 + } + return n +} + +func (m *CompleteJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.BuilderId) + if l > 0 { + n += 1 + l + sovBlockbuilder(uint64(l)) + } + if m.Job != nil { + l = m.Job.Size() + n += 1 + l + sovBlockbuilder(uint64(l)) + } + return n +} + +func (m *CompleteJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *SyncJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.BuilderId) + if l > 0 { + n += 1 + l + sovBlockbuilder(uint64(l)) + } + if m.Job != nil { + l = m.Job.Size() + n += 1 + l + sovBlockbuilder(uint64(l)) + } + return n +} + +func (m *SyncJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *Offsets) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Min != 0 { + n += 1 + sovBlockbuilder(uint64(m.Min)) + } + if m.Max != 0 { + n += 1 + sovBlockbuilder(uint64(m.Max)) + } + return n +} + +func (m *Job) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovBlockbuilder(uint64(l)) + } + if m.Partition != 0 { + n += 1 + sovBlockbuilder(uint64(m.Partition)) + } + if m.Offsets != nil { + l = m.Offsets.Size() + n += 1 + l + sovBlockbuilder(uint64(l)) + } + return n +} + +func sovBlockbuilder(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozBlockbuilder(x uint64) (n int) { + return sovBlockbuilder(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *GetJobRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetJobRequest{`, + `BuilderId:` + fmt.Sprintf("%v", this.BuilderId) + `,`, + `}`, + }, "") + return s +} +func (this *GetJobResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetJobResponse{`, + `Job:` + strings.Replace(this.Job.String(), "Job", "Job", 1) + `,`, + `Ok:` + fmt.Sprintf("%v", this.Ok) + `,`, + `}`, + }, "") + return s +} +func (this *CompleteJobRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&CompleteJobRequest{`, + `BuilderId:` + fmt.Sprintf("%v", this.BuilderId) + `,`, + `Job:` + strings.Replace(this.Job.String(), "Job", "Job", 1) + `,`, + `}`, + }, "") + return s +} +func (this *CompleteJobResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&CompleteJobResponse{`, + `}`, + }, "") + return s +} +func (this *SyncJobRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&SyncJobRequest{`, + `BuilderId:` + fmt.Sprintf("%v", this.BuilderId) + `,`, + `Job:` + strings.Replace(this.Job.String(), "Job", "Job", 1) + `,`, + `}`, + }, "") + return s +} +func (this *SyncJobResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&SyncJobResponse{`, + `}`, + }, "") + return s +} +func (this *Offsets) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Offsets{`, + `Min:` + fmt.Sprintf("%v", this.Min) + `,`, + `Max:` + fmt.Sprintf("%v", this.Max) + `,`, + `}`, + }, "") + return s +} +func (this *Job) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Job{`, + `Id:` + fmt.Sprintf("%v", this.Id) + `,`, + `Partition:` + fmt.Sprintf("%v", this.Partition) + `,`, + `Offsets:` + strings.Replace(this.Offsets.String(), "Offsets", "Offsets", 1) + `,`, + `}`, + }, "") + return s +} +func valueToStringBlockbuilder(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *GetJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BuilderId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.BuilderId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Job", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Job == nil { + m.Job = &Job{} + } + if err := m.Job.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Ok", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Ok = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CompleteJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CompleteJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CompleteJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BuilderId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.BuilderId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Job", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Job == nil { + m.Job = &Job{} + } + if err := m.Job.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CompleteJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CompleteJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CompleteJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SyncJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SyncJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SyncJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BuilderId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.BuilderId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Job", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Job == nil { + m.Job = &Job{} + } + if err := m.Job.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SyncJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SyncJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SyncJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Offsets) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Offsets: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Offsets: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Min", wireType) + } + m.Min = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Min |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Max", wireType) + } + m.Max = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Max |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Job) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Job: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Job: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType) + } + m.Partition = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Partition |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Offsets", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlockbuilder + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlockbuilder + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Offsets == nil { + m.Offsets = &Offsets{} + } + if err := m.Offsets.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBlockbuilder(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthBlockbuilder + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipBlockbuilder(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthBlockbuilder + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthBlockbuilder + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlockbuilder + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipBlockbuilder(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthBlockbuilder + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthBlockbuilder = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowBlockbuilder = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/blockbuilder/types/proto/blockbuilder.proto b/pkg/blockbuilder/types/proto/blockbuilder.proto new file mode 100644 index 0000000000000..89811989b821c --- /dev/null +++ b/pkg/blockbuilder/types/proto/blockbuilder.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package blockbuilder.types; + +option go_package = "github.com/grafana/loki/pkg/blockbuilder/types/proto"; + +// BlockBuilderService defines the gRPC service for block builder communication +service BlockBuilderService { + // GetJob requests a new job from the scheduler + rpc GetJob(GetJobRequest) returns (GetJobResponse) {} + // CompleteJob notifies the scheduler that a job has been completed + rpc CompleteJob(CompleteJobRequest) returns (CompleteJobResponse) {} + // SyncJob syncs job state with the scheduler + rpc SyncJob(SyncJobRequest) returns (SyncJobResponse) {} +} + +// GetJobRequest represents a request for a new job +message GetJobRequest { + string builder_id = 1; +} + +// GetJobResponse contains the response for a job request +message GetJobResponse { + Job job = 1; + bool ok = 2; +} + +// CompleteJobRequest represents a job completion notification +message CompleteJobRequest { + string builder_id = 1; + Job job = 2; +} + +// CompleteJobResponse is an empty response for job completion +message CompleteJobResponse {} + +// SyncJobRequest represents a job sync request +message SyncJobRequest { + string builder_id = 1; + Job job = 2; +} + +// SyncJobResponse is an empty response for job sync +message SyncJobResponse {} + +// Offsets represents the start and end offsets for a job +message Offsets { + int64 min = 1; + int64 max = 2; +} + +// Job represents a block building job +message Job { + string id = 1; + int32 partition = 2; + Offsets offsets = 3; +} diff --git a/pkg/blockbuilder/builder/transport.go b/pkg/blockbuilder/types/transport.go similarity index 66% rename from pkg/blockbuilder/builder/transport.go rename to pkg/blockbuilder/types/transport.go index ae498459cb667..5659ffb48a4b4 100644 --- a/pkg/blockbuilder/builder/transport.go +++ b/pkg/blockbuilder/types/transport.go @@ -1,58 +1,56 @@ -package builder +package types import ( "context" - - "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) var ( - _ types.Transport = unimplementedTransport{} - _ types.Transport = &MemoryTransport{} + _ Transport = unimplementedTransport{} + _ Transport = &MemoryTransport{} ) // unimplementedTransport provides default implementations that panic type unimplementedTransport struct{} -func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) { +func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *GetJobRequest) (*GetJobResponse, error) { panic("unimplemented") } -func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error { +func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *CompleteJobRequest) error { panic("unimplemented") } -func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error { +func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *SyncJobRequest) error { panic("unimplemented") } // MemoryTransport implements Transport interface for in-memory communication type MemoryTransport struct { - scheduler types.Scheduler + scheduler Scheduler } // NewMemoryTransport creates a new in-memory transport instance -func NewMemoryTransport(scheduler types.Scheduler) *MemoryTransport { +func NewMemoryTransport(scheduler Scheduler) *MemoryTransport { return &MemoryTransport{ scheduler: scheduler, } } -func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *types.GetJobRequest) (*types.GetJobResponse, error) { +func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID) if err != nil { return nil, err } - return &types.GetJobResponse{ + return &GetJobResponse{ Job: job, OK: ok, }, nil } -func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *types.CompleteJobRequest) error { +func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error { return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job) } -func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *types.SyncJobRequest) error { +func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error { return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job) } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 153387035d6b5..9747a8f231f7e 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" + blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler" "github.com/grafana/loki/v3/pkg/bloombuild" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" @@ -91,6 +92,7 @@ type Config struct { IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"` + BlockScheduler blockscheduler.Config `yaml:"block_scheduler,omitempty"` Pattern pattern.Config `yaml:"pattern_ingester,omitempty"` IndexGateway indexgateway.Config `yaml:"index_gateway"` BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"` @@ -186,6 +188,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Profiling.RegisterFlags(f) c.KafkaConfig.RegisterFlags(f) c.BlockBuilder.RegisterFlags(f) + c.BlockScheduler.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -264,6 +267,9 @@ func (c *Config) Validate() error { if err := c.BlockBuilder.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_builder config")) } + if err := c.BlockScheduler.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_scheduler config")) + } if err := c.LimitsConfig.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config")) } @@ -379,6 +385,7 @@ type Loki struct { partitionRingWatcher *ring.PartitionRingWatcher partitionRing *ring.PartitionInstanceRing blockBuilder *blockbuilder.BlockBuilder + blockScheduler *blockscheduler.BlockScheduler ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -690,6 +697,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule) mm.RegisterModule(BlockBuilder, t.initBlockBuilder) + mm.RegisterModule(BlockScheduler, t.initBlockScheduler) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -728,6 +736,7 @@ func (t *Loki) setupModuleManager() error { PartitionRing: {MemberlistKV, Server, Ring}, MemberlistKV: {Server}, BlockBuilder: {PartitionRing, Store, Server}, + BlockScheduler: {Server}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 994576076af3e..c4449f3c51134 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -37,6 +37,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" + blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -49,6 +50,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" + kclient "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/v3/pkg/kafka/partitionring" "github.com/grafana/loki/v3/pkg/logproto" @@ -139,6 +141,7 @@ const ( InitCodec string = "init-codec" PartitionRing string = "partition-ring" BlockBuilder string = "block-builder" + BlockScheduler string = "block-scheduler" ) const ( @@ -1863,6 +1866,22 @@ func (t *Loki) initBlockBuilder() (services.Service, error) { return t.blockBuilder, nil } +func (t *Loki) initBlockScheduler() (services.Service, error) { + logger := log.With(util_log.Logger, "component", "block_scheduler") + + clientMetrics := kclient.NewReaderClientMetrics("block-scheduler", prometheus.DefaultRegisterer) + c, err := kclient.NewReaderClient( + t.Cfg.KafkaConfig, + clientMetrics, + log.With(logger, "component", "kafka-client"), + ) + if err != nil { + return nil, fmt.Errorf("creating kafka client: %w", err) + } + offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c) + return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil +} + func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil diff --git a/production/docker/docker-compose.yaml b/production/docker/docker-compose.yaml index a29e5efbf4db2..eb0d5a7b0c1ca 100644 --- a/production/docker/docker-compose.yaml +++ b/production/docker/docker-compose.yaml @@ -38,7 +38,7 @@ services: - loki prometheus: - image: prom/prometheus:v3.0.0 + image: prom/prometheus:v3.0.1 ports: - 9090 volumes: diff --git a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go index 243b4c81ce93a..cd632ec2cd89a 100644 --- a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go +++ b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go @@ -26,7 +26,7 @@ import ( // Constants and default values for the package bce const ( - SDK_VERSION = "0.9.202" + SDK_VERSION = "0.9.203" URI_PREFIX = "/" // now support uri without prefix "v1" so just set root path DEFAULT_DOMAIN = "baidubce.com" DEFAULT_PROTOCOL = "http" diff --git a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/client.go b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/client.go index bfb305ffa463a..d4a8db5c5c33f 100644 --- a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/client.go +++ b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/client.go @@ -1,3 +1,4 @@ +//go:build go1.7 // +build go1.7 package nethttp @@ -32,12 +33,12 @@ type Transport struct { } type clientOptions struct { + urlTagFunc func(u *url.URL) string + spanObserver func(span opentracing.Span, r *http.Request) operationName string componentName string - urlTagFunc func(u *url.URL) string disableClientTrace bool disableInjectSpanContext bool - spanObserver func(span opentracing.Span, r *http.Request) } // ClientOption contols the behavior of TraceRequest. @@ -78,7 +79,7 @@ func ClientTrace(enabled bool) ClientOption { // InjectSpanContext returns a ClientOption that turns on or off // injection of the Span context in the request HTTP headers. -// If this option is not used, the default behaviour is to +// If this option is not used, the default behavior is to // inject the span context. func InjectSpanContext(enabled bool) ClientOption { return func(options *clientOptions) { @@ -100,24 +101,24 @@ func ClientSpanObserver(f func(span opentracing.Span, r *http.Request)) ClientOp // // Example: // -// func AskGoogle(ctx context.Context) error { -// client := &http.Client{Transport: &nethttp.Transport{}} -// req, err := http.NewRequest("GET", "http://google.com", nil) -// if err != nil { -// return err -// } -// req = req.WithContext(ctx) // extend existing trace, if any +// func AskGoogle(ctx context.Context) error { +// client := &http.Client{Transport: &nethttp.Transport{}} +// req, err := http.NewRequest("GET", "http://google.com", nil) +// if err != nil { +// return err +// } +// req = req.WithContext(ctx) // extend existing trace, if any // -// req, ht := nethttp.TraceRequest(tracer, req) -// defer ht.Finish() +// req, ht := nethttp.TraceRequest(tracer, req) +// defer ht.Finish() // -// res, err := client.Do(req) -// if err != nil { -// return err -// } -// res.Body.Close() -// return nil -// } +// res, err := client.Do(req) +// if err != nil { +// return err +// } +// res.Body.Close() +// return nil +// } func TraceRequest(tr opentracing.Tracer, req *http.Request, options ...ClientOption) (*http.Request, *Tracer) { opts := &clientOptions{ urlTagFunc: func(u *url.URL) string { @@ -149,6 +150,18 @@ func (c closeTracker) Close() error { return err } +type writerCloseTracker struct { + io.ReadWriteCloser + sp opentracing.Span +} + +func (c writerCloseTracker) Close() error { + err := c.ReadWriteCloser.Close() + c.sp.LogFields(log.String("event", "ClosedBody")) + c.sp.Finish() + return err +} + // TracerFromRequest retrieves the Tracer from the request. If the request does // not have a Tracer it will return nil. func TracerFromRequest(req *http.Request) *Tracer { @@ -170,31 +183,36 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { return rt.RoundTrip(req) } - tracer.start(req) + sp := tracer.start(req) - ext.HTTPMethod.Set(tracer.sp, req.Method) - ext.HTTPUrl.Set(tracer.sp, tracer.opts.urlTagFunc(req.URL)) - tracer.opts.spanObserver(tracer.sp, req) + ext.HTTPMethod.Set(sp, req.Method) + ext.HTTPUrl.Set(sp, tracer.opts.urlTagFunc(req.URL)) + ext.PeerAddress.Set(sp, req.URL.Host) + tracer.opts.spanObserver(sp, req) if !tracer.opts.disableInjectSpanContext { carrier := opentracing.HTTPHeadersCarrier(req.Header) - tracer.sp.Tracer().Inject(tracer.sp.Context(), opentracing.HTTPHeaders, carrier) + sp.Tracer().Inject(sp.Context(), opentracing.HTTPHeaders, carrier) //nolint:errcheck // TODO: should we check the error? Returning it makes the tests fail } resp, err := rt.RoundTrip(req) - if err != nil { - tracer.sp.Finish() + sp.Finish() return resp, err } - ext.HTTPStatusCode.Set(tracer.sp, uint16(resp.StatusCode)) + ext.HTTPStatusCode.Set(sp, uint16(resp.StatusCode)) //nolint:gosec // can't have integer overflow with status code if resp.StatusCode >= http.StatusInternalServerError { - ext.Error.Set(tracer.sp, true) + ext.Error.Set(sp, true) } - if req.Method == "HEAD" { - tracer.sp.Finish() + if req.Method == http.MethodHead { + sp.Finish() } else { - resp.Body = closeTracker{resp.Body, tracer.sp} + readWriteCloser, ok := resp.Body.(io.ReadWriteCloser) + if ok { + resp.Body = writerCloseTracker{readWriteCloser, sp} + } else { + resp.Body = closeTracker{resp.Body, sp} + } } return resp, nil } @@ -223,8 +241,7 @@ func (h *Tracer) start(req *http.Request) opentracing.Span { } ctx := h.root.Context() - h.sp = h.tr.StartSpan("HTTP "+req.Method, opentracing.ChildOf(ctx)) - ext.SpanKindRPCClient.Set(h.sp) + h.sp = h.tr.StartSpan("HTTP "+req.Method, opentracing.ChildOf(ctx), ext.SpanKindRPCClient) componentName := h.opts.componentName if componentName == "" { @@ -266,8 +283,7 @@ func (h *Tracer) clientTrace() *httptrace.ClientTrace { } func (h *Tracer) getConn(hostPort string) { - ext.HTTPUrl.Set(h.sp, hostPort) - h.sp.LogFields(log.String("event", "GetConn")) + h.sp.LogFields(log.String("event", "GetConn"), log.String("hostPort", hostPort)) } func (h *Tracer) gotConn(info httptrace.GotConnInfo) { diff --git a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/status-code-tracker.go b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/metrics-tracker.go similarity index 92% rename from vendor/github.com/opentracing-contrib/go-stdlib/nethttp/status-code-tracker.go rename to vendor/github.com/opentracing-contrib/go-stdlib/nethttp/metrics-tracker.go index 80a5ce08645b3..f4250cbb9b226 100644 --- a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/status-code-tracker.go +++ b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/metrics-tracker.go @@ -1,3 +1,4 @@ +//go:build go1.8 // +build go1.8 package nethttp @@ -7,28 +8,31 @@ import ( "net/http" ) -type statusCodeTracker struct { +type metricsTracker struct { http.ResponseWriter status int + size int } -func (w *statusCodeTracker) WriteHeader(status int) { +func (w *metricsTracker) WriteHeader(status int) { w.status = status w.ResponseWriter.WriteHeader(status) } -func (w *statusCodeTracker) Write(b []byte) (int, error) { - return w.ResponseWriter.Write(b) +func (w *metricsTracker) Write(b []byte) (int, error) { + size, err := w.ResponseWriter.Write(b) + w.size += size + return size, err } // wrappedResponseWriter returns a wrapped version of the original // ResponseWriter and only implements the same combination of additional // interfaces as the original. This implementation is based on // https://github.com/felixge/httpsnoop. -func (w *statusCodeTracker) wrappedResponseWriter() http.ResponseWriter { +func (w *metricsTracker) wrappedResponseWriter() http.ResponseWriter { var ( hj, i0 = w.ResponseWriter.(http.Hijacker) - cn, i1 = w.ResponseWriter.(http.CloseNotifier) + cn, i1 = w.ResponseWriter.(http.CloseNotifier) //nolint:staticcheck // TODO: Replace deprecated CloseNotifier pu, i2 = w.ResponseWriter.(http.Pusher) fl, i3 = w.ResponseWriter.(http.Flusher) rf, i4 = w.ResponseWriter.(io.ReaderFrom) diff --git a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/server.go b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/server.go index db2df6620412b..322e88fca65ef 100644 --- a/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/server.go +++ b/vendor/github.com/opentracing-contrib/go-stdlib/nethttp/server.go @@ -1,3 +1,4 @@ +//go:build go1.7 // +build go1.7 package nethttp @@ -10,6 +11,8 @@ import ( "github.com/opentracing/opentracing-go/ext" ) +var responseSizeKey = "http.response_size" + type mwOptions struct { opNameFunc func(r *http.Request) string spanFilter func(r *http.Request) bool @@ -67,24 +70,26 @@ func MWURLTagFunc(f func(u *url.URL) string) MWOption { // Additionally, it adds the span to the request's context. // // By default, the operation name of the spans is set to "HTTP {method}". -// This can be overriden with options. +// This can be overridden with options. // // Example: -// http.ListenAndServe("localhost:80", nethttp.Middleware(tracer, http.DefaultServeMux)) +// +// http.ListenAndServe("localhost:80", nethttp.Middleware(tracer, http.DefaultServeMux)) // // The options allow fine tuning the behavior of the middleware. // // Example: -// mw := nethttp.Middleware( -// tracer, -// http.DefaultServeMux, -// nethttp.OperationNameFunc(func(r *http.Request) string { -// return "HTTP " + r.Method + ":/api/customers" -// }), -// nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) { -// sp.SetTag("http.uri", r.URL.EscapedPath()) -// }), -// ) +// +// mw := nethttp.Middleware( +// tracer, +// http.DefaultServeMux, +// nethttp.OperationNameFunc(func(r *http.Request) string { +// return "HTTP " + r.Method + ":/api/customers" +// }), +// nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) { +// sp.SetTag("http.uri", r.URL.EscapedPath()) +// }), +// ) func Middleware(tr opentracing.Tracer, h http.Handler, options ...MWOption) http.Handler { return MiddlewareFunc(tr, h.ServeHTTP, options...) } @@ -93,7 +98,8 @@ func Middleware(tr opentracing.Tracer, h http.Handler, options ...MWOption) http // It behaves identically to the Middleware function above. // // Example: -// http.ListenAndServe("localhost:80", nethttp.MiddlewareFunc(tracer, MyHandler)) +// +// http.ListenAndServe("localhost:80", nethttp.MiddlewareFunc(tracer, MyHandler)) func MiddlewareFunc(tr opentracing.Tracer, h http.HandlerFunc, options ...MWOption) http.HandlerFunc { opts := mwOptions{ opNameFunc: func(r *http.Request) string { @@ -126,22 +132,25 @@ func MiddlewareFunc(tr opentracing.Tracer, h http.HandlerFunc, options ...MWOpti ext.Component.Set(sp, componentName) opts.spanObserver(sp, r) - sct := &statusCodeTracker{ResponseWriter: w} + mt := &metricsTracker{ResponseWriter: w} r = r.WithContext(opentracing.ContextWithSpan(r.Context(), sp)) defer func() { panicErr := recover() didPanic := panicErr != nil - if sct.status == 0 && !didPanic { + if mt.status == 0 && !didPanic { // Standard behavior of http.Server is to assume status code 200 if one was not written by a handler that returned successfully. // https://github.com/golang/go/blob/fca286bed3ed0e12336532cc711875ae5b3cb02a/src/net/http/server.go#L120 - sct.status = 200 + mt.status = 200 + } + if mt.status > 0 { + ext.HTTPStatusCode.Set(sp, uint16(mt.status)) //nolint:gosec // can't have integer overflow with status code } - if sct.status > 0 { - ext.HTTPStatusCode.Set(sp, uint16(sct.status)) + if mt.size > 0 { + sp.SetTag(responseSizeKey, mt.size) } - if sct.status >= http.StatusInternalServerError || didPanic { + if mt.status >= http.StatusInternalServerError || didPanic { ext.Error.Set(sp, true) } sp.Finish() @@ -151,7 +160,7 @@ func MiddlewareFunc(tr opentracing.Tracer, h http.HandlerFunc, options ...MWOpti } }() - h(sct.wrappedResponseWriter(), r) + h(mt.wrappedResponseWriter(), r) } return http.HandlerFunc(fn) } diff --git a/vendor/modules.txt b/vendor/modules.txt index e7e389624a74a..ad64ef0e776b9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -465,7 +465,7 @@ github.com/aws/smithy-go/transport/http/internal/io # github.com/axiomhq/hyperloglog v0.2.0 ## explicit; go 1.21 github.com/axiomhq/hyperloglog -# github.com/baidubce/bce-sdk-go v0.9.202 +# github.com/baidubce/bce-sdk-go v0.9.203 ## explicit; go 1.11 github.com/baidubce/bce-sdk-go/auth github.com/baidubce/bce-sdk-go/bce @@ -1311,7 +1311,7 @@ github.com/opencontainers/image-spec/specs-go/v1 # github.com/opentracing-contrib/go-grpc v0.1.0 ## explicit; go 1.22.7 github.com/opentracing-contrib/go-grpc -# github.com/opentracing-contrib/go-stdlib v1.0.0 +# github.com/opentracing-contrib/go-stdlib v1.1.0 ## explicit; go 1.14 github.com/opentracing-contrib/go-stdlib/nethttp # github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b